1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 02:50:58 +08:00

Merge pull request #1592 from Fivele-Li/update_knowledge_module

update knowledge module;
This commit is contained in:
Xu Yang
2023-07-16 11:36:31 +08:00
committed by GitHub
5 changed files with 433 additions and 80 deletions

View File

@@ -1,10 +1,112 @@
from pathlib import Path
from jinja2 import Template
from typing import List
from typing import List, Union
import pickle
import yaml
from qlib.workflow import R
from qlib.finco.log import FinCoLog
from qlib.finco.llm import APIBackend
from qlib.finco.utils import similarity, random_string
logger = FinCoLog()
class Storage:
    """
    Base class responsible for storage and loading of Knowledge related data.

    The base class only keeps the bookkeeping (path, name and the in-memory
    list of documents); concrete formats override `load` and `save`.
    """

    def __init__(self, path: Union[str, Path], name: Union[str, None] = None):
        """
        Parameters
        ----------
        path : Union[str, Path]
            location this storage is bound to; always stored as a `Path`
        name : str, optional
            identifier of the storage; defaults to the last component of `path`
        """
        # Path() accepts both str and Path, so no isinstance branching is needed
        self.path = Path(path)
        self.name = name if name else self.path.name
        self.source = None
        # todo: get document by key
        self.documents = []

    def add(self, documents: List):
        """
        Append `documents` to the in-memory list and persist via `save`.

        Note that the documents are appended before `save` is called, so they
        stay in memory even if `save` raises.
        """
        self.documents.extend(documents)
        self.save()

    def load(self, **kwargs):
        """Load documents from the backing store; must be provided by subclasses."""
        raise NotImplementedError("Please implement the `load` method.")

    def save(self, **kwargs):
        """Persist documents to the backing store; must be provided by subclasses."""
        raise NotImplementedError("Please implement the `save` method.")
class PickleStorage(Storage):
    """
    This class is responsible for storage and loading of Knowledge related data in pickle format.
    """

    def __init__(self, path: Union[str, Path]):
        super().__init__(path)

    @classmethod
    def load(cls, path: Union[str, Path]):
        """
        Use pickle as the default load method.

        Parameters
        ----------
        path : Union[str, Path]
            pickle file to read

        Return
        ------
        whatever object was pickled into `path` (`save` pickles the storage itself)
        """
        path = Path(path)
        # NOTE(security): unpickling can execute arbitrary code; only load files
        # that were produced by a trusted `save` call.
        with open(path, "rb") as f:
            return pickle.load(f)

    def save(self, **kwargs):
        """Use pickle as the default save method; the whole storage object is pickled."""
        # parents=True: the target may sit several not-yet-existing levels deep
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.path, "wb") as f:
            pickle.dump(self, f)
class YamlStorage(Storage):
    """
    This class is responsible for storage and loading of Knowledge related data in yaml format.
    """

    DEFAULT_NAME = "storage.yml"

    def __init__(self, path: Union[str, Path]):
        """
        Bind the storage to `path` and immediately try to load existing documents.
        """
        super().__init__(path)
        assert self.path.name, "Yaml storage should specify file name."
        self.load()

    def load(self):
        """
        Load data from yaml format file.

        A missing file only triggers a warning and leaves `documents` untouched.
        An empty file is treated as "no documents" instead of `None`.
        """
        try:
            # context manager: the original left the file handle open
            with open(self.path, "r") as f:
                loaded = yaml.load(f, Loader=yaml.FullLoader)
        except FileNotFoundError:
            logger.warning(f"YamlStorage: file {self.path} doesn't exist.")
            return
        # yaml.load returns None for an empty document; keep `documents` a list
        # so that `add` (extend) and len() checks keep working
        self.documents = loaded if loaded is not None else []

    def save(self, **kwargs):
        """Dump `documents` to the yaml file, creating missing parent directories."""
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.path, "w") as f:
            yaml.dump(self.documents, f)
class ExperimentStorage(Storage):
    """
    This class is responsible for storage and loading of mlflow related data.
    """

    def __init__(self, exp_name, path=None):
        """
        Parameters
        ----------
        exp_name : str
            name of the experiment whose recorders are collected by `load`
        path : Union[str, Path], optional
            location handed to the base `Storage`
        """
        if path is None:
            # Storage.__init__ wraps the path in Path(), which raises TypeError
            # for None. Fall back to the current directory and name the storage
            # after the experiment.
            super().__init__(path=Path.cwd(), name=exp_name)
        else:
            super().__init__(path=path)
        self.exp_name = exp_name
        self.exp = None
        self.recs = []
        self.docs = []

    def load(self, exp_name=None, rec_id=None):
        """
        Fetch the experiment through `R` and collect its recorders into `self.recs`.

        Parameters
        ----------
        exp_name : str, optional
            experiment to read; defaults to the name given at construction
        rec_id : optional
            when given, only the recorder whose `id` equals it is kept

        Note: recorders are appended, so calling `load` repeatedly accumulates them.
        """
        exp_name = self.exp_name if exp_name is None else exp_name
        self.exp = R.get_exp(experiment_name=exp_name)
        self.recs.extend(
            r for r in self.exp.list_recorders(rtype=self.exp.RT_L)
            if rec_id is None or r.id == rec_id
        )
class Knowledge:
@@ -12,12 +114,24 @@ class Knowledge:
Use to handle knowledge in finCo such as experiment and outside domain information
"""
def __init__(self):
self.logger = FinCoLog()
def __init__(self, storages: Union[List[Storage], Storage], name: str = None):
self.name = name if name else random_string()
self.workdir = Path.cwd().joinpath("knowledge")
self.storages = [storages] if isinstance(storages, Storage) else storages
self.knowledge = []
def load(self, **kwargs):
def get_storage(self, name: str):
"""
Load knowledge in memory
return first storage matched given name, else return None
"""
for storage in self.storages:
if storage.name == name:
return storage
return None
def summarize(self, **kwargs):
"""
summarize storage data to knowledge, default knowledge is storage.documents
Parameters
----------
@@ -25,7 +139,24 @@ class Knowledge:
Return
------
"""
raise NotImplementedError(f"Please implement the `load` method.")
for storage in self.storages:
self.knowledge.extend(storage.documents)
@classmethod
def load(cls, path: Union[str, Path]):
"""
Load knowledge in memory
use pickle as the default file type
Parameters
----------
Return
------
"""
""""""
path = path if isinstance(path, Path) else Path(path)
with open(path, "rb") as f:
return pickle.load(f)
def brief(self, **kwargs):
"""
@@ -39,39 +170,171 @@ class Knowledge:
"""
raise NotImplementedError(f"Please implement the `load` method.")
def save(self, **kwargs):
"""save knowledge persistently"""
# todo: storages save index only
Path.mkdir(self.workdir.joinpath(self.name), exist_ok=True)
with open(self.workdir.joinpath(self.name).joinpath("knowledge.pkl"), "wb") as f:
pickle.dump(self, f)
class KnowledgeExperiment(Knowledge):
class ExperimentKnowledge(Knowledge):
"""
Handle knowledge from experiments
"""
def __init__(self, exp_name, rec_id=None):
super().__init__()
self.exp_name = exp_name
self.exp = None
self.recs = []
self.load(exp_name=exp_name, rec_id=rec_id)
def load(self, exp_name, rec_id=None):
recs = []
self.exp = R.get_exp(experiment_name=exp_name)
for r in self.exp.list_recorders(rtype=self.exp.RT_L):
if rec_id is not None and r.id != rec_id:
continue
recs.append(r)
self.recs.extend(recs)
def __init__(self, storages: Union[List[ExperimentStorage], ExperimentStorage]):
super().__init__(storages=storages)
self.storage = storages
def brief(self):
docs = []
for recorder in self.recs:
docs.append({"exp_name": self.exp.name, "record_info": recorder.info,
for recorder in self.storage.recs:
docs.append({"exp_name": self.storage.exp.name, "record_info": recorder.info,
"config": recorder.load_object("config"),
"context_summary": recorder.load_object("context_summary")})
return docs
class PracticeKnowledge(Knowledge):
    """
    some template sentence for now
    """

    def __init__(self, storages: Union[List[YamlStorage], YamlStorage]):
        super().__init__(storages=storages, name="practice")
        self.summarize()

    def add(self, docs: List):
        """
        Persist `docs` into the yaml storage under `workdir/<name>/` and refresh
        `self.knowledge`.

        Parameters
        ----------
        docs : List
            documents to append to the storage
        """
        target = self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)
        # Reuse the storage already bound to the target file. Registering a new
        # storage on every call made the same documents show up once per call.
        storage = next(
            (s for s in self.storages
             if isinstance(s, YamlStorage) and s.path.resolve() == target.resolve()),
            None,
        )
        if storage is None:
            storage = YamlStorage(path=target)
            self.storages.append(storage)
        else:
            # pick up anything written to the file since this storage was loaded
            storage.load()
        storage.add(documents=docs)

        # summarize() extends self.knowledge, so rebuild it from scratch
        self.knowledge = []
        self.summarize()
        self.save()
class FinanceKnowledge(Knowledge):
    """
    Knowledge from articles
    """

    def __init__(self, storages: Union[List[YamlStorage], YamlStorage]):
        """
        Bootstrap from the `.txt` articles under `workdir/<name>/` when the default
        yaml storage is missing or empty; otherwise summarize what is stored.
        """
        super().__init__(storages=storages, name="finance")

        storage = self.get_storage(YamlStorage.DEFAULT_NAME)
        # get_storage returns None when no storage carries the default name, and
        # documents may be empty; both mean there is nothing stored yet
        if storage is None or not storage.documents:
            docs = self.read_files_in_directory(self.workdir.joinpath(self.name))
            self.add(docs)
        else:
            # without this, already persisted documents never reached self.knowledge
            self.summarize()

    def add(self, docs: List):
        """
        Persist `docs` into the yaml storage under `workdir/<name>/` and refresh
        `self.knowledge`.
        """
        target = self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)
        # Reuse the storage already bound to the target file. Registering a new
        # storage on every call made the same documents show up once per call.
        storage = next(
            (s for s in self.storages
             if isinstance(s, YamlStorage) and s.path.resolve() == target.resolve()),
            None,
        )
        if storage is None:
            storage = YamlStorage(path=target)
            self.storages.append(storage)
        else:
            # pick up anything written to the file since this storage was loaded
            storage.load()
        storage.add(documents=docs)

        # summarize() extends self.knowledge, so rebuild it from scratch
        self.knowledge = []
        self.summarize()
        self.save()

    @staticmethod
    def read_files_in_directory(directory):
        """
        read all .txt files under directory (recursively), decoded as utf-8

        Return
        ------
        list of file contents, one string per file
        """
        # todo: split article in trunks
        return [
            file_path.read_text(encoding="utf-8")
            for file_path in Path(directory).rglob("*.txt")
            if file_path.is_file()
        ]
class ExecuteKnowledge(Knowledge):
    """
    Config and associate execution result(pass or error message). We can regard the example in prompt as pass execution
    """

    def __init__(self, storages: Union[List[YamlStorage], YamlStorage]):
        super().__init__(storages=storages, name="execute")
        self.summarize()

    def add(self, docs: List):
        """
        Persist `docs` into the yaml storage under `workdir/<name>/` and refresh
        `self.knowledge`.
        """
        # The file lives in the per-knowledge sub directory (workdir/execute/).
        # The original omitted `self.name` and wrote to workdir/storage.yml.
        target = self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)
        # Reuse the storage already bound to the target file. Registering a new
        # storage on every call made the same documents show up once per call.
        storage = next(
            (s for s in self.storages
             if isinstance(s, YamlStorage) and s.path.resolve() == target.resolve()),
            None,
        )
        if storage is None:
            storage = YamlStorage(path=target)
            self.storages.append(storage)
        else:
            # pick up anything written to the file since this storage was loaded
            storage.load()
        storage.add(documents=docs)

        # summarize() extends self.knowledge, so rebuild it from scratch
        self.knowledge = []
        self.summarize()
        self.save()
class InfrastructureKnowledge(Knowledge):
    """
    Knowledge from sentences, docstring, and code
    """

    def __init__(self, storages: Union[List[YamlStorage], YamlStorage]):
        """
        Bootstrap from the docstrings found in the `.py` files three directory
        levels above this file when the default yaml storage is missing or empty;
        otherwise summarize what is stored.
        """
        super().__init__(storages=storages, name="infrastructure")

        storage = self.get_storage(YamlStorage.DEFAULT_NAME)
        # get_storage returns None when no storage carries the default name, and
        # documents may be empty; both mean there is nothing stored yet
        if storage is None or not storage.documents:
            docs = self.get_functions_and_docstrings(Path(__file__).parent.parent.parent)
            self.add(docs)
        else:
            # without this, already persisted documents never reached self.knowledge
            self.summarize()

    def add(self, docs: List):
        """
        Persist `docs` into the yaml storage under `workdir/<name>/` and refresh
        `self.knowledge`.
        """
        target = self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)
        # Reuse the storage already bound to the target file. Registering a new
        # storage on every call made the same documents show up once per call.
        storage = next(
            (s for s in self.storages
             if isinstance(s, YamlStorage) and s.path.resolve() == target.resolve()),
            None,
        )
        if storage is None:
            storage = YamlStorage(path=target)
            self.storages.append(storage)
        else:
            # pick up anything written to the file since this storage was loaded
            storage.load()
        storage.add(documents=docs)

        # summarize() extends self.knowledge, so rebuild it from scratch
        self.knowledge = []
        self.summarize()
        self.save()

    def get_functions_and_docstrings(self, directory):
        """
        get all method and docstring in .py files under directory

        Return
        ------
        list of {"function": "<module>.<name>", "docstring": str or None}
        """
        functions = []
        for py_file_path in Path(directory).rglob('*.py'):
            functions.extend(self.get_functions_with_docstrings(py_file_path))
        return functions

    @staticmethod
    def get_functions_with_docstrings(file_path):
        """
        Extract method name and docstring using string matching method

        Every `def` / `class` line whose name does not start with a double
        underscore opens an entry; the first following block delimited by
        triple double quotes becomes its docstring (None when there is none).
        This is a line based heuristic, not a real parser.

        Parameters
        ----------
        file_path : Union[str, Path]
            python source file, read as utf8

        Return
        ------
        list of {"function": "<file stem>.<name>", "docstring": str or None}
        """
        quote = '"""'
        file_path = Path(file_path)
        module_name = file_path.name.split('.')[0]
        with open(file_path, "r", encoding="utf8") as f:
            lines = f.readlines()

        functions = []
        current_func = None
        doc_parts = None  # None: no docstring opened yet for current_func

        def flush():
            """Emit the pending entry (if any) and reset the scanner state."""
            nonlocal current_func, doc_parts
            if current_func is not None:
                docstring = None if doc_parts is None else " ".join(p for p in doc_parts if p)
                functions.append({"function": current_func, "docstring": docstring})
            current_func, doc_parts = None, None

        for line in lines:
            stripped = line.strip()
            if stripped.startswith(("def ", "class ")):
                # a new definition always closes the previous one, including when
                # the new one is a dunder that is itself skipped
                flush()
                func = stripped.split(None, 1)[1].split('(')[0].split(':')[0].strip()
                if not func.startswith("__"):
                    current_func = f"{module_name}.{func}"
            elif current_func is None:
                continue
            elif doc_parts is None:
                if stripped.startswith(quote):
                    body = stripped[len(quote):]
                    if body.endswith(quote):
                        # one-line docstring: opened and closed on the same line
                        doc_parts = [body[:-len(quote)].strip()]
                        flush()
                    else:
                        doc_parts = [body.strip()]
            elif stripped.endswith(quote):
                doc_parts.append(stripped[:-len(quote)].strip())
                flush()
            else:
                doc_parts.append(stripped)

        # the last definition of the file may still be pending
        flush()
        return functions
class Topic:
def __init__(self, name: str, describe: Template):
@@ -97,60 +360,101 @@ class KnowledgeBase:
Load knowledge, offer brief information of knowledge and common handle interfaces
"""
def __init__(self, init_path=None, topics: List[Topic] = None):
KT_EXECUTE = "execute"
KT_PRACTICE = "practice"
KT_FINANCE = "finance"
KT_INFRASTRUCTURE = "infrastructure"
def __init__(self, workdir=None):
self.logger = FinCoLog()
init_path = init_path if init_path else Path.cwd()
self.workdir = Path(workdir) if workdir else Path.cwd()
if not init_path.exists():
self.logger.warning(f"{init_path} not exist, create empty directory.")
Path.mkdir(init_path)
if not self.workdir.exists():
self.logger.warning(f"{self.workdir} not exist, create empty directory.")
Path.mkdir(self.workdir)
self.knowledge = self.load(path=init_path)
self.practice_knowledge = self.load_practice_knowledge(self.workdir)
self.execute_knowledge = self.load_execute_knowledge(self.workdir)
self.finance_knowledge = self.load_finance_knowledge(self.workdir)
self.infrastructure_knowledge = self.load_infrastructure_knowledge(self.workdir)
# todo: replace list with persistent storage strategy such as ES/pinecone to enable
# literal search/semantic search
self.docs = self.brief(knowledge=self.knowledge)
self.topics = topics if topics else []
def load(self, path) -> List:
def load_experiment_knowledge(self, path) -> List:
# similar to practice knowledge, not use for now
if isinstance(path, str):
path = Path(path)
knowledge = []
path = path if path.name == "mlruns" else path.joinpath("mlruns")
# todo: check the influence of set uri
R.set_uri(path.as_uri())
for exp_name in R.list_experiments():
knowledge.append(KnowledgeExperiment(exp_name=exp_name))
knowledge.append(ExperimentKnowledge(storages=ExperimentStorage(exp_name=exp_name)))
self.logger.plain_info(f"Load knowledge from: {path} finished.")
return knowledge
def update(self, path):
# note: only update new knowledge in future
knowledge = self.load(path)
self.knowledge = knowledge
self.docs = self.brief(self.knowledge)
self.logger.plain_info(f"Update knowledge finished.")
def load_practice_knowledge(self, path: Path) -> PracticeKnowledge:
self.practice_knowledge = PracticeKnowledge(
YamlStorage(path.joinpath(f"{self.KT_PRACTICE}/{YamlStorage.DEFAULT_NAME}")))
return self.practice_knowledge
def brief(self, knowledge: List[Knowledge]) -> List:
docs = []
def load_execute_knowledge(self, path: Path) -> ExecuteKnowledge:
self.execute_knowledge = ExecuteKnowledge(
YamlStorage(path.joinpath(f"{self.KT_EXECUTE}/{YamlStorage.DEFAULT_NAME}")))
return self.execute_knowledge
def load_finance_knowledge(self, path: Path) -> FinanceKnowledge:
self.finance_knowledge = FinanceKnowledge(
YamlStorage(path.joinpath(f"{self.KT_FINANCE}/{YamlStorage.DEFAULT_NAME}")))
return self.finance_knowledge
def load_infrastructure_knowledge(self, path: Path) -> InfrastructureKnowledge:
self.infrastructure_knowledge = InfrastructureKnowledge(
YamlStorage(path.joinpath(f"{self.KT_INFRASTRUCTURE}/{YamlStorage.DEFAULT_NAME}")))
return self.infrastructure_knowledge
def get_knowledge(self, knowledge_type: str = None):
if knowledge_type == self.KT_EXECUTE:
knowledge = self.execute_knowledge.knowledge
elif knowledge_type == self.KT_PRACTICE:
knowledge = self.practice_knowledge.knowledge
elif knowledge_type == self.KT_FINANCE:
knowledge = self.finance_knowledge.knowledge
elif knowledge_type == self.KT_INFRASTRUCTURE:
knowledge = self.infrastructure_knowledge.knowledge
else:
knowledge = self.execute_knowledge.knowledge + self.practice_knowledge.knowledge \
+ self.finance_knowledge.knowledge + self.infrastructure_knowledge.knowledge
return knowledge
def query(self, knowledge_type: str = None, content: str = None, n: int = 5):
"""
@param knowledge_type: self.KT_EXECUTE, self.KT_PRACTICE or self.KT_FINANCE
@param content: content to query KnowledgeBase
@param n: top n knowledge to ask ChatGPT
@return:
"""
# todo: replace list with persistent storage strategy such as ES/pinecone to enable
# literal search/semantic search
knowledge = self.get_knowledge(knowledge_type=knowledge_type)
if len(knowledge) == 0:
return ""
scores = []
for k in knowledge:
docs.extend(k.brief())
scores.append(similarity(str(k), content))
sorted_indexes = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
similar_n_indexes = sorted_indexes[:n]
similar_n_docs = [knowledge[i] for i in similar_n_indexes]
self.logger.plain_info(f"Generate brief knowledge summary finished.")
return docs
prompt = Template(
"""find the most relevant doc with this query: '{{content}}' from docs='{{docs}}'.
Just return the most relevant item I provided, no more explain. For example: {'function': 'config.resolve_path', 'docstring': None}""")
prompt_workflow_selection = prompt.render(content=content, docs=similar_n_docs)
response = APIBackend().build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt="You are an excellent assistant."
)
def query(self, content: str = None):
# todo: query by DSL
return self.docs
def query_topics(self):
knowledge_of_topics = []
for topic in self.topics:
knowledge_of_topics.append({topic.name: topic.knowledge})
return knowledge_of_topics
def summarize_by_topic(self):
for topic in self.topics:
topic.summarize(self.docs)
return response

View File

@@ -216,6 +216,12 @@ CMDTask_system : |-
Example output:
cp -r a/b/c d/e/f
Example input:
- User intention: Copy the folder from a/b/c to d/e/f
- User OS: Windows
Example output:
xcopy /Y /f a/b/c d/e/f
CMDTask_user : |-
Example input:
- User intention: "{{cmd_intention}}"

View File

@@ -9,6 +9,7 @@ import re
import subprocess
import platform
import inspect
from jinja2 import Template
from qlib.finco.llm import APIBackend
from qlib.finco.tpl import get_tpl_path
@@ -17,6 +18,7 @@ from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer
from qlib.workflow import R
from qlib.finco.log import FinCoLog, LogColors
from qlib.finco.conf import Config
from qlib.finco.knowledge import KnowledgeBase, Topic
COMPONENT_LIST = ["Dataset", "DataHandler", "Model", "Record", "Strategy", "Backtest"]
@@ -176,8 +178,14 @@ class HighLevelPlanTask(PlanTask):
assert thinking_detail is not None, "The thinking detail is not provided"
assert user_intention is not None, "The user intention is not provided"
practice_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_PRACTICE, content=user_intention)
finance_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_FINANCE, content=user_intention)
system_prompt = self.system.render()
user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level, algorithm_level=algorithm_level, thinking_detail=thinking_detail, user_intention=user_intention)
user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level,
algorithm_level=algorithm_level, thinking_detail=thinking_detail,
practice_knowledge=practice_knowledge, finance_knowledge=finance_knowledge,
user_intention=user_intention)
response = APIBackend().build_messages_and_create_chat_completion(
user_prompt, system_prompt
@@ -229,8 +237,14 @@ class SLPlanTask(PlanTask):
experiment_count = max([i for i in range(10) if f"{i}." in experiments])
infrastructure_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_INFRASTRUCTURE,
content=experiments)
system_prompt = self.system.render()
user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level, algorithm_level=algorithm_level, thinking_detail=thinking_detail, user_intention=user_intention, experiments=experiments)
user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level,
algorithm_level=algorithm_level, thinking_detail=thinking_detail,
infrastructure_knowledge=infrastructure_knowledge,
user_intention=user_intention, experiments=experiments)
former_messages = []
if self.replan:
@@ -341,11 +355,14 @@ class TrainTask(Task):
try:
# Run the command and capture the output
workspace = self._context_manager.get_context("workspace")
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace))
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True,
text=True, encoding="utf8", cwd=str(workspace))
except subprocess.CalledProcessError as e:
print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}")
real_error = e.stderr+e.stdout
KnowledgeBase().execute_knowledge.add([real_error])
if "data" in e.stdout.lower() or "handler" in e.stdout.lower():
return [HyperparameterActionTask("Dataset", regenerate=True, error=real_error),
HyperparameterActionTask("DataHandler", regenerate=True, error=real_error),
@@ -432,11 +449,9 @@ class AnalysisTask(Task):
else "workflow_config.yaml"
)
workspace = self._context_manager.get_context("workspace")
workflow_path = workspace.joinpath(workflow_config)
with workflow_path.open() as f:
workflow = yaml.safe_load(f)
experiment_name = workflow["experiment_name"] if "experiment_name" in workflow else "workflow"
# todo: analysis multi experiment(get recorder by id)
experiment_name = "workflow"
R.set_uri(Path.joinpath(workspace, 'mlruns').as_uri())
tasks = []
@@ -650,11 +665,19 @@ class HyperparameterActionTask(ActionTask):
hyperparameters.remove("dataset")
hyperparameters.remove("recorder")
target_component_classes_and_hyperparameters.append((module_path, class_name, hyperparameters))
execute_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_EXECUTE,
content=target_component_plan)
infrastructure_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_INFRASTRUCTURE,
content=target_component_plan)
user_prompt = self.user.render(
user_requirement=user_prompt,
target_component_plan=target_component_plan,
target_component=self.target_component,
target_component_classes_and_hyperparameters=target_component_classes_and_hyperparameters
target_component_classes_and_hyperparameters=target_component_classes_and_hyperparameters,
execute_knowledge=execute_knowledge,
infrastructure_knowledge=infrastructure_knowledge
)
former_messages = []
if self.regenerate:
@@ -987,7 +1010,9 @@ class SummarizeTask(Task):
file_info = self.get_info_from_file(workspace)
context_info = self.get_info_from_context() # too long context make response unstable.
record_info = self.get_info_from_recorder(workspace, workflow_yaml["experiment_name"])
# todo: experiments perhaps have the same name, summarize experiment by loop
record_info = self.get_info_from_recorder(workspace, "workflow")
figure_path = self.get_figure_path(workspace)
information = context_info + file_info + record_info
@@ -1012,7 +1037,7 @@ class SummarizeTask(Task):
)
context_summary.update({key: response})
recorder = R.get_recorder(experiment_name=workflow_yaml["experiment_name"])
recorder = R.get_recorder(experiment_name="workflow")
recorder.save_objects(context_summary=context_summary)
prompt_workflow_selection = self.summarize_metrics_user.render(
@@ -1029,6 +1054,14 @@ class SummarizeTask(Task):
user_prompt=prompt_workflow_selection, system_prompt=self.system.render()
)
KnowledgeBase().practice_knowledge.add([{"user_intention": user_prompt,
"experiment_metrics": metrics_response}])
# notes: summarize after all experiment added to KnowledgeBase
topic = Topic(name="rollingModel", describe=Template("What conclusion can you draw"))
topic.summarize(KnowledgeBase().practice_knowledge.knowledge)
self.logger.info(f"Summary of topic: {topic.name}: {topic.knowledge}")
self._context_manager.set_context("summary", response)
self.save_markdown(content=response, path=workspace)
self.logger.info(f"Report has saved to {self.__DEFAULT_REPORT_NAME}", title="End")

View File

@@ -1,4 +1,6 @@
import json
import string
import random
from fuzzywuzzy import fuzz
@@ -36,3 +38,8 @@ def similarity(text1, text2):
# Maybe we can use other similarity algorithm such as tfidf
return fuzz.ratio(text1, text2)
def random_string(length=10):
    """Return a random string of `length` ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)

View File

@@ -174,16 +174,16 @@ class LearnManager:
self.epoch = 0
self.wm = WorkflowManager()
topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in
self.__DEFAULT_TOPICS]
self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge'), topics=topics)
self.topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in
self.__DEFAULT_TOPICS]
self.knowledge_base = KnowledgeBase(workdir=Path.cwd().joinpath('knowledge'))
self.knowledge_base.execute_knowledge.add([])
self.knowledge_base.query(knowledge_type="infrastructure", content="resolve_path")
def run(self, prompt):
# todo: add early stop condition
for i in range(10):
self.wm.run(prompt)
self.knowledge_base.update(self.wm._workspace)
self.knowledge_base.summarize_by_topic()
self.learn()
self.epoch += 1
@@ -204,9 +204,12 @@ class LearnManager:
user_prompt = self.wm.context.get_context("user_prompt")
summary = self.wm.context.get_context("summary")
[topic.summarize(self.knowledge_base.get_knowledge()) for topic in self.topics]
knowledge_of_topics = [{topic.name: topic.knowledge} for topic in self.topics]
for task in task_finished:
prompt_workflow_selection = self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render(
summary=summary, brief=self.knowledge_base.query_topics(),
summary=summary, brief=knowledge_of_topics,
task_finished=[str(t) for t in task_finished],
task=task.__class__.__name__, system=task.system.render(), user_prompt=user_prompt
)