From d7ab6935ddec866b612a702d960e13db15f30aeb Mon Sep 17 00:00:00 2001 From: Cadenza-Li <362237642@qq.com> Date: Wed, 12 Jul 2023 17:23:47 +0800 Subject: [PATCH] update knowledge module; * add storage class; * new practice,execute,finance,infrastructure knowledge; * add query method to KnowledgeBase; --- qlib/finco/knowledge.py | 347 +++++++++++++++++++++++++++++++++------- qlib/finco/workflow.py | 13 +- 2 files changed, 294 insertions(+), 66 deletions(-) diff --git a/qlib/finco/knowledge.py b/qlib/finco/knowledge.py index 1c312d3fb..9a781bdd8 100644 --- a/qlib/finco/knowledge.py +++ b/qlib/finco/knowledge.py @@ -1,10 +1,107 @@ from pathlib import Path from jinja2 import Template -from typing import List +from typing import List, Union +import pickle +import yaml +import inspect from qlib.workflow import R from qlib.finco.log import FinCoLog from qlib.finco.llm import APIBackend +from qlib.finco.utils import similarity + + +class Storage: + """ + This class is responsible for storage and loading of Knowledge related data. + + """ + + def __init__(self, path: Union[str, Path]): + self.path = path if isinstance(path, Path) else Path(path) + self.logger = FinCoLog() + self.source = None + + # todo: get document by key + self.documents = [] + + def add(self, documents: List): + self.documents.extend(documents) + self.save() + + def load(self, **kwargs): + raise NotImplementedError(f"Please implement the `load` method.") + + def save(self, **kwargs): + raise NotImplementedError(f"Please implement the `save` method.") + + +class PickleStorage(Storage): + """ + This class is responsible for storage and loading of Knowledge related data in pickle format. + + """ + + def __init__(self, path: Union[str, Path]): + super().__init__(path) + + @classmethod + def load(cls, path: Union[str, Path]): + """use pickle as the default load method""" + path = path if isinstance(path, Path) else Path(path) + with open(path, "rb") as f: + return pickle.load(f) + + def save(self, **kwargs): + """use pickle as the default save method""" + with open(self.path, "wb") as f: + pickle.dump(self, f) + + +class YamlStorage(Storage): + """ + This class is responsible for storage and loading of Knowledge related data in yaml format. + + """ + + def __init__(self, path: Union[str, Path]): + super().__init__(path) + self.load() + + def load(self): + """load data from yaml format file""" + try: + self.documents = yaml.load(open(self.path, "r"), Loader=yaml.FullLoader) + except FileNotFoundError: + self.logger.warning(f"YamlStorage: file {self.path} doesn't exist.") + + def save(self, **kwargs): + """use pickle as the default save method""" + with open(self.path, 'w') as f: + yaml.dump(self.documents, f) + + +class ExperimentStorage(Storage): + """ + This class is responsible for storage and loading of mlflow related data. + + """ + + def __init__(self, exp_name, path=None): + super().__init__(path=path) + self.exp_name = exp_name + self.exp = None + self.recs = [] + self.docs = [] + + def load(self, exp_name, rec_id=None): + recs = [] + self.exp = R.get_exp(experiment_name=exp_name) + for r in self.exp.list_recorders(rtype=self.exp.RT_L): + if rec_id is not None and r.id != rec_id: + continue + recs.append(r) + self.recs.extend(recs) class Knowledge: @@ -12,8 +109,23 @@ class Knowledge: Use to handle knowledge in finCo such as experiment and outside domain information """ - def __init__(self): + def __init__(self, storage: Storage): self.logger = FinCoLog() + self.storage = storage + self.knowledge = [] + + def summarize(self, **kwargs): + """ + summarize storage data to knowledge, default knowledge is storage.documents + + Parameters + ---------- + + Return + ------ + """ + + self.knowledge = self.storage.documents def load(self, **kwargs): """ @@ -39,39 +151,130 @@ class Knowledge: """ raise NotImplementedError(f"Please implement the `load` method.") + def save(self, **kwargs): + """save knowledge persistently""" + self.storage.save(**kwargs) -class KnowledgeExperiment(Knowledge): + +class ExperimentKnowledge(Knowledge): """ Handle knowledge from experiments """ - def __init__(self, exp_name, rec_id=None): - super().__init__() - self.exp_name = exp_name - self.exp = None - self.recs = [] - - self.load(exp_name=exp_name, rec_id=rec_id) - - def load(self, exp_name, rec_id=None): - recs = [] - self.exp = R.get_exp(experiment_name=exp_name) - for r in self.exp.list_recorders(rtype=self.exp.RT_L): - if rec_id is not None and r.id != rec_id: - continue - recs.append(r) - self.recs.extend(recs) + def __init__(self, storage: ExperimentStorage): + super().__init__(storage=storage) + self.storage = storage def brief(self): docs = [] - for recorder in self.recs: - docs.append({"exp_name": self.exp.name, "record_info": recorder.info, + for recorder in self.storage.recs: + docs.append({"exp_name": self.storage.exp.name, "record_info": recorder.info, "config": recorder.load_object("config"), "context_summary": recorder.load_object("context_summary")}) - return docs +class PracticeKnowledge(Knowledge): + """ + some template sentence for now + """ + + def __init__(self, storage: YamlStorage): + super().__init__(storage=storage) + + self.summarize() + + def add(self, docs: List): + self.storage.add(docs) + self.summarize() + + self.save() + + +class FinanceKnowledge(Knowledge): + """ + Knowledge from articles + """ + + def __init__(self, storage: YamlStorage): + super().__init__(storage=storage) + if len(self.storage.documents) == 0: + docs = self.read_files_in_directory(self.storage.path) + self.add(docs) + self.summarize() + + def add(self, docs: List): + self.storage.add(docs) + self.summarize() + + self.save() + + @staticmethod + def read_files_in_directory(directory): + """ + read all .txt files under directory + """ + # todo: split article in trunks + file_contents = [] + for file_path in Path(directory).rglob("*.txt"): + if file_path.is_file(): + file_content = file_path.read_text(encoding="utf-8") + file_contents.append(file_content) + return file_contents + + +class ExecuteKnowledge(Knowledge): + """ + Config and associate execution result(pass or error message). We can regard the example in prompt as pass execution + """ + + def __init__(self, storage: YamlStorage): + super().__init__(storage=storage) + self.summarize() + + def add(self, docs: List): + self.storage.add(docs) + self.summarize() + + self.save() + + +class InfrastructureKnowledge(Knowledge): + """ + Knowledge from sentences, docstring, and code + """ + + def __init__(self, storage: YamlStorage): + super().__init__(storage=storage) + + if len(self.storage.documents) == 0: + # todo: change the path to qlib root path + docs = self.get_functions_and_docstrings(Path.cwd().parent) + self.add(docs) + + def add(self, docs: List): + self.storage.add(docs) + self.summarize() + + self.save() + + @staticmethod + def get_functions_and_docstrings(directory): + """ + get all method and docstring in .py files under directory + """ + functions = [] + for file_path in Path(directory).rglob("*.py"): + with file_path.open("r", encoding="utf-8") as f: + lines = f.readlines() + for line in lines: + if line.startswith("def "): + function_name = line.split("(")[0][4:].strip() + function_docstring = inspect.getdoc(eval(function_name)) + functions.append({"function_name": function_name, "docstring": function_docstring}) + return functions + + class Topic: def __init__(self, name: str, describe: Template): @@ -97,60 +300,84 @@ class KnowledgeBase: Load knowledge, offer brief information of knowledge and common handle interfaces """ - def __init__(self, init_path=None, topics: List[Topic] = None): + KT_EXECUTE = "execute" + KT_PRACTICE = "practice" + KT_FINANCE = "finance" + + def __init__(self, init_path=None): self.logger = FinCoLog() - init_path = init_path if init_path else Path.cwd() + self.init_path = Path(init_path) if init_path else Path.cwd() - if not init_path.exists(): - self.logger.warning(f"{init_path} not exist, create empty directory.") - Path.mkdir(init_path) + if not self.init_path.exists(): + self.logger.warning(f"{self.init_path} not exist, create empty directory.") + Path.mkdir(self.init_path) - self.knowledge = self.load(path=init_path) + self.practice_knowledge = self.load_practice_knowledge(self.init_path) + self.execute_knowledge = self.load_execute_knowledge(self.init_path) + self.finance_knowledge = self.load_finance_knowledge(self.init_path) - # todo: replace list with persistent storage strategy such as ES/pinecone to enable - # literal search/semantic search - self.docs = self.brief(knowledge=self.knowledge) - - self.topics = topics if topics else [] - - def load(self, path) -> List: + def load_experiment_knowledge(self, path) -> List: + # similar to practice knowledge, not use for now if isinstance(path, str): path = Path(path) knowledge = [] path = path if path.name == "mlruns" else path.joinpath("mlruns") + # todo: check the influence of set uri R.set_uri(path.as_uri()) for exp_name in R.list_experiments(): - knowledge.append(KnowledgeExperiment(exp_name=exp_name)) + knowledge.append(ExperimentKnowledge(storage=ExperimentStorage(exp_name=exp_name))) self.logger.plain_info(f"Load knowledge from: {path} finished.") return knowledge - def update(self, path): - # note: only update new knowledge in future - knowledge = self.load(path) - self.knowledge = knowledge - self.docs = self.brief(self.knowledge) - self.logger.plain_info(f"Update knowledge finished.") + def load_practice_knowledge(self, path: Path) -> PracticeKnowledge: + self.practice_knowledge = PracticeKnowledge(YamlStorage(path.joinpath("practice_knowledge.yaml"))) + return self.practice_knowledge - def brief(self, knowledge: List[Knowledge]) -> List: - docs = [] + def load_execute_knowledge(self, path: Path) -> ExecuteKnowledge: + self.execute_knowledge = ExecuteKnowledge(YamlStorage(path.joinpath("execute_knowledge.yaml"))) + return self.execute_knowledge + + def load_finance_knowledge(self, path: Path) -> FinanceKnowledge: + self.finance_knowledge = FinanceKnowledge(YamlStorage(path.joinpath("finance_knowledge.yaml"))) + return self.finance_knowledge + + def knowledge(self, knowledge_type: str = None): + if knowledge_type == self.KT_EXECUTE: + knowledge = self.execute_knowledge + elif knowledge_type == self.KT_PRACTICE: + knowledge = self.practice_knowledge + elif knowledge_type == self.KT_FINANCE: + knowledge = self.finance_knowledge + else: + knowledge = self.execute_knowledge.knowledge + self.practice_knowledge.knowledge \ + + self.finance_knowledge.knowledge + return knowledge + + def query(self, knowledge_type: str = None, content: str = None, n: int = 5): + """ + + @param knowledge_type: self.KT_EXECUTE, self.KT_PRACTICE or self.KT_FINANCE + @param content: content to query KnowledgeBase + @param n: top n knowledge to ask ChatGPT + @return: + """ + # todo: replace list with persistent storage strategy such as ES/pinecone to enable + # literal search/semantic search + + knowledge = self.knowledge(knowledge_type=knowledge_type) + scores = [] for k in knowledge: - docs.extend(k.brief()) + scores.append(similarity(str(k), content)) + sorted_indexes = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True) + similar_n_indexes = sorted_indexes[:n] + similar_n_docs = [knowledge[i] for i in similar_n_indexes] - self.logger.plain_info(f"Generate brief knowledge summary finished.") - return docs + prompt = Template("""summarize this information: '{{docs}}'""") + prompt_workflow_selection = prompt.render(docs=similar_n_docs) + response = APIBackend().build_messages_and_create_chat_completion( + user_prompt=prompt_workflow_selection + ) - def query(self, content: str = None): - # todo: query by DSL - return self.docs - - def query_topics(self): - knowledge_of_topics = [] - for topic in self.topics: - knowledge_of_topics.append({topic.name: topic.knowledge}) - return knowledge_of_topics - - def summarize_by_topic(self): - for topic in self.topics: - topic.summarize(self.docs) + return response diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index 324961b30..b6d00f966 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -174,16 +174,14 @@ class LearnManager: self.epoch = 0 self.wm = WorkflowManager() - topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in - self.__DEFAULT_TOPICS] - self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge'), topics=topics) + self.topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in + self.__DEFAULT_TOPICS] + self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge')) def run(self, prompt): # todo: add early stop condition for i in range(10): self.wm.run(prompt) - self.knowledge_base.update(self.wm._workspace) - self.knowledge_base.summarize_by_topic() self.learn() self.epoch += 1 @@ -204,9 +202,12 @@ class LearnManager: user_prompt = self.wm.context.get_context("user_prompt") summary = self.wm.context.get_context("summary") + [topic.summarize(self.knowledge_base.knowledge()) for topic in self.topics] + knowledge_of_topics = [{topic.name: topic.knowledge} for topic in self.topics] + for task in task_finished: prompt_workflow_selection = self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render( - summary=summary, brief=self.knowledge_base.query_topics(), + summary=summary, brief=knowledge_of_topics, task_finished=[str(t) for t in task_finished], task=task.__class__.__name__, system=task.system.render(), user_prompt=user_prompt )