From 86ffd1799d7c25f5af60800a76525e4fb1e4113c Mon Sep 17 00:00:00 2001 From: Fivele-Li <128388363+Fivele-Li@users.noreply.github.com> Date: Thu, 6 Jul 2023 11:39:36 +0800 Subject: [PATCH] Add knowledge module and tune summarizeTask (#1582) * Add knowledge module * add KnowledgeExperiment add KnowledgeBase; * add knowledge associate prompts to template; * Add Topic class * add Topic to summarize knowledge; * add recorder's metric to summarizeTask; --------- Co-authored-by: Cadenza-Li <362237642@qq.com> --- qlib/finco/knowledge.py | 154 ++++++++++++++++++++++++++++++++ qlib/finco/prompt_template.yaml | 21 ++++- qlib/finco/task.py | 104 ++++++++++++++------- qlib/finco/workflow.py | 22 +++-- 4 files changed, 259 insertions(+), 42 deletions(-) create mode 100644 qlib/finco/knowledge.py diff --git a/qlib/finco/knowledge.py b/qlib/finco/knowledge.py new file mode 100644 index 000000000..459132318 --- /dev/null +++ b/qlib/finco/knowledge.py @@ -0,0 +1,154 @@ +from pathlib import Path +from qlib.workflow import R +from qlib.finco.log import FinCoLog +from qlib.finco.llm import APIBackend +from jinja2 import Template + + +class Knowledge: + """ + Use to handle knowledge in finCo such as experiment and outside domain information + """ + + def __init__(self): + self.logger = FinCoLog() + + def load(self, **kwargs): + """ + Load knowledge in memory + + Parameters + ---------- + + Return + ------ + """ + raise NotImplementedError(f"Please implement the `load` method.") + + def brief(self, **kwargs): + """ + Return a brief summary of knowledge + + Parameters + ---------- + + Return + ------ + """ + raise NotImplementedError(f"Please implement the `load` method.") + + +class KnowledgeExperiment(Knowledge): + """ + Handle knowledge from experiments + """ + + def __init__(self, exp_name, rec_id=None): + super().__init__() + self.exp_name = exp_name + self.exp = None + self.recs = [] + + self.load(exp_name=exp_name, rec_id=rec_id) + + def load(self, exp_name, rec_id=None): + recs = [] + self.exp = R.get_exp(experiment_name=exp_name) + for r in self.exp.list_recorders(rtype=self.exp.RT_L): + if rec_id is not None and r.id != rec_id: + continue + recs.append(r) + self.recs.extend(recs) + + def brief(self): + docs = [] + for recorder in self.recs: + docs.append({"exp_name": self.exp.name, "record_info": recorder.info, + "config": recorder.load_object("config"), + "context_summary": recorder.load_object("context_summary")}) + + return docs + + +class Topic: + + def __init__(self, name: str, describe: Template): + self.name = name + self.describe = describe + self.docs = [] + self.knowledge = None + self.logger = FinCoLog() + + def summarize(self, docs: list): + self.logger.info(f"Summarize topic: \nname: {self.name}\ndescribe: {self.describe.module}") + prompt_workflow_selection = self.describe.render(docs=docs) + response = APIBackend().build_messages_and_create_chat_completion( + user_prompt=prompt_workflow_selection + ) + + self.knowledge = response + self.docs = docs + + +class KnowledgeBase: + """ + Load knowledge, offer brief information of knowledge and common handle interfaces + """ + + def __init__(self, init_path=None, topics: list[Topic] = None): + self.logger = FinCoLog() + init_path = init_path if init_path else Path.cwd() + + if not init_path.exists(): + self.logger.warning(f"{init_path} not exist, create empty directory.") + Path.mkdir(init_path) + + self.knowledge = self.load(path=init_path) + + # todo: replace list with persistent storage strategy such as ES/pinecone to enable + # literal search/semantic search + self.docs = self.brief(knowledge=self.knowledge) + + self.topics = topics if topics else [] + + def load(self, path) -> list: + if isinstance(path, str): + path = Path(path) + + knowledge = [] + path = path if path.name == "mlruns" else path.joinpath("mlruns") + R.set_uri(path.as_uri()) + for exp_name in R.list_experiments(): + knowledge.append(KnowledgeExperiment(exp_name=exp_name)) + + self.logger.plain_info(f"Load knowledge from: {path} finished.") + return knowledge + + def update(self, path): + # note: only update new knowledge in future + knowledge = self.load(path) + self.knowledge = knowledge + self.docs = self.brief(self.knowledge) + self.logger.plain_info(f"Update knowledge finished.") + + def brief(self, knowledge: list[Knowledge]) -> list: + docs = [] + for k in knowledge: + docs.extend(k.brief()) + + self.logger.plain_info(f"Generate brief knowledge summary finished.") + return docs + + def query(self, content: str = None): + # todo: query by DSL + return self.docs + + def query_topics(self): + knowledge_of_topics = [] + for topic in self.topics: + knowledge_of_topics.append({topic.name: topic.knowledge}) + return knowledge_of_topics + + def summarize_by_topic(self): + for topic in self.topics: + topic.summarize(self.docs) diff --git a/qlib/finco/prompt_template.yaml b/qlib/finco/prompt_template.yaml index 0e17304bb..fba0d0402 100644 --- a/qlib/finco/prompt_template.yaml +++ b/qlib/finco/prompt_template.yaml @@ -610,9 +610,22 @@ SummarizeTask_user : |- Here is my information: '{{information}}' My intention is: {{user_prompt}}. Please provide me with a summary and recommendation based on my intention and the information I have provided. There are some figures which absolute path are: {{figure_path}}, You must display these images in markdown using the appropriate image format. -BackForwardTask_system : |- +SummarizeTask_context_system : |- + Your purpose is to find the important information offered by user and summarize it. + +SummarizeTask_context_user : |- + Here is my information: '{{key}}:{{value}}' + Please summarize it. + +LearnManager_system : |- Your task is adjusting system prompt in each task to fulfill user's intention -BackForwardTask_user : |- - Here is the final summary: '{{summary}}' - Tasks I have run are: {{task_finished}}, {{task}}'s system prompt is: {{system}}. User's intention is: {{user_prompt}}. you will adjust it to: +LearnManager_user : |- + Here is the final summary:\n{{summary}}\n. Brief of this workflow is:{{brief}}\n + Tasks I have run are: {{task_finished}}, \n{{task}}'s system prompt is: {{system}}. \nUser's intention is: {{user_prompt}}. you will adjust it to: + +Topic_IC : |- + Summarize the influence of parameters on IC: {{docs}} + +Topic_MaxDropDown : |- + Summarize the influence of parameters on max dropdown: {{docs}} \ No newline at end of file diff --git a/qlib/finco/task.py b/qlib/finco/task.py index d9db799b8..ee429b851 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -13,7 +13,6 @@ import inspect from qlib.finco.llm import APIBackend from qlib.finco.tpl import get_tpl_path from qlib.finco.prompt_template import PromptTemplate -from qlib.workflow.record_temp import HFSignalRecord, SignalRecord from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer from qlib.workflow import R from qlib.finco.log import FinCoLog, LogColors @@ -254,6 +253,7 @@ class TrainTask(Task): workflow_path = workspace.joinpath(workflow_config) with workflow_path.open() as f: workflow = yaml.safe_load(f) + self._context_manager.set_context("workflow_yaml", workflow) confirm = self.interact(f"I select this workflow file: " f"{LogColors().render(workflow_path, color=LogColors.YELLOW, style=LogColors.BOLD)}\n" @@ -271,10 +271,14 @@ class TrainTask(Task): except subprocess.CalledProcessError as e: print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}") real_error = e.stderr+e.stdout - if "model" in e.stdout.lower(): - return [HyperparameterActionTask("Model", regenerate=True, error=real_error), ConfigActionTask("Model"), YamlEditTask("Model"), TrainTask()] - elif "dataset" in e.stdout.lower() or "handler" in e.stdout.lower(): - return [HyperparameterActionTask("Dataset", regenerate=True, error=real_error), HyperparameterActionTask("DataHandler", regenerate=True, error=real_error), ConfigActionTask("Dataset"), ConfigActionTask("DataHandler"), YamlEditTask("Dataset"), YamlEditTask("DataHandler"), TrainTask()] + if "data" in e.stdout.lower() or "handler" in e.stdout.lower(): + return [HyperparameterActionTask("Dataset", regenerate=True, error=real_error), + HyperparameterActionTask("DataHandler", regenerate=True, error=real_error), + ConfigActionTask("Dataset"), ConfigActionTask("DataHandler"), YamlEditTask("Dataset"), + YamlEditTask("DataHandler"), TrainTask()] + elif "model" in e.stdout.lower(): + return [HyperparameterActionTask("Model", regenerate=True, error=real_error), + ConfigActionTask("Model"), YamlEditTask("Model"), TrainTask()] else: ret_list = [] for component in COMPONENT_LIST: @@ -752,12 +756,9 @@ class CodeDumpTask(ActionTask): return [ImplementActionTask(self.target_component, reimplement=True), CodeDumpTask(self.target_component)] return [] -class SummarizeTask(Task): - __DEFAULT_WORKSPACE = "./" - __DEFAULT_USER_PROMPT = ( - "Summarize the information I offered and give me some advice." - ) +class SummarizeTask(Task): + __DEFAULT_SUMMARIZE_CONTEXT = ["workflow_yaml", "metrics"] # TODO: 2048 is close to exceed GPT token limit __MAX_LENGTH_OF_FILE = 2048 @@ -765,39 +766,60 @@ class SummarizeTask(Task): def __init__(self): super().__init__() - self.workspace = self.__DEFAULT_WORKSPACE + self.workspace = "./" + + @property + def summarize_context_system(self): + return self.prompt_template.get(self.__class__.__name__ + "_context_system") + + @property + def summarize_context_user(self): + return self.prompt_template.get(self.__class__.__name__ + "_context_user") def execute(self) -> Any: workspace = self._context_manager.get_context("workspace") - if workspace is not None: - self.workspace = workspace - user_prompt = self._context_manager.get_context("user_prompt") - user_prompt = ( - user_prompt if user_prompt is not None else self.__DEFAULT_USER_PROMPT - ) + workflow_yaml = self._context_manager.get_context("workflow_yaml") file_info = self.get_info_from_file(workspace) - context_info = [] # too long context make response unstable. - figure_path = self.get_figure_path() + context_info = self.get_info_from_context() # too long context make response unstable. + record_info = self.get_info_from_recorder(workspace, workflow_yaml["experiment_name"]) + figure_path = self.get_figure_path(workspace) - information = context_info + file_info - prompt_workflow_selection = self.user.render( - information=information, figure_path=figure_path, user_prompt=user_prompt - ) + information = context_info + file_info + record_info + + def _get_value_from_info(info: list, k: str): + for i in information: + if k in i.keys(): + return i.get(k) + return "" # todo: remove 'be' after test be = APIBackend() - bak_debug_mode = be.debug_mode be.debug_mode = False + + context_summary = {} + for key in self.__DEFAULT_SUMMARIZE_CONTEXT: + prompt_workflow_selection = self.summarize_context_user.render( + key=key, value=_get_value_from_info(info=information, k=key) + ) + response = be.build_messages_and_create_chat_completion( + user_prompt=prompt_workflow_selection, system_prompt=self.summarize_context_system.render() + ) + context_summary.update({key: response}) + + recorder = R.get_recorder(experiment_name=workflow_yaml["experiment_name"]) + recorder.save_objects(context_summary=context_summary) + + prompt_workflow_selection = self.user.render( + information=file_info + record_info, figure_path=figure_path, user_prompt=user_prompt + ) response = be.build_messages_and_create_chat_completion( user_prompt=prompt_workflow_selection, system_prompt=self.system.render() ) self._context_manager.set_context("summary", response) - be.debug_mode = bak_debug_mode - - self.save_markdown(content=response) + self.save_markdown(content=response, path=workspace) self.logger.info(f"Report has saved to {self.__DEFAULT_REPORT_NAME}", title="End") return [] @@ -850,18 +872,36 @@ class SummarizeTask(Task): context.append({key: c[: self.__MAX_LENGTH_OF_FILE]}) return context - def get_figure_path(self): + def get_info_from_recorder(self, path, exp_name) -> list: + path = path if path.name == "mlruns" else path.joinpath("mlruns") + + R.set_uri(Path(path).as_uri()) + exp = R.get_exp(experiment_name=exp_name) + + records = [] + recorders = exp.list_recorders(rtype=exp.RT_L) + if len(recorders) == 0: + return records + + # get info from the latest recorder, sort by end time is considerable + recorders = sorted(recorders, key=lambda x: x.experiment_id) + recorder = recorders[-1] + + records.append({"metrics": recorder.list_metrics()}) + return records + + def get_figure_path(self, path): file_list = [] - for root, dirs, files in os.walk(Path(self.workspace)): + for root, dirs, files in os.walk(Path(path)): for filename in files: postfix = filename.split(".")[-1] if postfix in ["jpeg"]: description = self._context_manager.retrieve(filename) file_list.append({"file_name": filename, "description": description, - "path": str(Path(self.workspace).joinpath(filename))}) + "path": str(Path(path).joinpath(filename))}) return file_list - def save_markdown(self, content: str): - with open(Path(self.workspace).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f: + def save_markdown(self, content: str, path): + with open(Path(path).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f: f.write(content) diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index 4d9e2a16c..75c8e0950 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -9,6 +9,7 @@ from qlib.finco.log import FinCoLog, LogColors from qlib.finco.utils import similarity from qlib.finco.llm import APIBackend from qlib.finco.conf import Config +from qlib.finco.knowledge import KnowledgeBase, Topic class WorkflowContextManager: @@ -78,7 +79,7 @@ class WorkflowManager: self.prompt_template = PromptTemplate() self.context = WorkflowContextManager() self.context.set_context("workspace", self._workspace) - self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China A csi800. Please help to use lightgbm model." + self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China A csi300. Please help to use lightgbm model." def _confirm_and_rm(self): # if workspace exists, please confirm and remove it. Otherwise exit. @@ -166,34 +167,43 @@ class WorkflowManager: class LearnManager: + __DEFAULT_TOPICS = ["IC", "MaxDropDown"] def __init__(self): self.epoch = 0 self.wm = WorkflowManager() - def run(self, prompt): + topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in + self.__DEFAULT_TOPICS] + self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge'), topics=topics) + def run(self, prompt): # todo: add early stop condition for i in range(10): self.wm.run(prompt) + self.knowledge_base.update(self.wm._workspace) + self.knowledge_base.summarize_by_topic() self.learn() self.epoch += 1 def learn(self): workspace = self.wm.context.get_context("workspace") - task_finished = self.wm.context.get_context("task_finished") + # one task maybe run several times in workflow + task_finished = list(set(self.wm.context.get_context("task_finished"))) user_prompt = self.wm.context.get_context("user_prompt") summary = self.wm.context.get_context("summary") for task in task_finished: - prompt_workflow_selection = task.user.render( - summary=summary, task_finished=[str(task) for task in task_finished], + prompt_workflow_selection = self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render( + summary=summary, brief=self.knowledge_base.query_topics(), + task_finished=[str(task) for task in task_finished], task=task.__class__, system=task.system, user_prompt=user_prompt ) response = APIBackend().build_messages_and_create_chat_completion( - user_prompt=prompt_workflow_selection, system_prompt=task.system.render() + user_prompt=prompt_workflow_selection, + system_prompt=self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render() ) # todo: response assertion