From d7ab6935ddec866b612a702d960e13db15f30aeb Mon Sep 17 00:00:00 2001 From: Cadenza-Li <362237642@qq.com> Date: Wed, 12 Jul 2023 17:23:47 +0800 Subject: [PATCH 1/3] update knowledge module; * add storage class; * new practice,execute,finance,infrastructure knowledge; * add query method to KnowledgeBase; --- qlib/finco/knowledge.py | 347 +++++++++++++++++++++++++++++++++------- qlib/finco/workflow.py | 13 +- 2 files changed, 294 insertions(+), 66 deletions(-) diff --git a/qlib/finco/knowledge.py b/qlib/finco/knowledge.py index 1c312d3fb..9a781bdd8 100644 --- a/qlib/finco/knowledge.py +++ b/qlib/finco/knowledge.py @@ -1,10 +1,107 @@ from pathlib import Path from jinja2 import Template -from typing import List +from typing import List, Union +import pickle +import yaml +import inspect from qlib.workflow import R from qlib.finco.log import FinCoLog from qlib.finco.llm import APIBackend +from qlib.finco.utils import similarity + + +class Storage: + """ + This class is responsible for storage and loading of Knowledge related data. + + """ + + def __init__(self, path: Union[str, Path]): + self.path = path if isinstance(path, Path) else Path(path) + self.logger = FinCoLog() + self.source = None + + # todo: get document by key + self.documents = [] + + def add(self, documents: List): + self.documents.extend(documents) + self.save() + + def load(self, **kwargs): + raise NotImplementedError(f"Please implement the `load` method.") + + def save(self, **kwargs): + raise NotImplementedError(f"Please implement the `save` method.") + + +class PickleStorage(Storage): + """ + This class is responsible for storage and loading of Knowledge related data in pickle format. + + """ + + def __init__(self, path: Union[str, Path]): + super().__init__(path) + + @classmethod + def load(cls, path: Union[str, Path]): + """use pickle as the default load method""" + path = path if isinstance(path, Path) else Path(path) + with open(path, "rb") as f: + return pickle.load(f) + + def save(self, **kwargs): + """use pickle as the default save method""" + with open(self.path, "wb") as f: + pickle.dump(self, f) + + +class YamlStorage(Storage): + """ + This class is responsible for storage and loading of Knowledge related data in yaml format. + + """ + + def __init__(self, path: Union[str, Path]): + super().__init__(path) + self.load() + + def load(self): + """load data from yaml format file""" + try: + self.documents = yaml.load(open(self.path, "r"), Loader=yaml.FullLoader) + except FileNotFoundError: + self.logger.warning(f"YamlStorage: file {self.path} doesn't exist.") + + def save(self, **kwargs): + """use pickle as the default save method""" + with open(self.path, 'w') as f: + yaml.dump(self.documents, f) + + +class ExperimentStorage(Storage): + """ + This class is responsible for storage and loading of mlflow related data. + + """ + + def __init__(self, exp_name, path=None): + super().__init__(path=path) + self.exp_name = exp_name + self.exp = None + self.recs = [] + self.docs = [] + + def load(self, exp_name, rec_id=None): + recs = [] + self.exp = R.get_exp(experiment_name=exp_name) + for r in self.exp.list_recorders(rtype=self.exp.RT_L): + if rec_id is not None and r.id != rec_id: + continue + recs.append(r) + self.recs.extend(recs) class Knowledge: @@ -12,8 +109,23 @@ class Knowledge: Use to handle knowledge in finCo such as experiment and outside domain information """ - def __init__(self): + def __init__(self, storage: Storage): self.logger = FinCoLog() + self.storage = storage + self.knowledge = [] + + def summarize(self, **kwargs): + """ + summarize storage data to knowledge, default knowledge is storage.documents + + Parameters + ---------- + + Return + ------ + """ + + self.knowledge = self.storage.documents def load(self, **kwargs): """ @@ -39,39 +151,130 @@ class Knowledge: """ raise NotImplementedError(f"Please implement the `load` method.") + def save(self, **kwargs): + """save knowledge persistently""" + self.storage.save(**kwargs) -class KnowledgeExperiment(Knowledge): + +class ExperimentKnowledge(Knowledge): """ Handle knowledge from experiments """ - def __init__(self, exp_name, rec_id=None): - super().__init__() - self.exp_name = exp_name - self.exp = None - self.recs = [] - - self.load(exp_name=exp_name, rec_id=rec_id) - - def load(self, exp_name, rec_id=None): - recs = [] - self.exp = R.get_exp(experiment_name=exp_name) - for r in self.exp.list_recorders(rtype=self.exp.RT_L): - if rec_id is not None and r.id != rec_id: - continue - recs.append(r) - self.recs.extend(recs) + def __init__(self, storage: ExperimentStorage): + super().__init__(storage=storage) + self.storage = storage def brief(self): docs = [] - for recorder in self.recs: - docs.append({"exp_name": self.exp.name, "record_info": recorder.info, + for recorder in self.storage.recs: + docs.append({"exp_name": self.storage.exp.name, "record_info": recorder.info, "config": recorder.load_object("config"), "context_summary": recorder.load_object("context_summary")}) - return docs +class PracticeKnowledge(Knowledge): + """ + some template sentence for now + """ + + def __init__(self, storage: YamlStorage): + super().__init__(storage=storage) + + self.summarize() + + def add(self, docs: List): + self.storage.add(docs) + self.summarize() + + self.save() + + +class FinanceKnowledge(Knowledge): + """ + Knowledge from articles + """ + + def __init__(self, storage: YamlStorage): + super().__init__(storage=storage) + if len(self.storage.documents) == 0: + docs = self.read_files_in_directory(self.storage.path) + self.add(docs) + self.summarize() + + def add(self, docs: List): + self.storage.add(docs) + self.summarize() + + self.save() + + @staticmethod + def read_files_in_directory(directory): + """ + read all .txt files under directory + """ + # todo: split article in trunks + file_contents = [] + for file_path in Path(directory).rglob("*.txt"): + if file_path.is_file(): + file_content = file_path.read_text(encoding="utf-8") + file_contents.append(file_content) + return file_contents + + +class ExecuteKnowledge(Knowledge): + """ + Config and associate execution result(pass or error message). We can regard the example in prompt as pass execution + """ + + def __init__(self, storage: YamlStorage): + super().__init__(storage=storage) + self.summarize() + + def add(self, docs: List): + self.storage.add(docs) + self.summarize() + + self.save() + + +class InfrastructureKnowledge(Knowledge): + """ + Knowledge from sentences, docstring, and code + """ + + def __init__(self, storage: YamlStorage): + super().__init__(storage=storage) + + if len(self.storage.documents) == 0: + # todo: change the path to qlib root path + docs = self.get_functions_and_docstrings(Path.cwd().parent) + self.add(docs) + + def add(self, docs: List): + self.storage.add(docs) + self.summarize() + + self.save() + + @staticmethod + def get_functions_and_docstrings(directory): + """ + get all method and docstring in .py files under directory + """ + functions = [] + for file_path in Path(directory).rglob("*.py"): + with file_path.open("r", encoding="utf-8") as f: + lines = f.readlines() + for line in lines: + if line.startswith("def "): + function_name = line.split("(")[0][4:].strip() + function_docstring = inspect.getdoc(eval(function_name)) + functions.append({"function_name": function_name, "docstring": function_docstring}) + return functions + + class Topic: def __init__(self, name: str, describe: Template): @@ -97,60 +300,84 @@ class KnowledgeBase: Load knowledge, offer brief information of knowledge and common handle interfaces """ - def __init__(self, init_path=None, topics: List[Topic] = None): + KT_EXECUTE = "execute" + KT_PRACTICE = "practice" + KT_FINANCE = "finance" + + def __init__(self, init_path=None): self.logger = FinCoLog() - init_path = init_path if init_path else Path.cwd() + self.init_path = Path(init_path) if init_path else Path.cwd() - if not init_path.exists(): - self.logger.warning(f"{init_path} not exist, create empty directory.") - Path.mkdir(init_path) + if not self.init_path.exists(): + self.logger.warning(f"{self.init_path} not exist, create empty directory.") + Path.mkdir(self.init_path) - self.knowledge = self.load(path=init_path) + self.practice_knowledge = self.load_practice_knowledge(self.init_path) + self.execute_knowledge = self.load_execute_knowledge(self.init_path) + self.finance_knowledge = self.load_finance_knowledge(self.init_path) - # todo: replace list with persistent storage strategy such as ES/pinecone to enable - # literal search/semantic search - self.docs = self.brief(knowledge=self.knowledge) - - self.topics = topics if topics else [] - - def load(self, path) -> List: + def load_experiment_knowledge(self, path) -> List: + # similar to practice knowledge, not use for now if isinstance(path, str): path = Path(path) knowledge = [] path = path if path.name == "mlruns" else path.joinpath("mlruns") + # todo: check the influence of set uri R.set_uri(path.as_uri()) for exp_name in R.list_experiments(): - knowledge.append(KnowledgeExperiment(exp_name=exp_name)) + knowledge.append(ExperimentKnowledge(storage=ExperimentStorage(exp_name=exp_name))) self.logger.plain_info(f"Load knowledge from: {path} finished.") return knowledge - def update(self, path): - # note: only update new knowledge in future - knowledge = self.load(path) - self.knowledge = knowledge - self.docs = self.brief(self.knowledge) - self.logger.plain_info(f"Update knowledge finished.") + def load_practice_knowledge(self, path: Path) -> PracticeKnowledge: + self.practice_knowledge = PracticeKnowledge(YamlStorage(path.joinpath("practice_knowledge.yaml"))) + return self.practice_knowledge - def brief(self, knowledge: List[Knowledge]) -> List: - docs = [] + def load_execute_knowledge(self, path: Path) -> ExecuteKnowledge: + self.execute_knowledge = ExecuteKnowledge(YamlStorage(path.joinpath("execute_knowledge.yaml"))) + return self.execute_knowledge + + def load_finance_knowledge(self, path: Path) -> FinanceKnowledge: + self.finance_knowledge = FinanceKnowledge(YamlStorage(path.joinpath("finance_knowledge.yaml"))) + return self.finance_knowledge + + def knowledge(self, knowledge_type: str = None): + if knowledge_type == self.KT_EXECUTE: + knowledge = self.execute_knowledge + elif knowledge_type == self.KT_PRACTICE: + knowledge = self.practice_knowledge + elif knowledge_type == self.KT_FINANCE: + knowledge = self.finance_knowledge + else: + knowledge = self.execute_knowledge.knowledge + self.practice_knowledge.knowledge \ + + self.finance_knowledge.knowledge + return knowledge + + def query(self, knowledge_type: str = None, content: str = None, n: int = 5): + """ + + @param knowledge_type: self.KT_EXECUTE, self.KT_PRACTICE or self.KT_FINANCE + @param content: content to query KnowledgeBase + @param n: top n knowledge to ask ChatGPT + @return: + """ + # todo: replace list with persistent storage strategy such as ES/pinecone to enable + # literal search/semantic search + + knowledge = self.knowledge(knowledge_type=knowledge_type) + scores = [] for k in knowledge: - docs.extend(k.brief()) + scores.append(similarity(str(k), content)) + sorted_indexes = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True) + similar_n_indexes = sorted_indexes[:n] + similar_n_docs = [knowledge[i] for i in similar_n_indexes] - self.logger.plain_info(f"Generate brief knowledge summary finished.") - return docs + prompt = Template("""summarize this information: '{{docs}}'""") + prompt_workflow_selection = prompt.render(docs=similar_n_docs) + response = APIBackend().build_messages_and_create_chat_completion( + user_prompt=prompt_workflow_selection + ) - def query(self, content: str = None): - # todo: query by DSL - return self.docs - - def query_topics(self): - knowledge_of_topics = [] - for topic in self.topics: - knowledge_of_topics.append({topic.name: topic.knowledge}) - return knowledge_of_topics - - def summarize_by_topic(self): - for topic in self.topics: - topic.summarize(self.docs) + return response diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index 324961b30..b6d00f966 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -174,16 +174,14 @@ class LearnManager: self.epoch = 0 self.wm = WorkflowManager() - topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in - self.__DEFAULT_TOPICS] - self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge'), topics=topics) + self.topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in + self.__DEFAULT_TOPICS] + self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge')) def run(self, prompt): # todo: add early stop condition for i in range(10): self.wm.run(prompt) - self.knowledge_base.update(self.wm._workspace) - self.knowledge_base.summarize_by_topic() self.learn() self.epoch += 1 @@ -204,9 +202,12 @@ class LearnManager: user_prompt = self.wm.context.get_context("user_prompt") summary = self.wm.context.get_context("summary") + [topic.summarize(self.knowledge_base.knowledge()) for topic in self.topics] + knowledge_of_topics = [{topic.name: topic.knowledge} for topic in self.topics] + for task in task_finished: prompt_workflow_selection = self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render( - summary=summary, brief=self.knowledge_base.query_topics(), + summary=summary, brief=knowledge_of_topics, task_finished=[str(t) for t in task_finished], task=task.__class__.__name__, system=task.system.render(), user_prompt=user_prompt ) From 37d83fd74731d479081407c962854385b58f116c Mon Sep 17 00:00:00 2001 From: Cadenza-Li <362237642@qq.com> Date: Thu, 13 Jul 2023 17:20:22 +0800 Subject: [PATCH 2/3] update knowledge module; * Knowledge.storage to storages list; * optimize Knowledge & Storage save and load method; * optimize Knowledge query prompt; --- qlib/finco/knowledge.py | 205 +++++++++++++++++++++++++++------------- qlib/finco/utils.py | 7 ++ qlib/finco/workflow.py | 6 +- 3 files changed, 150 insertions(+), 68 deletions(-) diff --git a/qlib/finco/knowledge.py b/qlib/finco/knowledge.py index 9a781bdd8..6920b6295 100644 --- a/qlib/finco/knowledge.py +++ b/qlib/finco/knowledge.py @@ -3,12 +3,13 @@ from jinja2 import Template from typing import List, Union import pickle import yaml -import inspect from qlib.workflow import R from qlib.finco.log import FinCoLog from qlib.finco.llm import APIBackend -from qlib.finco.utils import similarity +from qlib.finco.utils import similarity, random_string + +logger = FinCoLog() class Storage: @@ -17,9 +18,9 @@ class Storage: """ - def __init__(self, path: Union[str, Path]): + def __init__(self, path: Union[str, Path], name: str = None): self.path = path if isinstance(path, Path) else Path(path) - self.logger = FinCoLog() + self.name = name if name else self.path.name self.source = None # todo: get document by key @@ -54,6 +55,7 @@ class PickleStorage(Storage): def save(self, **kwargs): """use pickle as the default save method""" + Path.mkdir(self.path.parent, exist_ok=True) with open(self.path, "wb") as f: pickle.dump(self, f) @@ -63,9 +65,11 @@ class YamlStorage(Storage): This class is responsible for storage and loading of Knowledge related data in yaml format. """ + DEFAULT_NAME = "storage.yml" def __init__(self, path: Union[str, Path]): super().__init__(path) + assert self.path.name, "Yaml storage should specify file name." self.load() def load(self): @@ -73,10 +77,11 @@ class YamlStorage(Storage): try: self.documents = yaml.load(open(self.path, "r"), Loader=yaml.FullLoader) except FileNotFoundError: - self.logger.warning(f"YamlStorage: file {self.path} doesn't exist.") + logger.warning(f"YamlStorage: file {self.path} doesn't exist.") def save(self, **kwargs): """use pickle as the default save method""" + Path.mkdir(self.path.parent, exist_ok=True) with open(self.path, 'w') as f: yaml.dump(self.documents, f) @@ -109,11 +114,21 @@ class Knowledge: Use to handle knowledge in finCo such as experiment and outside domain information """ - def __init__(self, storage: Storage): - self.logger = FinCoLog() - self.storage = storage + def __init__(self, storages: Union[List[Storage], Storage], name: str = None): + self.name = name if name else random_string() + self.workdir = Path.cwd().joinpath("knowledge") + self.storages = [storages] if isinstance(storages, Storage) else storages self.knowledge = [] + def get_storage(self, name: str): + """ + return first storage matched given name, else return None + """ + for storage in self.storages: + if storage.name == name: + return storage + return None + def summarize(self, **kwargs): """ summarize storage data to knowledge, default knowledge is storage.documents @@ -124,20 +139,24 @@ class Knowledge: Return ------ """ + for storage in self.storages: + self.knowledge.extend(storage.documents) - self.knowledge = self.storage.documents - - def load(self, **kwargs): + @classmethod + def load(cls, path: Union[str, Path]): """ Load knowledge in memory - + use pickle as the default file type Parameters ---------- Return ------ """ - raise NotImplementedError(f"Please implement the `load` method.") + """""" + path = path if isinstance(path, Path) else Path(path) + with open(path, "rb") as f: + return pickle.load(f) def brief(self, **kwargs): """ @@ -153,7 +172,10 @@ class Knowledge: def save(self, **kwargs): """save knowledge persistently""" - self.storage.save(**kwargs) + # todo: storages save index only + Path.mkdir(self.workdir.joinpath(self.name), exist_ok=True) + with open(self.workdir.joinpath(self.name).joinpath("knowledge.pkl"), "wb") as f: + pickle.dump(self, f) class ExperimentKnowledge(Knowledge): @@ -161,9 +183,9 @@ class ExperimentKnowledge(Knowledge): Handle knowledge from experiments """ - def __init__(self, storage: ExperimentStorage): - super().__init__(storage=storage) - self.storage = storage + def __init__(self, storages: Union[List[ExperimentStorage], ExperimentStorage]): + super().__init__(storages=storages) + self.storage = storages def brief(self): docs = [] @@ -179,13 +201,15 @@ class PracticeKnowledge(Knowledge): some template sentence for now """ - def __init__(self, storage: YamlStorage): - super().__init__(storage=storage) + def __init__(self, storages: Union[List[YamlStorage], YamlStorage]): + super().__init__(storages=storages, name="practice") self.summarize() def add(self, docs: List): - self.storage.add(docs) + storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)) + storage.add(documents=docs) + self.storages.append(storage) self.summarize() self.save() @@ -196,15 +220,17 @@ class FinanceKnowledge(Knowledge): Knowledge from articles """ - def __init__(self, storage: YamlStorage): - super().__init__(storage=storage) - if len(self.storage.documents) == 0: - docs = self.read_files_in_directory(self.storage.path) - self.add(docs) + def __init__(self, storages: Union[List[YamlStorage], YamlStorage]): + super().__init__(storages=storages, name="finance") + + docs = self.read_files_in_directory(self.workdir.joinpath(self.name)) + self.add(docs) self.summarize() def add(self, docs: List): - self.storage.add(docs) + storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)) + storage.add(documents=docs) + self.storages.append(storage) self.summarize() self.save() @@ -228,12 +254,14 @@ class ExecuteKnowledge(Knowledge): Config and associate execution result(pass or error message). We can regard the example in prompt as pass execution """ - def __init__(self, storage: YamlStorage): - super().__init__(storage=storage) + def __init__(self, storages: Union[List[YamlStorage], YamlStorage]): + super().__init__(storages=storages, name="execute") self.summarize() def add(self, docs: List): - self.storage.add(docs) + storage = YamlStorage(path=self.workdir.joinpath(YamlStorage.DEFAULT_NAME)) + storage.add(documents=docs) + self.storages.append(storage) self.summarize() self.save() @@ -244,34 +272,65 @@ class InfrastructureKnowledge(Knowledge): Knowledge from sentences, docstring, and code """ - def __init__(self, storage: YamlStorage): - super().__init__(storage=storage) + def __init__(self, storages: Union[List[YamlStorage], YamlStorage]): + super().__init__(storages=storages, name="infrastructure") - if len(self.storage.documents) == 0: - # todo: change the path to qlib root path - docs = self.get_functions_and_docstrings(Path.cwd().parent) + storage = self.get_storage(YamlStorage.DEFAULT_NAME) + if len(storage.documents) == 0: + docs = self.get_functions_and_docstrings(Path(__file__).parent.parent.parent) self.add(docs) def add(self, docs: List): - self.storage.add(docs) + storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)) + storage.add(documents=docs) + self.storages.append(storage) self.summarize() self.save() - @staticmethod - def get_functions_and_docstrings(directory): + def get_functions_and_docstrings(self, directory): """ get all method and docstring in .py files under directory + """ functions = [] - for file_path in Path(directory).rglob("*.py"): - with file_path.open("r", encoding="utf-8") as f: - lines = f.readlines() - for line in lines: - if line.startswith("def "): - function_name = line.split("(")[0][4:].strip() - function_docstring = inspect.getdoc(eval(function_name)) - functions.append({"function_name": function_name, "docstring": function_docstring}) + for py_file_path in Path(directory).rglob('*.py'): + for _functions in self.get_functions_with_docstrings(py_file_path): + functions.append(_functions) + + return functions + + @staticmethod + def get_functions_with_docstrings(file_path): + """ + Extract method name and docstring using string matching method + """ + with open(file_path, "r", encoding="utf8") as f: + lines = f.readlines() + + functions = [] + current_func = None + docstring = None + for line in lines: + if line.strip().startswith("def ") or line.strip().startswith("class "): + func = line.strip().split(' ')[1].split('(')[0] + if func.startswith("__"): + continue + if current_func is not None: + docstring = docstring.replace('"""', "") if docstring else docstring + functions.append({"function": current_func, "docstring": docstring}) + current_func = f"{file_path.name.split('.')[0]}.{func}" + docstring = None + elif current_func is not None and docstring is None and line.strip().startswith('"""'): + docstring = line + elif current_func is not None and docstring is not None: + docstring += line.strip() + if line.strip().endswith('"""'): + docstring = docstring.replace('"""', "") if docstring else docstring + functions.append({"function": current_func, "docstring": docstring}) + current_func = None + docstring = None + return functions @@ -303,18 +362,20 @@ class KnowledgeBase: KT_EXECUTE = "execute" KT_PRACTICE = "practice" KT_FINANCE = "finance" + KT_INFRASTRUCTURE = "infrastructure" - def __init__(self, init_path=None): + def __init__(self, workdir=None): self.logger = FinCoLog() - self.init_path = Path(init_path) if init_path else Path.cwd() + self.workdir = Path(workdir) if workdir else Path.cwd() - if not self.init_path.exists(): - self.logger.warning(f"{self.init_path} not exist, create empty directory.") - Path.mkdir(self.init_path) + if not self.workdir.exists(): + self.logger.warning(f"{self.workdir} not exist, create empty directory.") + Path.mkdir(self.workdir) - self.practice_knowledge = self.load_practice_knowledge(self.init_path) - self.execute_knowledge = self.load_execute_knowledge(self.init_path) - self.finance_knowledge = self.load_finance_knowledge(self.init_path) + self.practice_knowledge = self.load_practice_knowledge(self.workdir) + self.execute_knowledge = self.load_execute_knowledge(self.workdir) + self.finance_knowledge = self.load_finance_knowledge(self.workdir) + self.infrastructure_knowledge = self.load_infrastructure_knowledge(self.workdir) def load_experiment_knowledge(self, path) -> List: # similar to practice knowledge, not use for now @@ -326,33 +387,43 @@ class KnowledgeBase: # todo: check the influence of set uri R.set_uri(path.as_uri()) for exp_name in R.list_experiments(): - knowledge.append(ExperimentKnowledge(storage=ExperimentStorage(exp_name=exp_name))) + knowledge.append(ExperimentKnowledge(storages=ExperimentStorage(exp_name=exp_name))) self.logger.plain_info(f"Load knowledge from: {path} finished.") return knowledge def load_practice_knowledge(self, path: Path) -> PracticeKnowledge: - self.practice_knowledge = PracticeKnowledge(YamlStorage(path.joinpath("practice_knowledge.yaml"))) + self.practice_knowledge = PracticeKnowledge( + YamlStorage(path.joinpath(f"{self.KT_PRACTICE}/{YamlStorage.DEFAULT_NAME}"))) return self.practice_knowledge def load_execute_knowledge(self, path: Path) -> ExecuteKnowledge: - self.execute_knowledge = ExecuteKnowledge(YamlStorage(path.joinpath("execute_knowledge.yaml"))) + self.execute_knowledge = ExecuteKnowledge( + YamlStorage(path.joinpath(f"{self.KT_EXECUTE}/{YamlStorage.DEFAULT_NAME}"))) return self.execute_knowledge def load_finance_knowledge(self, path: Path) -> FinanceKnowledge: - self.finance_knowledge = FinanceKnowledge(YamlStorage(path.joinpath("finance_knowledge.yaml"))) + self.finance_knowledge = FinanceKnowledge( + YamlStorage(path.joinpath(f"{self.KT_FINANCE}/{YamlStorage.DEFAULT_NAME}"))) return self.finance_knowledge - def knowledge(self, knowledge_type: str = None): + def load_infrastructure_knowledge(self, path: Path) -> InfrastructureKnowledge: + self.infrastructure_knowledge = InfrastructureKnowledge( + YamlStorage(path.joinpath(f"{self.KT_INFRASTRUCTURE}/{YamlStorage.DEFAULT_NAME}"))) + return self.infrastructure_knowledge + + def get_knowledge(self, knowledge_type: str = None): if knowledge_type == self.KT_EXECUTE: - knowledge = self.execute_knowledge + knowledge = self.execute_knowledge.knowledge elif knowledge_type == self.KT_PRACTICE: - knowledge = self.practice_knowledge + knowledge = self.practice_knowledge.knowledge elif knowledge_type == self.KT_FINANCE: - knowledge = self.finance_knowledge + knowledge = self.finance_knowledge.knowledge + elif knowledge_type == self.KT_INFRASTRUCTURE: + knowledge = self.infrastructure_knowledge.knowledge else: knowledge = self.execute_knowledge.knowledge + self.practice_knowledge.knowledge \ - + self.finance_knowledge.knowledge + + self.finance_knowledge.knowledge + self.infrastructure_knowledge.knowledge return knowledge def query(self, knowledge_type: str = None, content: str = None, n: int = 5): @@ -366,7 +437,7 @@ class KnowledgeBase: # todo: replace list with persistent storage strategy such as ES/pinecone to enable # literal search/semantic search - knowledge = self.knowledge(knowledge_type=knowledge_type) + knowledge = self.get_knowledge(knowledge_type=knowledge_type) scores = [] for k in knowledge: scores.append(similarity(str(k), content)) @@ -374,10 +445,12 @@ class KnowledgeBase: similar_n_indexes = sorted_indexes[:n] similar_n_docs = [knowledge[i] for i in similar_n_indexes] - prompt = Template("""summarize this information: '{{docs}}'""") - prompt_workflow_selection = prompt.render(docs=similar_n_docs) + prompt = Template( + """find the most relevant doc with this query: '{{content}}' from docs='{{docs}}'. + Just return the most relevant item I provided, no more explain. For example: {'function': 'config.resolve_path', 'docstring': None}""") + prompt_workflow_selection = prompt.render(content=content, docs=similar_n_docs) response = APIBackend().build_messages_and_create_chat_completion( - user_prompt=prompt_workflow_selection + user_prompt=prompt_workflow_selection, system_prompt="You are an excellent assistant." ) return response diff --git a/qlib/finco/utils.py b/qlib/finco/utils.py index aa4107021..379ef6375 100644 --- a/qlib/finco/utils.py +++ b/qlib/finco/utils.py @@ -1,4 +1,6 @@ import json +import string +import random from fuzzywuzzy import fuzz @@ -36,3 +38,8 @@ def similarity(text1, text2): # Maybe we can use other similarity algorithm such as tfidf return fuzz.ratio(text1, text2) + + +def random_string(length=10): + letters = string.ascii_letters + string.digits + return ''.join(random.choice(letters) for i in range(length)) diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index b6d00f966..3084a2012 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -176,7 +176,9 @@ class LearnManager: self.topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in self.__DEFAULT_TOPICS] - self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge')) + self.knowledge_base = KnowledgeBase(workdir=Path.cwd().joinpath('knowledge')) + self.knowledge_base.execute_knowledge.add([]) + self.knowledge_base.query(knowledge_type="infrastructure", content="resolve_path") def run(self, prompt): # todo: add early stop condition @@ -202,7 +204,7 @@ class LearnManager: user_prompt = self.wm.context.get_context("user_prompt") summary = self.wm.context.get_context("summary") - [topic.summarize(self.knowledge_base.knowledge()) for topic in self.topics] + [topic.summarize(self.knowledge_base.get_knowledge()) for topic in self.topics] knowledge_of_topics = [{topic.name: topic.knowledge} for topic in self.topics] for task in task_finished: From 8a56cf69b4bad651339bf313529e95bf713db168 Mon Sep 17 00:00:00 2001 From: Cadenza-Li <128388363+Fivele-Li@users.noreply.github.com> Date: Fri, 14 Jul 2023 22:25:43 +0800 Subject: [PATCH 3/3] add KnowledgeBase to workflow; * Update CMDTask prompt example for Windows OS; * Windows OS decode output of subprocess in gbk by default, specify encoding format explict; * Add KnowledgeBase's 4 knowledge types to corresponding task; --- qlib/finco/knowledge.py | 10 +++++-- qlib/finco/prompt_template.yaml | 6 ++++ qlib/finco/task.py | 53 ++++++++++++++++++++++++++------- 3 files changed, 56 insertions(+), 13 deletions(-) diff --git a/qlib/finco/knowledge.py b/qlib/finco/knowledge.py index 6920b6295..7a8633e56 100644 --- a/qlib/finco/knowledge.py +++ b/qlib/finco/knowledge.py @@ -223,9 +223,10 @@ class FinanceKnowledge(Knowledge): def __init__(self, storages: Union[List[YamlStorage], YamlStorage]): super().__init__(storages=storages, name="finance") - docs = self.read_files_in_directory(self.workdir.joinpath(self.name)) - self.add(docs) - self.summarize() + storage = self.get_storage(YamlStorage.DEFAULT_NAME) + if len(storage.documents) == 0: + docs = self.read_files_in_directory(self.workdir.joinpath(self.name)) + self.add(docs) def add(self, docs: List): storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)) @@ -438,6 +439,9 @@ class KnowledgeBase: # literal search/semantic search knowledge = self.get_knowledge(knowledge_type=knowledge_type) + if len(knowledge) == 0: + return "" + scores = [] for k in knowledge: scores.append(similarity(str(k), content)) diff --git a/qlib/finco/prompt_template.yaml b/qlib/finco/prompt_template.yaml index 2ae4bba41..b15bf8a9c 100644 --- a/qlib/finco/prompt_template.yaml +++ b/qlib/finco/prompt_template.yaml @@ -216,6 +216,12 @@ CMDTask_system : |- Example output: cp -r a/b/c d/e/f + Example input: + - User intention: Copy the folder from a/b/c to d/e/f + - User OS: Windows + Example output: + xcopy /Y /f a/b/c d/e/f + CMDTask_user : |- Example input: - User intention: "{{cmd_intention}}" diff --git a/qlib/finco/task.py b/qlib/finco/task.py index 7951408fe..f92cbed02 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -9,6 +9,7 @@ import re import subprocess import platform import inspect +from jinja2 import Template from qlib.finco.llm import APIBackend from qlib.finco.tpl import get_tpl_path @@ -17,6 +18,7 @@ from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer from qlib.workflow import R from qlib.finco.log import FinCoLog, LogColors from qlib.finco.conf import Config +from qlib.finco.knowledge import KnowledgeBase, Topic COMPONENT_LIST = ["Dataset", "DataHandler", "Model", "Record", "Strategy", "Backtest"] @@ -176,8 +178,14 @@ class HighLevelPlanTask(PlanTask): assert thinking_detail is not None, "The thinking detail is not provided" assert user_intention is not None, "The user intention is not provided" + practice_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_PRACTICE, content=user_intention) + finance_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_FINANCE, content=user_intention) + system_prompt = self.system.render() - user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level, algorithm_level=algorithm_level, thinking_detail=thinking_detail, user_intention=user_intention) + user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level, + algorithm_level=algorithm_level, thinking_detail=thinking_detail, + practice_knowledge=practice_knowledge, finance_knowledge=finance_knowledge, + user_intention=user_intention) response = APIBackend().build_messages_and_create_chat_completion( user_prompt, system_prompt @@ -229,8 +237,14 @@ class SLPlanTask(PlanTask): experiment_count = max([i for i in range(10) if f"{i}." in experiments]) + infrastructure_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_INFRASTRUCTURE, + content=experiments) + system_prompt = self.system.render() - user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level, algorithm_level=algorithm_level, thinking_detail=thinking_detail, user_intention=user_intention, experiments=experiments) + user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level, + algorithm_level=algorithm_level, thinking_detail=thinking_detail, + infrastructure_knowledge=infrastructure_knowledge, + user_intention=user_intention, experiments=experiments) former_messages = [] if self.replan: @@ -341,11 +355,14 @@ class TrainTask(Task): try: # Run the command and capture the output workspace = self._context_manager.get_context("workspace") - result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace)) + result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, + text=True, encoding="utf8", cwd=str(workspace)) except subprocess.CalledProcessError as e: print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}") real_error = e.stderr+e.stdout + KnowledgeBase().execute_knowledge.add([real_error]) + if "data" in e.stdout.lower() or "handler" in e.stdout.lower(): return [HyperparameterActionTask("Dataset", regenerate=True, error=real_error), HyperparameterActionTask("DataHandler", regenerate=True, error=real_error), @@ -432,11 +449,9 @@ class AnalysisTask(Task): else "workflow_config.yaml" ) workspace = self._context_manager.get_context("workspace") - workflow_path = workspace.joinpath(workflow_config) - with workflow_path.open() as f: - workflow = yaml.safe_load(f) - experiment_name = workflow["experiment_name"] if "experiment_name" in workflow else "workflow" + # todo: analysis multi experiment(get recorder by id) + experiment_name = "workflow" R.set_uri(Path.joinpath(workspace, 'mlruns').as_uri()) tasks = [] @@ -650,11 +665,19 @@ class HyperparameterActionTask(ActionTask): hyperparameters.remove("dataset") hyperparameters.remove("recorder") target_component_classes_and_hyperparameters.append((module_path, class_name, hyperparameters)) + + execute_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_EXECUTE, + content=target_component_plan) + infrastructure_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_INFRASTRUCTURE, + content=target_component_plan) + user_prompt = self.user.render( user_requirement=user_prompt, target_component_plan=target_component_plan, target_component=self.target_component, - target_component_classes_and_hyperparameters=target_component_classes_and_hyperparameters + target_component_classes_and_hyperparameters=target_component_classes_and_hyperparameters, + execute_knowledge=execute_knowledge, + infrastructure_knowledge=infrastructure_knowledge ) former_messages = [] if self.regenerate: @@ -987,7 +1010,9 @@ class SummarizeTask(Task): file_info = self.get_info_from_file(workspace) context_info = self.get_info_from_context() # too long context make response unstable. - record_info = self.get_info_from_recorder(workspace, workflow_yaml["experiment_name"]) + + # todo: experiments perhaps have the same name, summarize experiment by loop + record_info = self.get_info_from_recorder(workspace, "workflow") figure_path = self.get_figure_path(workspace) information = context_info + file_info + record_info @@ -1012,7 +1037,7 @@ class SummarizeTask(Task): ) context_summary.update({key: response}) - recorder = R.get_recorder(experiment_name=workflow_yaml["experiment_name"]) + recorder = R.get_recorder(experiment_name="workflow") recorder.save_objects(context_summary=context_summary) prompt_workflow_selection = self.summarize_metrics_user.render( @@ -1029,6 +1054,14 @@ class SummarizeTask(Task): user_prompt=prompt_workflow_selection, system_prompt=self.system.render() ) + KnowledgeBase().practice_knowledge.add([{"user_intention": user_prompt, + "experiment_metrics": metrics_response}]) + + # notes: summarize after all experiment added to KnowledgeBase + topic = Topic(name="rollingModel", describe=Template("What conclusion can you draw")) + topic.summarize(KnowledgeBase().practice_knowledge.knowledge) + self.logger.info(f"Summary of topic: {topic.name}: {topic.knowledge}") + self._context_manager.set_context("summary", response) self.save_markdown(content=response, path=workspace) self.logger.info(f"Report has saved to {self.__DEFAULT_REPORT_NAME}", title="End")