1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-01 18:11:18 +08:00

update knowledge module;

* add storage class;
* new practice,execute,finance,infrastructure knowledge;
* add query method to KnowledgeBase;
This commit is contained in:
Cadenza-Li
2023-07-12 17:23:47 +08:00
parent effed382e9
commit d7ab6935dd
2 changed files with 294 additions and 66 deletions

View File

@@ -1,10 +1,107 @@
from pathlib import Path
from jinja2 import Template
from typing import List
from typing import List, Union
import pickle
import yaml
import inspect
from qlib.workflow import R
from qlib.finco.log import FinCoLog
from qlib.finco.llm import APIBackend
from qlib.finco.utils import similarity
class Storage:
    """
    Base class responsible for storing and loading Knowledge related data.

    Concrete storages implement `load` and `save`; this class only keeps the
    in-memory `documents` list and the target `path`.
    """

    def __init__(self, path: Union[str, Path, None]):
        """
        Parameters
        ----------
        path : Union[str, Path, None]
            location of the backing file. ``None`` is kept as-is, because
            ``ExperimentStorage`` in this module is built with ``path=None``.
        """
        # `Path(None)` raises TypeError, so only real values are converted.
        self.path = path if path is None or isinstance(path, Path) else Path(path)
        self.logger = FinCoLog()
        self.source = None
        # todo: get document by key
        self.documents = []

    def add(self, documents: List):
        """Append `documents` to the in-memory list, then persist through `save`."""
        self.documents.extend(documents)
        self.save()

    def load(self, **kwargs):
        """Load stored data; must be provided by subclasses."""
        raise NotImplementedError("Please implement the `load` method.")

    def save(self, **kwargs):
        """Persist stored data; must be provided by subclasses."""
        raise NotImplementedError("Please implement the `save` method.")
class PickleStorage(Storage):
    """
    Storage backend that persists Knowledge related data in pickle format.
    """

    def __init__(self, path: Union[str, Path]):
        super().__init__(path)

    @classmethod
    def load(cls, path: Union[str, Path]):
        """Unpickle and return whatever object is stored in the file at `path`."""
        # NOTE(review): unpickling can execute arbitrary code; only use on trusted files.
        source = path if isinstance(path, Path) else Path(path)
        with source.open("rb") as stream:
            stored = pickle.load(stream)
        return stored

    def save(self, **kwargs):
        """Pickle this storage object itself (not only its documents) to `self.path`."""
        with open(self.path, "wb") as stream:
            pickle.dump(self, stream)
class YamlStorage(Storage):
    """
    Storage backend that keeps Knowledge related documents in a yaml file.
    """

    def __init__(self, path: Union[str, Path]):
        super().__init__(path)
        self.load()

    def load(self):
        """
        Load documents from the yaml file at `self.path`.

        A missing file only logs a warning and leaves `self.documents` unchanged.
        An empty file results in an empty document list.
        """
        try:
            # The context manager closes the handle once parsing is done.
            with open(self.path, "r") as f:
                documents = yaml.load(f, Loader=yaml.FullLoader)
        except FileNotFoundError:
            self.logger.warning(f"YamlStorage: file {self.path} doesn't exist.")
            return
        # yaml yields None for an empty file; keep `documents` a list so that
        # `len(...)` and `extend(...)` on it keep working.
        self.documents = [] if documents is None else documents

    def save(self, **kwargs):
        """Dump `self.documents` to `self.path` in yaml format."""
        with open(self.path, 'w') as f:
            yaml.dump(self.documents, f)
class ExperimentStorage(Storage):
    """
    Storage responsible for loading mlflow related data (experiment recorders).

    NOTE: the constructor does not call `load`; `exp` stays None and `recs`
    stays empty until `load` is invoked explicitly.
    """

    def __init__(self, exp_name, path=None):
        super().__init__(path=path)
        self.exp_name = exp_name
        self.exp = None
        self.recs = []
        self.docs = []

    def load(self, exp_name, rec_id=None):
        """
        Fetch experiment `exp_name` and append its recorders to `self.recs`.

        Parameters
        ----------
        exp_name
            name of the experiment passed to `R.get_exp`.
        rec_id
            when given, only the recorder whose `id` equals it is kept.
        """
        self.exp = R.get_exp(experiment_name=exp_name)
        matched = [
            recorder
            for recorder in self.exp.list_recorders(rtype=self.exp.RT_L)
            if rec_id is None or recorder.id == rec_id
        ]
        self.recs.extend(matched)
class Knowledge:
@@ -12,8 +109,23 @@ class Knowledge:
Use to handle knowledge in finCo such as experiment and outside domain information
"""
def __init__(self):
def __init__(self, storage: Storage):
    """Keep a handle on `storage` and start with an empty knowledge list."""
    self.storage = storage
    self.knowledge = []
    self.logger = FinCoLog()
def summarize(self, **kwargs):
    """
    Summarize storage data to knowledge.

    The default knowledge is `self.storage.documents` itself (the same list
    object, not a copy).

    Parameters
    ----------
    kwargs
        not used by this default implementation.

    Return
    ------
    None
    """
    documents = self.storage.documents
    self.knowledge = documents
def load(self, **kwargs):
"""
@@ -39,39 +151,130 @@ class Knowledge:
"""
raise NotImplementedError(f"Please implement the `load` method.")
def save(self, **kwargs):
    """Persist knowledge by forwarding all keyword arguments to `self.storage.save`."""
    backend = self.storage
    backend.save(**kwargs)
class KnowledgeExperiment(Knowledge):
class ExperimentKnowledge(Knowledge):
"""
Handle knowledge from experiments
"""
def __init__(self, exp_name, rec_id=None):
super().__init__()
self.exp_name = exp_name
self.exp = None
self.recs = []
self.load(exp_name=exp_name, rec_id=rec_id)
def load(self, exp_name, rec_id=None):
recs = []
self.exp = R.get_exp(experiment_name=exp_name)
for r in self.exp.list_recorders(rtype=self.exp.RT_L):
if rec_id is not None and r.id != rec_id:
continue
recs.append(r)
self.recs.extend(recs)
def __init__(self, storage: ExperimentStorage):
super().__init__(storage=storage)
self.storage = storage
def brief(self):
docs = []
for recorder in self.recs:
docs.append({"exp_name": self.exp.name, "record_info": recorder.info,
for recorder in self.storage.recs:
docs.append({"exp_name": self.storage.exp.name, "record_info": recorder.info,
"config": recorder.load_object("config"),
"context_summary": recorder.load_object("context_summary")})
return docs
class PracticeKnowledge(Knowledge):
    """
    Practice knowledge; only some template sentences for now.
    """

    def __init__(self, storage: YamlStorage):
        super().__init__(storage=storage)
        self.summarize()

    def add(self, docs: List):
        """Store `docs`, refresh `self.knowledge` from the storage, then persist."""
        backend = self.storage
        backend.add(docs)
        self.summarize()
        self.save()
class FinanceKnowledge(Knowledge):
    """
    Knowledge collected from articles (plain .txt files).
    """

    def __init__(self, storage: YamlStorage):
        super().__init__(storage=storage)
        if len(self.storage.documents) == 0:
            # NOTE(review): `storage.path` is scanned as a directory here, while
            # KnowledgeBase builds this storage with a `finance_knowledge.yaml`
            # file path - confirm which directory is meant to be scanned.
            articles = self.read_files_in_directory(self.storage.path)
            self.add(articles)
        self.summarize()

    def add(self, docs: List):
        """Store `docs`, refresh `self.knowledge` from the storage, then persist."""
        backend = self.storage
        backend.add(docs)
        self.summarize()
        self.save()

    @staticmethod
    def read_files_in_directory(directory):
        """
        Return the utf-8 text of every .txt file found recursively under `directory`.
        """
        # todo: split article in trunks
        return [
            candidate.read_text(encoding="utf-8")
            for candidate in Path(directory).rglob("*.txt")
            if candidate.is_file()
        ]
class ExecuteKnowledge(Knowledge):
    """
    Config and the associated execution result (pass or error message).
    The example in a prompt can be regarded as a passing execution.
    """

    def __init__(self, storage: YamlStorage):
        super().__init__(storage=storage)
        self.summarize()

    def add(self, docs: List):
        """Store `docs`, refresh `self.knowledge` from the storage, then persist."""
        backend = self.storage
        backend.add(docs)
        self.summarize()
        self.save()
class InfrastructureKnowledge(Knowledge):
    """
    Knowledge from sentences, docstring, and code
    """

    def __init__(self, storage: YamlStorage):
        super().__init__(storage=storage)
        if not self.storage.documents:
            # todo: change the path to qlib root path
            docs = self.get_functions_and_docstrings(Path.cwd().parent)
            self.add(docs)
        # Refresh `self.knowledge` also when the documents were already present
        # in the storage, as the other Knowledge subclasses in this file do.
        self.summarize()

    def add(self, docs: List):
        """Store `docs`, refresh `self.knowledge` from the storage, then persist."""
        self.storage.add(docs)
        self.summarize()
        self.save()

    @staticmethod
    def get_functions_and_docstrings(directory):
        """
        Collect every top-level function and its docstring from .py files under `directory`.

        Files are parsed statically with `ast`; nothing from the scanned files is
        imported or evaluated. Files that cannot be decoded or parsed are skipped.

        Parameters
        ----------
        directory
            root directory, searched recursively for ``*.py`` files.

        Return
        ------
        list of dict with the keys ``function_name`` and ``docstring``
        (``docstring`` is None when the function has none).
        """
        import ast

        functions = []
        for file_path in Path(directory).rglob("*.py"):
            if not file_path.is_file():
                continue
            try:
                tree = ast.parse(file_path.read_text(encoding="utf-8"), filename=str(file_path))
            except (SyntaxError, UnicodeDecodeError, ValueError):
                # Not valid Python source for this interpreter; nothing to extract.
                continue
            # Only module-level `def` statements, matching the previous
            # `line.startswith("def ")` selection.
            for node in tree.body:
                if isinstance(node, ast.FunctionDef):
                    functions.append({"function_name": node.name, "docstring": ast.get_docstring(node)})
        return functions
class Topic:
def __init__(self, name: str, describe: Template):
@@ -97,60 +300,84 @@ class KnowledgeBase:
Load knowledge, offer brief information of knowledge and common handle interfaces
"""
def __init__(self, init_path=None, topics: List[Topic] = None):
KT_EXECUTE = "execute"
KT_PRACTICE = "practice"
KT_FINANCE = "finance"
def __init__(self, init_path=None):
self.logger = FinCoLog()
init_path = init_path if init_path else Path.cwd()
self.init_path = Path(init_path) if init_path else Path.cwd()
if not init_path.exists():
self.logger.warning(f"{init_path} not exist, create empty directory.")
Path.mkdir(init_path)
if not self.init_path.exists():
self.logger.warning(f"{self.init_path} not exist, create empty directory.")
Path.mkdir(self.init_path)
self.knowledge = self.load(path=init_path)
self.practice_knowledge = self.load_practice_knowledge(self.init_path)
self.execute_knowledge = self.load_execute_knowledge(self.init_path)
self.finance_knowledge = self.load_finance_knowledge(self.init_path)
# todo: replace list with persistent storage strategy such as ES/pinecone to enable
# literal search/semantic search
self.docs = self.brief(knowledge=self.knowledge)
self.topics = topics if topics else []
def load(self, path) -> List:
def load_experiment_knowledge(self, path) -> List:
# similar to practice knowledge, not use for now
if isinstance(path, str):
path = Path(path)
knowledge = []
path = path if path.name == "mlruns" else path.joinpath("mlruns")
# todo: check the influence of set uri
R.set_uri(path.as_uri())
for exp_name in R.list_experiments():
knowledge.append(KnowledgeExperiment(exp_name=exp_name))
knowledge.append(ExperimentKnowledge(storage=ExperimentStorage(exp_name=exp_name)))
self.logger.plain_info(f"Load knowledge from: {path} finished.")
return knowledge
def update(self, path):
# note: only update new knowledge in future
knowledge = self.load(path)
self.knowledge = knowledge
self.docs = self.brief(self.knowledge)
self.logger.plain_info(f"Update knowledge finished.")
def load_practice_knowledge(self, path: Path) -> PracticeKnowledge:
    """Build practice knowledge backed by `practice_knowledge.yaml` under `path`, keep it on self and return it."""
    storage = YamlStorage(path.joinpath("practice_knowledge.yaml"))
    self.practice_knowledge = PracticeKnowledge(storage)
    return self.practice_knowledge
def brief(self, knowledge: List[Knowledge]) -> List:
docs = []
def load_execute_knowledge(self, path: Path) -> ExecuteKnowledge:
    """Build execute knowledge backed by `execute_knowledge.yaml` under `path`, keep it on self and return it."""
    storage = YamlStorage(path.joinpath("execute_knowledge.yaml"))
    self.execute_knowledge = ExecuteKnowledge(storage)
    return self.execute_knowledge
def load_finance_knowledge(self, path: Path) -> FinanceKnowledge:
    """Build finance knowledge backed by `finance_knowledge.yaml` under `path`, keep it on self and return it."""
    storage = YamlStorage(path.joinpath("finance_knowledge.yaml"))
    self.finance_knowledge = FinanceKnowledge(storage)
    return self.finance_knowledge
def knowledge(self, knowledge_type: str = None):
    """
    Return knowledge documents as a list.

    Parameters
    ----------
    knowledge_type : str
        self.KT_EXECUTE, self.KT_PRACTICE or self.KT_FINANCE selects one kind of
        knowledge; any other value (including None) selects all three.

    Return
    ------
    list
        the `knowledge` list of the selected Knowledge object, or the
        concatenation execute + practice + finance when no single type matches.
    """
    # Always hand back the document list, never the Knowledge object itself:
    # `query` iterates over and indexes into this value.
    if knowledge_type == self.KT_EXECUTE:
        return self.execute_knowledge.knowledge
    if knowledge_type == self.KT_PRACTICE:
        return self.practice_knowledge.knowledge
    if knowledge_type == self.KT_FINANCE:
        return self.finance_knowledge.knowledge
    return (
        self.execute_knowledge.knowledge
        + self.practice_knowledge.knowledge
        + self.finance_knowledge.knowledge
    )
def query(self, knowledge_type: str = None, content: str = None, n: int = 5):
"""
@param knowledge_type: self.KT_EXECUTE, self.KT_PRACTICE or self.KT_FINANCE
@param content: content to query KnowledgeBase
@param n: top n knowledge to ask ChatGPT
@return:
"""
# todo: replace list with persistent storage strategy such as ES/pinecone to enable
# literal search/semantic search
knowledge = self.knowledge(knowledge_type=knowledge_type)
scores = []
for k in knowledge:
docs.extend(k.brief())
scores.append(similarity(str(k), content))
sorted_indexes = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
similar_n_indexes = sorted_indexes[:n]
similar_n_docs = [knowledge[i] for i in similar_n_indexes]
self.logger.plain_info(f"Generate brief knowledge summary finished.")
return docs
prompt = Template("""summarize this information: '{{docs}}'""")
prompt_workflow_selection = prompt.render(docs=similar_n_docs)
response = APIBackend().build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection
)
def query(self, content: str = None):
# todo: query by DSL
return self.docs
def query_topics(self):
knowledge_of_topics = []
for topic in self.topics:
knowledge_of_topics.append({topic.name: topic.knowledge})
return knowledge_of_topics
def summarize_by_topic(self):
for topic in self.topics:
topic.summarize(self.docs)
return response

View File

@@ -174,16 +174,14 @@ class LearnManager:
self.epoch = 0
self.wm = WorkflowManager()
topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in
self.__DEFAULT_TOPICS]
self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge'), topics=topics)
self.topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in
self.__DEFAULT_TOPICS]
self.knowledge_base = KnowledgeBase(init_path=Path.cwd().joinpath('knowledge'))
def run(self, prompt):
# todo: add early stop condition
for i in range(10):
self.wm.run(prompt)
self.knowledge_base.update(self.wm._workspace)
self.knowledge_base.summarize_by_topic()
self.learn()
self.epoch += 1
@@ -204,9 +202,12 @@ class LearnManager:
user_prompt = self.wm.context.get_context("user_prompt")
summary = self.wm.context.get_context("summary")
[topic.summarize(self.knowledge_base.knowledge()) for topic in self.topics]
knowledge_of_topics = [{topic.name: topic.knowledge} for topic in self.topics]
for task in task_finished:
prompt_workflow_selection = self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render(
summary=summary, brief=self.knowledge_base.query_topics(),
summary=summary, brief=knowledge_of_topics,
task_finished=[str(t) for t in task_finished],
task=task.__class__.__name__, system=task.system.render(), user_prompt=user_prompt
)