1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 19:10:58 +08:00

Add knowledge module and tune summarizeTask (#1582)

* Add knowledge module
* add KnowledgeExperiment add KnowledgeBase;
* add knowledge associate prompts to template;

* Add Topic class
* add Topic to summarize knowledge;
* add recorder's metric to summarizeTask;

---------

Co-authored-by: Cadenza-Li <362237642@qq.com>
This commit is contained in:
Fivele-Li
2023-07-06 11:39:36 +08:00
committed by GitHub
parent aef11536e3
commit 86ffd1799d
4 changed files with 259 additions and 42 deletions

154
qlib/finco/knowledge.py Normal file
View File

@@ -0,0 +1,154 @@
from pathlib import Path
from qlib.workflow import R
from qlib.finco.log import FinCoLog
from qlib.finco.llm import APIBackend
from jinja2 import Template
class Knowledge:
"""
Use to handle knowledge in finCo such as experiment and outside domain information
"""
def __init__(self):
self.logger = FinCoLog()
def load(self, **kwargs):
"""
Load knowledge in memory
Parameters
----------
Return
------
"""
raise NotImplementedError(f"Please implement the `load` method.")
def brief(self, **kwargs):
"""
Return a brief summary of knowledge
Parameters
----------
Return
------
"""
raise NotImplementedError(f"Please implement the `load` method.")
class KnowledgeExperiment(Knowledge):
"""
Handle knowledge from experiments
"""
def __init__(self, exp_name, rec_id=None):
super().__init__()
self.exp_name = exp_name
self.exp = None
self.recs = []
self.load(exp_name=exp_name, rec_id=rec_id)
def load(self, exp_name, rec_id=None):
recs = []
self.exp = R.get_exp(experiment_name=exp_name)
for r in self.exp.list_recorders(rtype=self.exp.RT_L):
if rec_id is not None and r.id != rec_id:
continue
recs.append(r)
self.recs.extend(recs)
def brief(self):
docs = []
for recorder in self.recs:
docs.append({"exp_name": self.exp.name, "record_info": recorder.info,
"config": recorder.load_object("config"),
"context_summary": recorder.load_object("context_summary")})
return docs
class Topic:
def __init__(self, name: str, describe: Template):
self.name = name
self.describe = describe
self.docs = []
self.knowledge = None
self.logger = FinCoLog()
def summarize(self, docs: list):
self.logger.info(f"Summarize topic: \nname: {self.name}\ndescribe: {self.describe.module}")
prompt_workflow_selection = self.describe.render(docs=docs)
response = APIBackend().build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection
)
self.knowledge = response
self.docs = docs
class KnowledgeBase:
"""
Load knowledge, offer brief information of knowledge and common handle interfaces
"""
def __init__(self, init_path=None, topics: list[Topic] = None):
self.logger = FinCoLog()
init_path = init_path if init_path else Path.cwd()
if not init_path.exists():
self.logger.warning(f"{init_path} not exist, create empty directory.")
Path.mkdir(init_path)
self.knowledge = self.load(path=init_path)
# todo: replace list with persistent storage strategy such as ES/pinecone to enable
# literal search/semantic search
self.docs = self.brief(knowledge=self.knowledge)
self.topics = topics if topics else []
def load(self, path) -> list:
if isinstance(path, str):
path = Path(path)
knowledge = []
path = path if path.name == "mlruns" else path.joinpath("mlruns")
R.set_uri(path.as_uri())
for exp_name in R.list_experiments():
knowledge.append(KnowledgeExperiment(exp_name=exp_name))
self.logger.plain_info(f"Load knowledge from: {path} finished.")
return knowledge
def update(self, path):
# note: only update new knowledge in future
knowledge = self.load(path)
self.knowledge = knowledge
self.docs = self.brief(self.knowledge)
self.logger.plain_info(f"Update knowledge finished.")
def brief(self, knowledge: list[Knowledge]) -> list:
docs = []
for k in knowledge:
docs.extend(k.brief())
self.logger.plain_info(f"Generate brief knowledge summary finished.")
return docs
def query(self, content: str = None):
# todo: query by DSL
return self.docs
def query_topics(self):
knowledge_of_topics = []
for topic in self.topics:
knowledge_of_topics.append({topic.name: topic.knowledge})
return knowledge_of_topics
def summarize_by_topic(self):
for topic in self.topics:
topic.summarize(self.docs)

View File

@@ -610,9 +610,22 @@ SummarizeTask_user : |-
Here is my information: '{{information}}'
My intention is: {{user_prompt}}. Please provide me with a summary and recommendation based on my intention and the information I have provided. There are some figures which absolute path are: {{figure_path}}, You must display these images in markdown using the appropriate image format.
BackForwardTask_system : |-
SummarizeTask_context_system : |-
Your purpose is to find the important information offered by user and summarize it.
SummarizeTask_context_user : |-
Here is my information: '{{key}}:{{value}}'
Please summarize it.
LearnManager_system : |-
Your task is adjusting system prompt in each task to fulfill user's intention
BackForwardTask_user : |-
Here is the final summary: '{{summary}}'
Tasks I have run are: {{task_finished}}, {{task}}'s system prompt is: {{system}}. User's intention is: {{user_prompt}}. you will adjust it to:
LearnManager_user : |-
Here is the final summary:\n{{summary}}\n. Brief of this workflow is:{{brief}}\n
Tasks I have run are: {{task_finished}}, \n{{task}}'s system prompt is: {{system}}. \nUser's intention is: {{user_prompt}}. you will adjust it to:
Topic_IC : |-
Summarize the influence of parameters on IC: {{docs}}
Topic_MaxDropDown : |-
Summarize the influence of parameters on max dropdown: {{docs}}

View File

@@ -13,7 +13,6 @@ import inspect
from qlib.finco.llm import APIBackend
from qlib.finco.tpl import get_tpl_path
from qlib.finco.prompt_template import PromptTemplate
from qlib.workflow.record_temp import HFSignalRecord, SignalRecord
from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer
from qlib.workflow import R
from qlib.finco.log import FinCoLog, LogColors
@@ -254,6 +253,7 @@ class TrainTask(Task):
workflow_path = workspace.joinpath(workflow_config)
with workflow_path.open() as f:
workflow = yaml.safe_load(f)
self._context_manager.set_context("workflow_yaml", workflow)
confirm = self.interact(f"I select this workflow file: "
f"{LogColors().render(workflow_path, color=LogColors.YELLOW, style=LogColors.BOLD)}\n"
@@ -271,10 +271,14 @@ class TrainTask(Task):
except subprocess.CalledProcessError as e:
print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}")
real_error = e.stderr+e.stdout
if "model" in e.stdout.lower():
return [HyperparameterActionTask("Model", regenerate=True, error=real_error), ConfigActionTask("Model"), YamlEditTask("Model"), TrainTask()]
elif "dataset" in e.stdout.lower() or "handler" in e.stdout.lower():
return [HyperparameterActionTask("Dataset", regenerate=True, error=real_error), HyperparameterActionTask("DataHandler", regenerate=True, error=real_error), ConfigActionTask("Dataset"), ConfigActionTask("DataHandler"), YamlEditTask("Dataset"), YamlEditTask("DataHandler"), TrainTask()]
if "data" in e.stdout.lower() or "handler" in e.stdout.lower():
return [HyperparameterActionTask("Dataset", regenerate=True, error=real_error),
HyperparameterActionTask("DataHandler", regenerate=True, error=real_error),
ConfigActionTask("Dataset"), ConfigActionTask("DataHandler"), YamlEditTask("Dataset"),
YamlEditTask("DataHandler"), TrainTask()]
elif "model" in e.stdout.lower():
return [HyperparameterActionTask("Model", regenerate=True, error=real_error),
ConfigActionTask("Model"), YamlEditTask("Model"), TrainTask()]
else:
ret_list = []
for component in COMPONENT_LIST:
@@ -752,12 +756,9 @@ class CodeDumpTask(ActionTask):
return [ImplementActionTask(self.target_component, reimplement=True), CodeDumpTask(self.target_component)]
return []
class SummarizeTask(Task):
__DEFAULT_WORKSPACE = "./"
__DEFAULT_USER_PROMPT = (
"Summarize the information I offered and give me some advice."
)
class SummarizeTask(Task):
__DEFAULT_SUMMARIZE_CONTEXT = ["workflow_yaml", "metrics"]
# TODO: 2048 is close to exceed GPT token limit
__MAX_LENGTH_OF_FILE = 2048
@@ -765,39 +766,60 @@ class SummarizeTask(Task):
def __init__(self):
super().__init__()
self.workspace = self.__DEFAULT_WORKSPACE
self.workspace = "./"
@property
def summarize_context_system(self):
return self.prompt_template.get(self.__class__.__name__ + "_context_system")
@property
def summarize_context_user(self):
return self.prompt_template.get(self.__class__.__name__ + "_context_user")
def execute(self) -> Any:
workspace = self._context_manager.get_context("workspace")
if workspace is not None:
self.workspace = workspace
user_prompt = self._context_manager.get_context("user_prompt")
user_prompt = (
user_prompt if user_prompt is not None else self.__DEFAULT_USER_PROMPT
)
workflow_yaml = self._context_manager.get_context("workflow_yaml")
file_info = self.get_info_from_file(workspace)
context_info = [] # too long context make response unstable.
figure_path = self.get_figure_path()
context_info = self.get_info_from_context() # too long context make response unstable.
record_info = self.get_info_from_recorder(workspace, workflow_yaml["experiment_name"])
figure_path = self.get_figure_path(workspace)
information = context_info + file_info
prompt_workflow_selection = self.user.render(
information=information, figure_path=figure_path, user_prompt=user_prompt
)
information = context_info + file_info + record_info
def _get_value_from_info(info: list, k: str):
for i in information:
if k in i.keys():
return i.get(k)
return ""
# todo: remove 'be' after test
be = APIBackend()
bak_debug_mode = be.debug_mode
be.debug_mode = False
context_summary = {}
for key in self.__DEFAULT_SUMMARIZE_CONTEXT:
prompt_workflow_selection = self.summarize_context_user.render(
key=key, value=_get_value_from_info(info=information, k=key)
)
response = be.build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=self.summarize_context_system.render()
)
context_summary.update({key: response})
recorder = R.get_recorder(experiment_name=workflow_yaml["experiment_name"])
recorder.save_objects(context_summary=context_summary)
prompt_workflow_selection = self.user.render(
information=file_info + record_info, figure_path=figure_path, user_prompt=user_prompt
)
response = be.build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=self.system.render()
)
self._context_manager.set_context("summary", response)
be.debug_mode = bak_debug_mode
self.save_markdown(content=response)
self.save_markdown(content=response, path=workspace)
self.logger.info(f"Report has saved to {self.__DEFAULT_REPORT_NAME}", title="End")
return []
@@ -850,18 +872,36 @@ class SummarizeTask(Task):
context.append({key: c[: self.__MAX_LENGTH_OF_FILE]})
return context
def get_figure_path(self):
def get_info_from_recorder(self, path, exp_name) -> list:
path = path if path.name == "mlruns" else path.joinpath("mlruns")
R.set_uri(Path(path).as_uri())
exp = R.get_exp(experiment_name=exp_name)
records = []
recorders = exp.list_recorders(rtype=exp.RT_L)
if len(recorders) == 0:
return records
# get info from the latest recorder, sort by end time is considerable
recorders = sorted(recorders, key=lambda x: x.experiment_id)
recorder = recorders[-1]
records.append({"metrics": recorder.list_metrics()})
return records
def get_figure_path(self, path):
file_list = []
for root, dirs, files in os.walk(Path(self.workspace)):
for root, dirs, files in os.walk(Path(path)):
for filename in files:
postfix = filename.split(".")[-1]
if postfix in ["jpeg"]:
description = self._context_manager.retrieve(filename)
file_list.append({"file_name": filename, "description": description,
"path": str(Path(self.workspace).joinpath(filename))})
"path": str(Path(path).joinpath(filename))})
return file_list
def save_markdown(self, content: str):
with open(Path(self.workspace).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f:
def save_markdown(self, content: str, path):
with open(Path(path).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f:
f.write(content)

View File

@@ -9,6 +9,7 @@ from qlib.finco.log import FinCoLog, LogColors
from qlib.finco.utils import similarity
from qlib.finco.llm import APIBackend
from qlib.finco.conf import Config
from qlib.finco.knowledge import KnowledgeBase, Topic
class WorkflowContextManager:
@@ -78,7 +79,7 @@ class WorkflowManager:
self.prompt_template = PromptTemplate()
self.context = WorkflowContextManager()
self.context.set_context("workspace", self._workspace)
self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China A csi800. Please help to use lightgbm model."
self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China A csi300. Please help to use lightgbm model."
def _confirm_and_rm(self):
# if workspace exists, please confirm and remove it. Otherwise exit.
@@ -166,34 +167,43 @@ class WorkflowManager:
class LearnManager:
__DEFAULT_TOPICS = ["IC", "MaxDropDown"]
def __init__(self):
self.epoch = 0
self.wm = WorkflowManager()
def run(self, prompt):
topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in
self.__DEFAULT_TOPICS]
self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge'), topics=topics)
def run(self, prompt):
# todo: add early stop condition
for i in range(10):
self.wm.run(prompt)
self.knowledge_base.update(self.wm._workspace)
self.knowledge_base.summarize_by_topic()
self.learn()
self.epoch += 1
def learn(self):
workspace = self.wm.context.get_context("workspace")
task_finished = self.wm.context.get_context("task_finished")
# one task maybe run several times in workflow
task_finished = list(set(self.wm.context.get_context("task_finished")))
user_prompt = self.wm.context.get_context("user_prompt")
summary = self.wm.context.get_context("summary")
for task in task_finished:
prompt_workflow_selection = task.user.render(
summary=summary, task_finished=[str(task) for task in task_finished],
prompt_workflow_selection = self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render(
summary=summary, brief=self.knowledge_base.query_topics(),
task_finished=[str(task) for task in task_finished],
task=task.__class__, system=task.system, user_prompt=user_prompt
)
response = APIBackend().build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=task.system.render()
user_prompt=prompt_workflow_selection,
system_prompt=self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render()
)
# todo: response assertion