diff --git a/qlib/finco/.env.example b/qlib/finco/.env.example index c7049b9c4..9931294e9 100644 --- a/qlib/finco/.env.example +++ b/qlib/finco/.env.example @@ -15,4 +15,6 @@ MAX_TOKENS=1600 MAX_RETRY=120 CONTINOUS_MODE=True -DEBUG_MODE=True \ No newline at end of file +DEBUG_MODE=True + +# TEMPERATURE= diff --git a/qlib/finco/__init__.py b/qlib/finco/__init__.py index dba6156a1..f8a0f94c2 100644 --- a/qlib/finco/__init__.py +++ b/qlib/finco/__init__.py @@ -10,4 +10,4 @@ def get_finco_path() -> Path: return the template path Because the template path is located in the folder. We don't know where it is located. So __file__ for this module will be used. """ - return DIRNAME \ No newline at end of file + return DIRNAME diff --git a/qlib/finco/conf.py b/qlib/finco/conf.py index 85e02e2cc..2ee04fc04 100644 --- a/qlib/finco/conf.py +++ b/qlib/finco/conf.py @@ -1,5 +1,5 @@ # TODO: use pydantic for other modules in Qlib -from pydantic import BaseSettings +# from pydantic_settings import BaseSettings from qlib.finco.utils import SingletonBaseClass import os diff --git a/qlib/finco/context.py b/qlib/finco/context.py new file mode 100644 index 000000000..226824ece --- /dev/null +++ b/qlib/finco/context.py @@ -0,0 +1,97 @@ +from dataclasses import dataclass, field +import copy +from pathlib import Path +from typing import Optional, List +from qlib.finco.log import FinCoLog +from qlib.typehint import Literal + +from qlib.finco.utils import similarity + + +@dataclass +class Design: + plan: str + classes: str + decision: str + + +@dataclass +class Exp: + """Experiment""" + + # compoments + dataset: Optional[Design] = None + datahandler: Optional[Design] = None + model: Optional[Design] = None + record: Optional[Design] = None + strategy: Optional[Design] = None + backtest: Optional[Design] = None + + # basic + template: Optional[Path] = None + + # rolling strategy. None indicates no rolling + rolling: Optional[Literal["base", "ddgda"]] = None + + +@dataclass +class StructContext: + """Part of the context have clear meaning and structure, so they will be saved here and can be easily retrieved and understood""" + + # TODO: move more content in WorkflowContextManager.context to here + workspace: Path + exp_list: List[Exp] = field(default_factory=list) # the planned experiments + + +class WorkflowContextManager: + """Context Manager stores the context of the workflow""" + + """All context are key value pairs which saves the input, output and status of the whole workflow""" + + def __init__(self, workspace: Path) -> None: + self.context = {} + self.logger = FinCoLog() + # this context is public + self.struct_context = StructContext(workspace) # TODO: move more content in context to here + self.set_context("workspace", workspace) # TODO: remove me + + def set_context(self, key, value): + if key in self.context: + self.logger.warning("The key already exists in the context, the value will be overwritten") + self.context[key] = value + + def get_context(self, key): + # NOTE: if the key doesn't exist, return None. In the future, we may raise an error to detect abnormal behavior + if key not in self.context: + self.logger.warning("The key doesn't exist in the context") + return None + return self.context[key] + + def update_context(self, key, new_value): + # NOTE: if the key doesn't exist, return None. In the future, we may raise an error to detect abnormal behavior + if key not in self.context: + self.logger.warning("The key doesn't exist in the context") + self.context.update({key: new_value}) + + def get_all_context(self): + """return a deep copy of the context""" + """TODO: do we need to return a deep copy?""" + return copy.deepcopy(self.context) + + def retrieve(self, query: str) -> dict: + if query in self.context.keys(): + return {query: self.context.get(query)} + + # Note: retrieve information from context by string similarity maybe abandon in future + scores = {} + for k, v in self.context.items(): + scores.update({k: max(similarity(query, k), similarity(query, v))}) + max_score_key = max(scores, key=scores.get) + return {max_score_key: self.context.get(max_score_key)} + + def clear(self, reserve: list = None): + if reserve is None: + reserve = [] + + _context = {k: self.get_context(k) for k in reserve} + self.context = _context diff --git a/qlib/finco/knowledge.py b/qlib/finco/knowledge.py index 7a8633e56..f3f834ea5 100644 --- a/qlib/finco/knowledge.py +++ b/qlib/finco/knowledge.py @@ -65,6 +65,7 @@ class YamlStorage(Storage): This class is responsible for storage and loading of Knowledge related data in yaml format. """ + DEFAULT_NAME = "storage.yml" def __init__(self, path: Union[str, Path]): @@ -82,7 +83,7 @@ class YamlStorage(Storage): def save(self, **kwargs): """use pickle as the default save method""" Path.mkdir(self.path.parent, exist_ok=True) - with open(self.path, 'w') as f: + with open(self.path, "w") as f: yaml.dump(self.documents, f) @@ -190,9 +191,14 @@ class ExperimentKnowledge(Knowledge): def brief(self): docs = [] for recorder in self.storage.recs: - docs.append({"exp_name": self.storage.exp.name, "record_info": recorder.info, - "config": recorder.load_object("config"), - "context_summary": recorder.load_object("context_summary")}) + docs.append( + { + "exp_name": self.storage.exp.name, + "record_info": recorder.info, + "config": recorder.load_object("config"), + "context_summary": recorder.load_object("context_summary"), + } + ) return docs @@ -295,7 +301,7 @@ class InfrastructureKnowledge(Knowledge): """ functions = [] - for py_file_path in Path(directory).rglob('*.py'): + for py_file_path in Path(directory).rglob("*.py"): for _functions in self.get_functions_with_docstrings(py_file_path): functions.append(_functions) @@ -314,7 +320,7 @@ class InfrastructureKnowledge(Knowledge): docstring = None for line in lines: if line.strip().startswith("def ") or line.strip().startswith("class "): - func = line.strip().split(' ')[1].split('(')[0] + func = line.strip().split(" ")[1].split("(")[0] if func.startswith("__"): continue if current_func is not None: @@ -336,7 +342,6 @@ class InfrastructureKnowledge(Knowledge): class Topic: - def __init__(self, name: str, describe: Template): self.name = name self.describe = describe @@ -347,9 +352,7 @@ class Topic: def summarize(self, docs: list): self.logger.info(f"Summarize topic: \nname: {self.name}\ndescribe: {self.describe.module}") prompt_workflow_selection = self.describe.render(docs=docs) - response = APIBackend().build_messages_and_create_chat_completion( - user_prompt=prompt_workflow_selection - ) + response = APIBackend().build_messages_and_create_chat_completion(user_prompt=prompt_workflow_selection) self.knowledge = response self.docs = docs @@ -395,22 +398,26 @@ class KnowledgeBase: def load_practice_knowledge(self, path: Path) -> PracticeKnowledge: self.practice_knowledge = PracticeKnowledge( - YamlStorage(path.joinpath(f"{self.KT_PRACTICE}/{YamlStorage.DEFAULT_NAME}"))) + YamlStorage(path.joinpath(f"{self.KT_PRACTICE}/{YamlStorage.DEFAULT_NAME}")) + ) return self.practice_knowledge def load_execute_knowledge(self, path: Path) -> ExecuteKnowledge: self.execute_knowledge = ExecuteKnowledge( - YamlStorage(path.joinpath(f"{self.KT_EXECUTE}/{YamlStorage.DEFAULT_NAME}"))) + YamlStorage(path.joinpath(f"{self.KT_EXECUTE}/{YamlStorage.DEFAULT_NAME}")) + ) return self.execute_knowledge def load_finance_knowledge(self, path: Path) -> FinanceKnowledge: self.finance_knowledge = FinanceKnowledge( - YamlStorage(path.joinpath(f"{self.KT_FINANCE}/{YamlStorage.DEFAULT_NAME}"))) + YamlStorage(path.joinpath(f"{self.KT_FINANCE}/{YamlStorage.DEFAULT_NAME}")) + ) return self.finance_knowledge def load_infrastructure_knowledge(self, path: Path) -> InfrastructureKnowledge: self.infrastructure_knowledge = InfrastructureKnowledge( - YamlStorage(path.joinpath(f"{self.KT_INFRASTRUCTURE}/{YamlStorage.DEFAULT_NAME}"))) + YamlStorage(path.joinpath(f"{self.KT_INFRASTRUCTURE}/{YamlStorage.DEFAULT_NAME}")) + ) return self.infrastructure_knowledge def get_knowledge(self, knowledge_type: str = None): @@ -423,8 +430,12 @@ class KnowledgeBase: elif knowledge_type == self.KT_INFRASTRUCTURE: knowledge = self.infrastructure_knowledge.knowledge else: - knowledge = self.execute_knowledge.knowledge + self.practice_knowledge.knowledge \ - + self.finance_knowledge.knowledge + self.infrastructure_knowledge.knowledge + knowledge = ( + self.execute_knowledge.knowledge + + self.practice_knowledge.knowledge + + self.finance_knowledge.knowledge + + self.infrastructure_knowledge.knowledge + ) return knowledge def query(self, knowledge_type: str = None, content: str = None, n: int = 5): @@ -450,8 +461,9 @@ class KnowledgeBase: similar_n_docs = [knowledge[i] for i in similar_n_indexes] prompt = Template( - """find the most relevant doc with this query: '{{content}}' from docs='{{docs}}'. - Just return the most relevant item I provided, no more explain. For example: {'function': 'config.resolve_path', 'docstring': None}""") + """find the most relevant doc with this query: '{{content}}' from docs='{{docs}}'. + Just return the most relevant item I provided, no more explain. For example: {'function': 'config.resolve_path', 'docstring': None}""" + ) prompt_workflow_selection = prompt.render(content=content, docs=similar_n_docs) response = APIBackend().build_messages_and_create_chat_completion( user_prompt=prompt_workflow_selection, system_prompt="You are an excellent assistant." diff --git a/qlib/finco/llm.py b/qlib/finco/llm.py index e07073096..b7216fe1a 100644 --- a/qlib/finco/llm.py +++ b/qlib/finco/llm.py @@ -43,7 +43,7 @@ class APIBackend(SingletonBaseClass): "content": system_prompt, } ] - messages.extend(former_messages[-1*cfg.max_past_message_include:]) + messages.extend(former_messages[-1 * cfg.max_past_message_include :]) messages.append( { "role": "user", @@ -82,7 +82,6 @@ class APIBackend(SingletonBaseClass): temperature: float = None, max_tokens: Optional[int] = None, ) -> str: - if self.debug_mode: key = json.dumps(messages) if key in self.cache: diff --git a/qlib/finco/log.py b/qlib/finco/log.py index 7d4b68118..c87ef0a0b 100644 --- a/qlib/finco/log.py +++ b/qlib/finco/log.py @@ -12,6 +12,7 @@ class LogColors: """ ANSI color codes for use in console output. """ + RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" @@ -58,8 +59,11 @@ def formatting_log(logger, title="Info"): a context manager, print liens before and after a function """ length = {"Start": 120, "Task": 120, "Info": 60, "Interact": 60, "End": 120}.get(title, 60) - color, bold = (LogColors.YELLOW, LogColors.BOLD) \ - if title in ["Start", "Task", "Info", "Interact", "End"] else (LogColors.CYAN, "") + color, bold = ( + (LogColors.YELLOW, LogColors.BOLD) + if title in ["Start", "Task", "Info", "Interact", "End"] + else (LogColors.CYAN, "") + ) logger.info("") logger.info(f"{color}{bold}{'-'} {title} {'-' * (length - len(title))}{LogColors.END}") yield @@ -99,12 +103,12 @@ class FinCoLog(SingletonBaseClass): f"{LogColors.MAGENTA}{LogColors.BOLD}Role:{LogColors.END} " f"{LogColors.CYAN}{m['role']}{LogColors.END}\n" + f"{LogColors.MAGENTA}{LogColors.BOLD}Content:{LogColors.END} " - f"{LogColors.CYAN}{m['content']}{LogColors.END}\n") + f"{LogColors.CYAN}{m['content']}{LogColors.END}\n" + ) def log_response(self, response: str): with formatting_log(self.logger, "GPT Response"): - self.logger.info( - f"{LogColors.CYAN}{response}{LogColors.END}\n") + self.logger.info(f"{LogColors.CYAN}{response}{LogColors.END}\n") # TODO: # It looks wierd if we only have logger @@ -118,14 +122,13 @@ class FinCoLog(SingletonBaseClass): def plain_info(self, *args): for arg in args: self.logger.info( - f"{LogColors.YELLOW}{LogColors.BOLD}Info:{LogColors.END}{LogColors.WHITE}{arg}{LogColors.END}") + f"{LogColors.YELLOW}{LogColors.BOLD}Info:{LogColors.END}{LogColors.WHITE}{arg}{LogColors.END}" + ) def warning(self, *args): for arg in args: - self.logger.warning( - f"{LogColors.BLUE}{LogColors.BOLD}Warning:{LogColors.END}{arg}") + self.logger.warning(f"{LogColors.BLUE}{LogColors.BOLD}Warning:{LogColors.END}{arg}") def error(self, *args): for arg in args: - self.logger.error( - f"{LogColors.RED}{LogColors.BOLD}Error:{LogColors.END}{arg}") + self.logger.error(f"{LogColors.RED}{LogColors.BOLD}Error:{LogColors.END}{arg}") diff --git a/qlib/finco/prompt_template.py b/qlib/finco/prompt_template.py index c056db591..82b85ed19 100644 --- a/qlib/finco/prompt_template.py +++ b/qlib/finco/prompt_template.py @@ -10,8 +10,9 @@ from qlib.finco import get_finco_path class PromptTemplate(SingletonBaseClass): def __init__(self) -> None: super().__init__() - _template = yaml.load(open(Path.joinpath(get_finco_path(), "prompt_template.yaml"), "r"), - Loader=yaml.FullLoader) + _template = yaml.load( + open(Path.joinpath(get_finco_path(), "prompt_template.yaml"), "r"), Loader=yaml.FullLoader + ) for k, v in _template.items(): if k == "mods": continue @@ -28,5 +29,5 @@ class PromptTemplate(SingletonBaseClass): file_path = Path(file_path) Path.mkdir(file_path.parent, exist_ok=True) - with open(file_path, 'w') as f: + with open(file_path, "w") as f: yaml.dump(self.__dict__, f) diff --git a/qlib/finco/task.py b/qlib/finco/task.py index f92cbed02..9ae87a603 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -2,7 +2,7 @@ import os from pathlib import Path import io -from typing import Any, List, Union +from typing import Any, List, Optional, Union import ruamel.yaml as yaml import abc import re @@ -20,6 +20,8 @@ from qlib.finco.log import FinCoLog, LogColors from qlib.finco.conf import Config from qlib.finco.knowledge import KnowledgeBase, Topic +from qlib.finco.context import Design, Exp, WorkflowContextManager + COMPONENT_LIST = ["Dataset", "DataHandler", "Model", "Record", "Strategy", "Backtest"] @@ -40,9 +42,19 @@ class Task: - Edit Task: it is supposed to edit the code base directly. """ - def __init__(self) -> None: - self._context_manager = None + _context_manager: WorkflowContextManager + + def __init__(self, tpl_ver: Optional[str] = None) -> None: + """ + + Parameters + ---------- + tpl_ver : Optional[str] + The Version of the template. + If the previous results will greatly affect the next QA. We may use different version instead of combine everything in the same one. + """ self.prompt_template = PromptTemplate() + self.tlp_ver = tpl_ver self.executed = False self.continuous = Config().continuous_mode self.logger = FinCoLog() @@ -77,9 +89,9 @@ class Task: def interact(self, prompt: str, **kwargs) -> Any: """ - The user can interact with the task. This method only handle business in current task. It will return True - while continuous is True. This method will return user input if input cannot be parsed as 'yes' or 'no'. - @return True, False, str + The user can interact with the task. This method only handle business in current task. It will return True + while continuous is True. This method will return user input if input cannot be parsed as 'yes' or 'no'. + @return True, False, str """ self.logger.info(title="Interact") if self.continuous: @@ -100,11 +112,17 @@ class Task: @property def system(self): - return self.prompt_template.get(self.__class__.__name__ + "_system") + key = self.__class__.__name__ + "_system" + if self.tlp_ver is not None: + key = key + "." + self.tlp_ver + return self.prompt_template.get(key) @property def user(self): - return self.prompt_template.get(self.__class__.__name__ + "_user") + key = self.__class__.__name__ + "_user" + if self.tlp_ver is not None: + key = key + "." + self.tlp_ver + return self.prompt_template.get(key) def __str__(self): return self.__class__.__name__ @@ -123,9 +141,7 @@ class WorkflowTask(Task): response = APIBackend().build_messages_and_create_chat_completion( prompt_workflow_selection, self.system.render() ) - self.save_chat_history_to_context_manager( - prompt_workflow_selection, response, self.system.render() - ) + self.save_chat_history_to_context_manager(prompt_workflow_selection, response, self.system.render()) workflow = response.split(":")[1].strip().lower() self.executed = True self._context_manager.set_context("workflow", workflow) @@ -154,15 +170,23 @@ class PlanTask(Task): class HighLevelPlanTask(PlanTask): def __init__(self) -> None: super().__init__() - + def execute(self): - self._context_manager.set_context("target", "minimizing the maximum drawdown") - self._context_manager.set_context("deliverable", "a daily quantitative investment strategy in A-share stock market. A model will be included in the strategy.") - self._context_manager.set_context("user_intention", "build an A-share stock market daily portfolio in quantitative investment and minimize the maximum drawdown.") + self._context_manager.set_context( + "deliverable", + "a daily quantitative investment strategy in A-share stock market. A model will be included in the strategy.", + ) + self._context_manager.set_context( + "user_intention", + "build an A-share stock market daily portfolio in quantitative investment and minimize the maximum drawdown.", + ) self._context_manager.set_context("business_level", "Controller(e.g. Rolling retrain), Data") self._context_manager.set_context("algorithm_level", "supervised learning") - self._context_manager.set_context("thinking_detail", "We want to leverage more recent data than outdated data. So we have to compare with or without rolling training process of the model like a meta controller. When with a rolling training process, data will be different at each time.") + self._context_manager.set_context( + "thinking_detail", + "We want to leverage more recent data than outdated data. So we have to compare with or without rolling training process of the model like a meta controller. When with a rolling training process, data will be different at each time.", + ) target = self._context_manager.get_context("target") deliverable = self._context_manager.get_context("deliverable") @@ -182,24 +206,26 @@ class HighLevelPlanTask(PlanTask): finance_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_FINANCE, content=user_intention) system_prompt = self.system.render() - user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level, - algorithm_level=algorithm_level, thinking_detail=thinking_detail, - practice_knowledge=practice_knowledge, finance_knowledge=finance_knowledge, - user_intention=user_intention) - - response = APIBackend().build_messages_and_create_chat_completion( - user_prompt, system_prompt + user_prompt = self.user.render( + target=target, + deliverable=deliverable, + business_level=business_level, + algorithm_level=algorithm_level, + thinking_detail=thinking_detail, + practice_knowledge=practice_knowledge, + finance_knowledge=finance_knowledge, + user_intention=user_intention, ) - self.save_chat_history_to_context_manager( - user_prompt, response, system_prompt - ) + response = APIBackend().build_messages_and_create_chat_completion(user_prompt, system_prompt) + + response = APIBackend().build_messages_and_create_chat_completion(user_prompt, system_prompt) + + self.save_chat_history_to_context_manager(user_prompt, response, system_prompt) assert response is not None, "The response is None" - res = re.search( - r"Workflow:(.*)Experiments:(.*)Metrics:(.*)", response, re.S - ) + res = re.search(r"Workflow:(.*)Experiments:(.*)Metrics:(.*)", response, re.S) assert ( res is not None and len(res.groups()) == 3 @@ -225,7 +251,7 @@ class SLPlanTask(PlanTask): def execute(self): workflow = self._context_manager.get_context("high_level_workflow") - assert (workflow.lower() == "supervised learning"), "The workflow is not supervised learning" + assert workflow.lower() == "supervised learning", "The workflow is not supervised learning" target = self._context_manager.get_context("target") deliverable = self._context_manager.get_context("deliverable") @@ -237,30 +263,35 @@ class SLPlanTask(PlanTask): experiment_count = max([i for i in range(10) if f"{i}." in experiments]) - infrastructure_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_INFRASTRUCTURE, - content=experiments) + infrastructure_knowledge = KnowledgeBase().query( + knowledge_type=KnowledgeBase.KT_INFRASTRUCTURE, content=experiments + ) system_prompt = self.system.render() - user_prompt = self.user.render(target=target, deliverable=deliverable, business_level=business_level, - algorithm_level=algorithm_level, thinking_detail=thinking_detail, - infrastructure_knowledge=infrastructure_knowledge, - user_intention=user_intention, experiments=experiments) + user_prompt = self.user.render( + target=target, + deliverable=deliverable, + business_level=business_level, + algorithm_level=algorithm_level, + thinking_detail=thinking_detail, + infrastructure_knowledge=infrastructure_knowledge, + user_intention=user_intention, + experiments=experiments, + ) former_messages = [] if self.replan: user_prompt = f"your choice of predefined classes cannot be initialized.\nPlease rewrite the plan and answer with exact required format in system prompt and reply with no more explainations.\nThe error message: {self.error}. Please correct the former answer accordingly." - former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__]['None'][1:] + former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__]["None"][1:] response = APIBackend().build_messages_and_create_chat_completion( user_prompt, system_prompt, former_messages=former_messages ) - self.save_chat_history_to_context_manager( - user_prompt, system_prompt, self.system.render() - ) + self.save_chat_history_to_context_manager(user_prompt, system_prompt, self.system.render()) for i in range(1, experiment_count + 1): assert f"Experiment {i}" in response, f"The experiment {i} is not found in the response" self._context_manager.set_context("experiment_count", experiment_count) - - decision_pattern = re.compile("\((.*?)\)") + + decision_pattern = re.compile(r"\((.*?)\)") class_pattern = re.compile("{(.*?)}-{(.*?)}") new_task = [] @@ -270,8 +301,10 @@ class SLPlanTask(PlanTask): re_pattern = re_pattern + "Difference:(.*)" re_pattern = re.compile(re_pattern, re.S) # 1) CURD on the workspace + self._context_manager match_res = re.search(re_pattern, response) for experiment_id in range(1, experiment_count + 1): + exp = Exp() for name in COMPONENT_LIST: target_line = [line for line in match_res.group(experiment_id).split("\n") if f"{name}:" in line] assert len(target_line) == 1, f"The {name} is not found in the response" @@ -289,27 +322,42 @@ class SLPlanTask(PlanTask): self._context_manager.set_context(f"{name}_experiment_{experiment_id}_decision", decision) self._context_manager.set_context(f"{name}_experiment_{experiment_id}_classes", classes) self._context_manager.set_context(f"{name}_experiment_{experiment_id}_plan", target_line) - + setattr(exp, name.lower(), Design(plan=target_line, classes=classes, decision=decision)) assert decision in ["Default", "Personized"], f"The decision of {name} is not correct" - + # TODO: the strctured experiments should replace + self._context_manager.struct_context.exp_list.append(exp) + # 1) create a workspace # TODO: we have to make choice between `sl` and `sl-cfg` - new_task.append( - ConfigSearchTask() - ) + # new_task.append( + # # ConfigSearchTask(get_tpl_path()), # select template from the tpl folder directly. The prompt does not align with the task + # ConfigSearchTask(), # select template from the baselines. + # ) + + # Because selecting template is not that stable. We try to start with + cfg_tpl = get_tpl_path() / "sl" / "workflow_config.yaml" + new_task.append(CMDTask(f"make a directory in the '{self._context_manager.struct_context.workspace}'")) + for i, exp in enumerate(self._context_manager.struct_context.exp_list, 1): + exp.template = cfg_tpl + new_task.append( + CMDTask( + f"copy the file '{cfg_tpl}' to '{self._context_manager.struct_context.workspace}' and rename to experiment_{i}.yaml" + ) + ) + # for name in COMPONENT_LIST: - # if decision == "Default": + # if decision == "Default": new_task.extend([HyperparameterFinetuneActionTask()]) - # elif decision == "Personized": - # # TODO open ImplementActionTask to let GPT write code - # new_task.extend([HyperparameterActionTask(name), ConfigActionTask(name), YamlEditTask(name)]) - # # new_task.extend([HyperparameterActionTask(name), ConfigActionTask(name), ImplementActionTask(name), CodeDumpTask(name), YamlEditTask(name)]) + # elif decision == "Personized": + # # TODO open ImplementActionTask to let GPT write code + # new_task.extend([HyperparameterActionTask(name), ConfigActionTask(name), YamlEditTask(name)]) + # # new_task.extend([HyperparameterActionTask(name), ConfigActionTask(name), ImplementActionTask(name), CodeDumpTask(name), YamlEditTask(name)]) return new_task class RLPlanTask(PlanTask): def __init__( - self, + self, ) -> None: super().__init__() self.logger.error("The RL task is not implemented yet") @@ -328,7 +376,7 @@ class TrainTask(Task): This train task is responsible for training model configure by yaml file. """ - def __init__(self, experiment_index, rolling = False, ddgda=False, **kwargs) -> None: + def __init__(self, experiment_index, rolling=False, ddgda=False, **kwargs) -> None: super().__init__() self._output = None self._experiment_index = experiment_index @@ -344,41 +392,74 @@ class TrainTask(Task): workflow = yaml.safe_load(f) self._context_manager.set_context(f"workflow_{self._experiment_index}_yaml", workflow) - confirm = self.interact(f"I select this workflow file: " - f"{LogColors().render(workflow_path, color=LogColors.YELLOW, style=LogColors.BOLD)}\n" - f"{yaml.dump(workflow, default_flow_style=False)}" - f"Are you sure you want to use? yes(Y/y), no(N/n):") + confirm = self.interact( + f"I select this workflow file: " + f"{LogColors().render(workflow_path, color=LogColors.YELLOW, style=LogColors.BOLD)}\n" + f"{yaml.dump(workflow, default_flow_style=False)}" + f"Are you sure you want to use? yes(Y/y), no(N/n):" + ) if confirm is False: return [] - command = ["qrun", str(workflow_path)] - try: - # Run the command and capture the output - workspace = self._context_manager.get_context("workspace") - result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, - text=True, encoding="utf8", cwd=str(workspace)) - - except subprocess.CalledProcessError as e: - print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}") - real_error = e.stderr+e.stdout - KnowledgeBase().execute_knowledge.add([real_error]) + if not self._rolling: + command = ["qrun", str(workflow_path)] + try: + # Run the command and capture the output + workspace = self._context_manager.get_context("workspace") + _ = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + text=True, + encoding="utf8", + cwd=str(workspace), + ) - if "data" in e.stdout.lower() or "handler" in e.stdout.lower(): - return [HyperparameterActionTask("Dataset", regenerate=True, error=real_error), + except subprocess.CalledProcessError as e: + print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}") + real_error = e.stderr + e.stdout + KnowledgeBase().execute_knowledge.add([real_error]) + + if "data" in e.stdout.lower() or "handler" in e.stdout.lower(): + return [ + HyperparameterActionTask("Dataset", regenerate=True, error=real_error), HyperparameterActionTask("DataHandler", regenerate=True, error=real_error), - ConfigActionTask("Dataset"), ConfigActionTask("DataHandler"), YamlEditTask("Dataset"), - YamlEditTask("DataHandler"), TrainTask()] - elif "model" in e.stdout.lower(): - return [HyperparameterActionTask("Model", regenerate=True, error=real_error), - ConfigActionTask("Model"), YamlEditTask("Model"), TrainTask()] - else: - ret_list = [] - for component in COMPONENT_LIST: - ret_list.append(HyperparameterActionTask(component, regenerate=True, error=real_error)) - ret_list.append(ConfigActionTask(component)) - ret_list.append(YamlEditTask(component)) - ret_list.append(TrainTask()) - return ret_list + ConfigActionTask("Dataset"), + ConfigActionTask("DataHandler"), + YamlEditTask("Dataset"), + YamlEditTask("DataHandler"), + TrainTask(), + ] + elif "model" in e.stdout.lower(): + return [ + HyperparameterActionTask("Model", regenerate=True, error=real_error), + ConfigActionTask("Model"), + YamlEditTask("Model"), + TrainTask(), + ] + else: + ret_list = [] + for component in COMPONENT_LIST: + ret_list.append(HyperparameterActionTask(component, regenerate=True, error=real_error)) + ret_list.append(ConfigActionTask(component)) + ret_list.append(YamlEditTask(component)) + ret_list.append(TrainTask()) + return ret_list + elif not self._ddgda: + command = f"python -m qlib.contrib.rolling base --conf_path {workflow_path} run" + # Run the command and capture the output + workspace = self._context_manager.struct_context.workspace + subprocess.run( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace) + ) + else: + command = f"python -m qlib.contrib.rolling ddgda --conf_path {workflow_path} run" + # Run the command and capture the output + workspace = self._context_manager.struct_context.workspace + subprocess.run( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace) + ) return [AnalysisTask()] @@ -386,9 +467,7 @@ class TrainTask(Task): if self._output is not None: # TODO: it will be overrides by later commands # utf8 can't decode normally on Windows - self._context_manager.set_context( - self.__class__.__name__, self._output.decode("ANSI") - ) + self._context_manager.set_context(self.__class__.__name__, self._output.decode("ANSI")) class AnalysisTask(Task): @@ -415,9 +494,7 @@ class AnalysisTask(Task): self._context_manager.set_context(k, v) def execute(self): - prompt = self.user.render( - user_prompt=self._context_manager.get_context("user_prompt") - ) + prompt = self.user.render(user_prompt=self._context_manager.get_context("user_prompt")) be = APIBackend() be.debug_mode = False @@ -430,8 +507,9 @@ class AnalysisTask(Task): ), ) analysers = response.replace(" ", "").split(",") - confirm = self.interact(f"I select these analysers: {analysers}\n" - f"Are you sure you want to use? yes(Y/y), no(N/n) or prompt:") + confirm = self.interact( + f"I select these analysers: {analysers}\n" f"Are you sure you want to use? yes(Y/y), no(N/n) or prompt:" + ) if confirm is False: analysers = [] break @@ -452,15 +530,14 @@ class AnalysisTask(Task): # todo: analysis multi experiment(get recorder by id) experiment_name = "workflow" - R.set_uri(Path.joinpath(workspace, 'mlruns').as_uri()) + R.set_uri(Path.joinpath(workspace, "mlruns").as_uri()) tasks = [] for analyser in analysers: if analyser in self.__ANALYZERS_PROJECT.keys(): tasks.append( self.__ANALYZERS_PROJECT.get(analyser)( - recorder=R.get_recorder(experiment_name=experiment_name), - output_dir=workspace + recorder=R.get_recorder(experiment_name=experiment_name), output_dir=workspace ) ) @@ -475,16 +552,25 @@ class ActionTask(Task): pass - class ConfigSearchTask(ActionTask): - def __init__(self): + """Find a template path that user can start with.""" + + def __init__(self, conf_path: Optional[Union[Path, str]] = None): super().__init__() - - def crawl_the_folder(self, folder_path : Path): - yaml_files = [] + if conf_path is None: + # If no path provided, find path from the templates. + import qlib + + conf_path = Path(os.path.abspath(inspect.getfile(qlib))).parent.parent / "examples" / "benchmarks" + if isinstance(conf_path, str): + conf_path = Path(conf_path) + self.conf_path = conf_path + + def crawl_the_folder(self, folder_path: Path): + yaml_files = [] for root, _, files in os.walk(folder_path.as_posix()): for file in files: - if file.endswith(".yaml") or file.endswith(".yml"): + if file.endswith(".yaml") or file.endswith(".yml"): yaml_file_path = Path(os.path.join(root, file)).relative_to(folder_path) yaml_files.append(yaml_file_path.as_posix()) return yaml_files @@ -504,24 +590,19 @@ class ConfigSearchTask(ActionTask): model_class = f"{{{self._context_manager.get_context(f'Model_experiment_{experiment_id}_classes')[0][0]}}}-{{{self._context_manager.get_context(f'Model_experiment_{experiment_id}_classes')[0][1]}}}" experiments.append((experiment_id, dataset_class, datahandler_class, model_class)) - - import qlib - benchmarks_root_path = Path(os.path.abspath(inspect.getfile(qlib))).parent.parent / "examples" / "benchmarks" - yaml_config_list = self.crawl_the_folder(benchmarks_root_path) + + # TODO: each config should contains some descriptions to provide information to make the choice. + yaml_config_list = self.crawl_the_folder(self.conf_path) system_prompt = self.system.render(yaml_config_list=yaml_config_list) user_prompt = self.user.render(experiments=experiments) - response = APIBackend().build_messages_and_create_chat_completion( - user_prompt, system_prompt - ) + response = APIBackend().build_messages_and_create_chat_completion(user_prompt, system_prompt) former_messages = [] response = APIBackend().build_messages_and_create_chat_completion( user_prompt, self.system.render(), former_messages=former_messages ) - self.save_chat_history_to_context_manager( - user_prompt, response, self.system.render() - ) + self.save_chat_history_to_context_manager(user_prompt, response, self.system.render()) experiment_count = self._context_manager.get_context("experiment_count") @@ -531,14 +612,23 @@ class ConfigSearchTask(ActionTask): config_search_pattern = re.compile(config_search_pattern, re.S) config_search_result = config_search_pattern.search(response) - - return_task = [CMDTask(f"make a directory in the {self._context_manager.get_context('workspace')}"), ] + + return_task = [ + CMDTask(f"make a directory in the {self._context_manager.get_context('workspace')}"), + ] for experiment_id in range(1, experiment_count + 1): - self._context_manager.set_context(f"experiment_{experiment_id}_template_config", config_search_result.group(experiment_id).strip('\n')) - config_location = benchmarks_root_path / config_search_result.group(experiment_id) - return_task.append(CMDTask(f"copy file in {config_location} to {self._context_manager.get_context('workspace')} and rename to experiment_{experiment_id}.yaml")) + self._context_manager.set_context( + f"experiment_{experiment_id}_template_config", config_search_result.group(experiment_id).strip("\n") + ) + config_location = self.conf_path / config_search_result.group(experiment_id) + return_task.append( + CMDTask( + f"copy file in {config_location} to {self._context_manager.get_context('workspace')} and rename to experiment_{experiment_id}.yaml" + ) + ) return return_task + class CMDTask(ActionTask): """ This CMD task is responsible for ensuring compatibility across different operating systems. @@ -551,12 +641,8 @@ class CMDTask(ActionTask): super().__init__() def execute(self): - prompt = self.user.render( - cmd_intention=self.cmd_intention, user_os=platform.system() - ) - response = APIBackend().build_messages_and_create_chat_completion( - prompt, self.system.render() - ) + prompt = self.user.render(cmd_intention=self.cmd_intention, user_os=platform.system()) + response = APIBackend().build_messages_and_create_chat_completion(prompt, self.system.render()) self._output = subprocess.check_output(response, shell=True, cwd=self.cwd) return [] @@ -564,16 +650,14 @@ class CMDTask(ActionTask): if self._output is not None: # TODO: it will be overrides by later commands # utf8 can't decode normally on Windows - self._context_manager.set_context( - self.__class__.__name__, self._output.decode("ANSI") - ) + self._context_manager.set_context(self.__class__.__name__, self._output.decode("ANSI")) class HyperparameterFinetuneActionTask(ActionTask): def __init__(self, component=None) -> None: super().__init__() self.component = component - + def execute(self): target = self._context_manager.get_context("target") deliverable = self._context_manager.get_context("deliverable") @@ -590,7 +674,7 @@ class HyperparameterFinetuneActionTask(ActionTask): config_location = self._context_manager.get_context(f"workspace") / f"experiment_{experiment_index}.yaml" config_file_content = open(config_location, "r").read() template_configs.append((experiment_index, config_file_content)) - + system_prompt = self.system.render() user_prompt = self.user.render( target=target, @@ -600,11 +684,9 @@ class HyperparameterFinetuneActionTask(ActionTask): thinking_detail=thinking_detail, user_intention=user_intention, experiments=experiments, - template_configs=template_configs - ) - response = APIBackend().build_messages_and_create_chat_completion( - user_prompt, system_prompt + template_configs=template_configs, ) + response = APIBackend().build_messages_and_create_chat_completion(user_prompt, system_prompt) config_search_pattern = "" for experiment_id in range(1, experiment_count + 1): @@ -614,13 +696,15 @@ class HyperparameterFinetuneActionTask(ActionTask): config_search_result = re.search(config_search_pattern, response) return_tasks = [] for experiment_id in range(1, experiment_count + 1): - rolling_res = config_search_result.group((experiment_id-1) * 4 + 2).strip('\n') - ddgda_res = config_search_result.group((experiment_id-1) * 4 + 3).strip('\n') - reason_res = config_search_result.group((experiment_id-1) * 4 + 4).strip('\n') + rolling_res = config_search_result.group((experiment_id - 1) * 4 + 2).strip("\n") + ddgda_res = config_search_result.group((experiment_id - 1) * 4 + 3).strip("\n") + reason_res = config_search_result.group((experiment_id - 1) * 4 + 4).strip("\n") if "true" in ddgda_res.lower(): return_tasks.append(TrainTask(experiment_id, rolling=True, ddgda=True)) + self._context_manager.struct_context.exp_list[experiment_id - 1].rolling = "ddgda" if "true" in rolling_res.lower(): return_tasks.append(TrainTask(experiment_id, rolling=True)) + self._context_manager.struct_context.exp_list[experiment_id - 1].rolling = "base" else: return_tasks.append(TrainTask(experiment_id)) self._context_manager.set_context(f"experiment_{experiment_id}_rolling", rolling_res) @@ -652,24 +736,63 @@ class HyperparameterActionTask(ActionTask): assert target_component_plan is not None, "target component plan is not set by plan maker" assert target_component_classes is not None, "target component classes is not set by plan maker" - system_prompt = self.system.render(target_module=self.target_component, choice=target_component_decision, classes=target_component_classes) - + system_prompt = self.system.render( + target_module=self.target_component, choice=target_component_decision, classes=target_component_classes + ) + target_component_classes_and_hyperparameters = [] for module_path, class_name in target_component_classes: exec(f"from {module_path} import {class_name}") - hyperparameters = [hyperparameter for hyperparameter in {name: param for name, param in inspect.signature(eval(class_name).__init__).parameters.items() if name != "self" and name != "kwargs"}.keys()] + hyperparameters = [ + hyperparameter + for hyperparameter in { + name: param + for name, param in inspect.signature(eval(class_name).__init__).parameters.items() + if name != "self" and name != "kwargs" + }.keys() + ] if class_name == "LGBModel": - hyperparameters.extend([ "boosting_type", "num_leaves", "max_depth", "learning_rate", "n_estimators", "objective", "class_weight", "min_split_gain", "min_child_weight", "min_child_samples", "subsample", "subsample_freq", "colsample_bytree", "reg_alpha", "reg_lambda", "random_state", "n_jobs", "silent", "importance_type", "early_stopping_round", "metric", "num_class", "is_unbalance", "bagging_seed", "verbosity", ]) + hyperparameters.extend( + [ + "boosting_type", + "num_leaves", + "max_depth", + "learning_rate", + "n_estimators", + "objective", + "class_weight", + "min_split_gain", + "min_child_weight", + "min_child_samples", + "subsample", + "subsample_freq", + "colsample_bytree", + "reg_alpha", + "reg_lambda", + "random_state", + "n_jobs", + "silent", + "importance_type", + "early_stopping_round", + "metric", + "num_class", + "is_unbalance", + "bagging_seed", + "verbosity", + ] + ) elif class_name == "SignalRecord": hyperparameters.remove("model") hyperparameters.remove("dataset") hyperparameters.remove("recorder") target_component_classes_and_hyperparameters.append((module_path, class_name, hyperparameters)) - execute_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_EXECUTE, - content=target_component_plan) - infrastructure_knowledge = KnowledgeBase().query(knowledge_type=KnowledgeBase.KT_INFRASTRUCTURE, - content=target_component_plan) + execute_knowledge = KnowledgeBase().query( + knowledge_type=KnowledgeBase.KT_EXECUTE, content=target_component_plan + ) + infrastructure_knowledge = KnowledgeBase().query( + knowledge_type=KnowledgeBase.KT_INFRASTRUCTURE, content=target_component_plan + ) user_prompt = self.user.render( user_requirement=user_prompt, @@ -677,7 +800,7 @@ class HyperparameterActionTask(ActionTask): target_component=self.target_component, target_component_classes_and_hyperparameters=target_component_classes_and_hyperparameters, execute_knowledge=execute_knowledge, - infrastructure_knowledge=infrastructure_knowledge + infrastructure_knowledge=infrastructure_knowledge, ) former_messages = [] if self.regenerate: @@ -685,16 +808,14 @@ class HyperparameterActionTask(ActionTask): user_prompt = f"your yaml config generated from your hyperparameter is not in the right format.\n The Yaml string generated from the hyperparameters is not in the right format.\nPlease rewrite the hyperparameters and answer with exact required format in system prompt and reply with no more explainations.\nThe error message: {self.error}. Please correct the former answer accordingly.\nHyperparameters, Reason and Improve suggestion should always be included." else: user_prompt = f"your hyperparameter cannot be initialized, may be caused by wrong format of the value or wrong name or some value is not supported in Qlib.\nPlease rewrite the hyperparameters and answer with exact required format in system prompt and reply with no more explainations.\nThe error message: {self.error}. Please correct the former answer accordingly.\nHyperparameters, Reason and Improve suggestion should always be included." - former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__][self.target_component][1:] + former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__][ + self.target_component + ][1:] response = APIBackend().build_messages_and_create_chat_completion( user_prompt, system_prompt, former_messages=former_messages ) - self.save_chat_history_to_context_manager( - user_prompt, response, system_prompt, self.target_component - ) - res = re.search( - r"(?i)Hyperparameters:(.*)Reason:(.*)Improve suggestion:(.*)", response, re.S - ) + self.save_chat_history_to_context_manager(user_prompt, response, system_prompt, self.target_component) + res = re.search(r"(?i)Hyperparameters:(.*)Reason:(.*)Improve suggestion:(.*)", response, re.S) try: assert ( res is not None and len(res.groups()) == 3 @@ -713,9 +834,7 @@ class HyperparameterActionTask(ActionTask): self._context_manager.set_context(f"{self.target_component}_hyperparameters", hyperparameters) self._context_manager.set_context(f"{self.target_component}_reason", reason) - self._context_manager.set_context( - f"{self.target_component}_improve_suggestion", improve_suggestion - ) + self._context_manager.set_context(f"{self.target_component}_improve_suggestion", improve_suggestion) return [] @@ -723,7 +842,7 @@ class ConfigActionTask(ActionTask): def __init__(self, component) -> None: super().__init__() self.target_component = component - + def execute(self): user_prompt = self._context_manager.get_context("user_prompt") @@ -732,12 +851,14 @@ class ConfigActionTask(ActionTask): target_component_classes = self._context_manager.get_context(f"{self.target_component}_classes") target_component_hyperparameters = self._context_manager.get_context(f"{self.target_component}_hyperparameters") - system_prompt = self.system.render(target_module=self.target_component, choice=target_component_decision, classes=target_component_classes) + system_prompt = self.system.render( + target_module=self.target_component, choice=target_component_decision, classes=target_component_classes + ) user_prompt = self.user.render( user_requirement=user_prompt, target_component_plan=target_component_plan, target_component=self.target_component, - target_component_hyperparameters=target_component_hyperparameters + target_component_hyperparameters=target_component_hyperparameters, ) former_messages = [] # if self.reconfig and user_prompt == self._context_manager.get_context("chat_history")[self.__class__.__name__][self.target_component][-2]["content"]: @@ -746,36 +867,37 @@ class ConfigActionTask(ActionTask): response = APIBackend().build_messages_and_create_chat_completion( user_prompt, system_prompt, former_messages=former_messages ) - self.save_chat_history_to_context_manager( - user_prompt, response, system_prompt, self.target_component - ) + self.save_chat_history_to_context_manager(user_prompt, response, system_prompt, self.target_component) config = re.search(r"```yaml(.*)```", response, re.S).group(1) try: yaml_config = yaml.safe_load(io.StringIO(config)) except yaml.YAMLError as e: self.logger.info(f"Yaml file is not in the correct format: {e}") - return_tasks = [HyperparameterActionTask(self.target_component, regenerate=True, error=str(e), error_type="yaml"), ConfigActionTask(self.target_component)] + return_tasks = [ + HyperparameterActionTask(self.target_component, regenerate=True, error=str(e), error_type="yaml"), + ConfigActionTask(self.target_component), + ] return return_tasks - + if self.target_component == "Dataset": - if 'handler' in yaml_config["dataset"]: - del yaml_config['dataset']['handler'] + if "handler" in yaml_config["dataset"]: + del yaml_config["dataset"]["handler"] elif self.target_component == "DataHandler": - for processor in yaml_config['handler']['kwargs']['infer_processors']: + for processor in yaml_config["handler"]["kwargs"]["infer_processors"]: if "kwargs" in processor and "fields_group" in processor["kwargs"]: - del processor["kwargs"]['fields_group'] - for processor in yaml_config['handler']['kwargs']['learn_processors']: + del processor["kwargs"]["fields_group"] + for processor in yaml_config["handler"]["kwargs"]["learn_processors"]: if "kwargs" in processor and "fields_group" in processor["kwargs"]: - del processor["kwargs"]['fields_group'] - - if 'freq' in yaml_config['handler']['kwargs']: - yaml_config['handler']['kwargs']['freq'] = "day" # TODO hot fix freq because no data + del processor["kwargs"]["fields_group"] + + if "freq" in yaml_config["handler"]["kwargs"]: + yaml_config["handler"]["kwargs"]["freq"] = "day" # TODO hot fix freq because no data elif self.target_component == "Record": - for record in yaml_config['record']: - if record['class'] == 'SigAnaRecord' and 'label_col' in record['kwargs']: - del record['kwargs']["label_col"] - + for record in yaml_config["record"]: + if record["class"] == "SigAnaRecord" and "label_col" in record["kwargs"]: + del record["kwargs"]["label_col"] + def remove_default(config): if isinstance(config, dict): for key in list(config.keys()): @@ -787,6 +909,7 @@ class ConfigActionTask(ActionTask): elif isinstance(config, list): for item in config: remove_default(item) + remove_default(yaml_config) self._context_manager.set_context(f"{self.target_component}_config", yaml_config) @@ -797,7 +920,9 @@ class ImplementActionTask(ActionTask): def __init__(self, target_component, reimplement=False) -> None: super().__init__() self.target_component = target_component - assert COMPONENT_LIST.index(self.target_component) <= 2, "The target component is not in dataset datahandler and model" + assert ( + COMPONENT_LIST.index(self.target_component) <= 2 + ), "The target component is not in dataset datahandler and model" self.reimplement = reimplement def execute(self): @@ -809,16 +934,10 @@ class ImplementActionTask(ActionTask): user_prompt = self._context_manager.get_context("user_prompt") prompt_element_dict = dict() for component in COMPONENT_LIST: - prompt_element_dict[ - f"{component}_decision" - ] = self._context_manager.get_context(f"{component}_decision") - prompt_element_dict[ - f"{component}_plan" - ] = self._context_manager.get_context(f"{component}_plan") + prompt_element_dict[f"{component}_decision"] = self._context_manager.get_context(f"{component}_decision") + prompt_element_dict[f"{component}_plan"] = self._context_manager.get_context(f"{component}_plan") - assert ( - None not in prompt_element_dict.values() - ), "Some decision or plan is not set by plan maker" + assert None not in prompt_element_dict.values(), "Some decision or plan is not set by plan maker" config = self._context_manager.get_context(f"{self.target_component}_config") implement_prompt = self.user.render( @@ -830,7 +949,9 @@ class ImplementActionTask(ActionTask): former_messages = [] if self.reimplement: implement_prompt = "your code seems wrong, please re-implement it and answer with exact required format and reply with no more explainations.\n" - former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__][self.target_component][1:] + former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__][ + self.target_component + ][1:] response = APIBackend().build_messages_and_create_chat_completion( implement_prompt, self.system.render(), former_messages=former_messages ) @@ -838,17 +959,13 @@ class ImplementActionTask(ActionTask): implement_prompt, response, self.system.render(), self.target_component ) - res = re.search( - r"Code:(.*)Explanation:(.*)Modified config:(.*)", response, re.S - ) + res = re.search(r"Code:(.*)Explanation:(.*)Modified config:(.*)", response, re.S) assert ( res is not None and len(res.groups()) == 3 ), f"The response of implement action task of component {self.target_component} is not in the correct format" code = re.search(r"```python(.*)```", res.group(1), re.S) - assert ( - code is not None - ), "The code part of implementation action task response is not in the correct format" + assert code is not None, "The code part of implementation action task response is not in the correct format" code = code.group(1) explanation = res.group(2) modified_config = re.search(r"```yaml(.*)```", res.group(3), re.S) @@ -858,12 +975,8 @@ class ImplementActionTask(ActionTask): modified_config = modified_config.group(1) self._context_manager.set_context(f"{self.target_component}_code", code) - self._context_manager.set_context( - f"{self.target_component}_code_explanation", explanation - ) - self._context_manager.set_context( - f"{self.target_component}_modified_config", modified_config - ) + self._context_manager.set_context(f"{self.target_component}_code_explanation", explanation) + self._context_manager.set_context(f"{self.target_component}_modified_config", modified_config) return [] @@ -893,31 +1006,32 @@ class YamlEditTask(ActionTask): "Record": "record", "Backtest": "backtest", }[self.target_component] - + def replace_key_value_recursive(self, target_dict, target_key, new_value): res = False - if isinstance(target_dict, dict): - for key, value in target_dict.items(): - if key == target_key: + if isinstance(target_dict, dict): + for key, value in target_dict.items(): + if key == target_key: target_dict[key] = new_value res = True - else: - res = res | self.replace_key_value_recursive(value, target_key, new_value) - elif isinstance(target_dict, list): - for item in target_dict: - res = res | self.replace_key_value_recursive(item, target_key, new_value) + else: + res = res | self.replace_key_value_recursive(value, target_key, new_value) + elif isinstance(target_dict, list): + for item in target_dict: + res = res | self.replace_key_value_recursive(item, target_key, new_value) return res - def execute(self): # 1) read original and new content - self.original_config_location = Path(os.path.join(self._context_manager.get_context('workspace'), "workflow_config.yaml")) + self.original_config_location = Path( + os.path.join(self._context_manager.get_context("workspace"), "workflow_config.yaml") + ) with self.original_config_location.open("r") as f: target_config = yaml.safe_load(f) - update_config = self._context_manager.get_context(f'{self.target_component}_modified_config') + update_config = self._context_manager.get_context(f"{self.target_component}_modified_config") if update_config is None: - update_config = self._context_manager.get_context(f'{self.target_component}_config') - + update_config = self._context_manager.get_context(f"{self.target_component}_config") + # 2) modify the module_path if code is implemented by finco # TODO because we skip code writing part, so we mute this step to avoid error # if self._context_manager.get_context(f'{self.target_component}_decision') == "Personized": @@ -930,47 +1044,56 @@ class YamlEditTask(ActionTask): if self.target_component == "Dataset": return [] elif self.target_component == "DataHandler": - dataset_update_config = self._context_manager.get_context(f'Dataset_modified_config') + dataset_update_config = self._context_manager.get_context(f"Dataset_modified_config") if dataset_update_config is None: - dataset_update_config = self._context_manager.get_context(f'Dataset_config') - dataset_update_config['dataset']['kwargs']['handler'] = update_config['handler'] + dataset_update_config = self._context_manager.get_context(f"Dataset_config") + dataset_update_config["dataset"]["kwargs"]["handler"] = update_config["handler"] update_config = dataset_update_config real_target_config_key = "dataset" else: real_target_config_key = self.target_config_key # 3) replace the module - assert isinstance(update_config, dict) and real_target_config_key in update_config, "The config file is not in the correct format" - assert self.replace_key_value_recursive(target_config, real_target_config_key, update_config[real_target_config_key]), "Replace of the yaml file failed." + assert ( + isinstance(update_config, dict) and real_target_config_key in update_config + ), "The config file is not in the correct format" + assert self.replace_key_value_recursive( + target_config, real_target_config_key, update_config[real_target_config_key] + ), "Replace of the yaml file failed." # TODO hotfix for the bug that the record signalrecord config is not updated - for record in target_config['task']['record']: - if record['class'] == 'SignalRecord': - if 'kwargs' in record and 'model' in record['kwargs']: - del record['kwargs']["model"] - if 'kwargs' in record and 'dataset' in record['kwargs']: - del record['kwargs']["dataset"] - + for record in target_config["task"]["record"]: + if record["class"] == "SignalRecord": + if "kwargs" in record and "model" in record["kwargs"]: + del record["kwargs"]["model"] + if "kwargs" in record and "dataset" in record["kwargs"]: + del record["kwargs"]["dataset"] + # 4) save the config file with self.original_config_location.open("w") as f: yaml.dump(target_config, f) return [] + class CodeDumpTask(ActionTask): def __init__(self, target_component) -> None: super().__init__() self.target_component = target_component - + def execute(self): - code = self._context_manager.get_context(f'{self.target_component}_code') + code = self._context_manager.get_context(f"{self.target_component}_code") assert code is not None, "The code is not set" - - with open(os.path.join(self._context_manager.get_context('workspace'), f'{self.target_component}_code.py'), 'w') as f: + + with open( + os.path.join(self._context_manager.get_context("workspace"), f"{self.target_component}_code.py"), "w" + ) as f: f.write(code) - + try: - exec(f"from qlib.finco.{os.path.basename(self._context_manager.get_context('workspace'))}.{self.target_component}_code import *") + exec( + f"from qlib.finco.{os.path.basename(self._context_manager.get_context('workspace'))}.{self.target_component}_code import *" + ) except (ImportError, AttributeError, SyntaxError): return [ImplementActionTask(self.target_component, reimplement=True), CodeDumpTask(self.target_component)] return [] @@ -1054,8 +1177,9 @@ class SummarizeTask(Task): user_prompt=prompt_workflow_selection, system_prompt=self.system.render() ) - KnowledgeBase().practice_knowledge.add([{"user_intention": user_prompt, - "experiment_metrics": metrics_response}]) + KnowledgeBase().practice_knowledge.add( + [{"user_intention": user_prompt, "experiment_metrics": metrics_response}] + ) # notes: summarize after all experiment added to KnowledgeBase topic = Topic(name="rollingModel", describe=Template("What conclusion can you draw")) @@ -1092,8 +1216,11 @@ class SummarizeTask(Task): # in case of too large file # TODO: Perhaps summarization method instead of truncation would be a better approach result.append( - {"file": file.name, "content": content[: self.__MAX_LENGTH_OF_FILE], - "additional": self._context_manager.retrieve(file.name)} + { + "file": file.name, + "content": content[: self.__MAX_LENGTH_OF_FILE], + "additional": self._context_manager.retrieve(file.name), + } ) return result @@ -1134,8 +1261,9 @@ class SummarizeTask(Task): postfix = filename.split(".")[-1] if postfix in ["jpeg"]: description = self._context_manager.retrieve(filename) - file_list.append({"file_name": filename, "description": description, - "path": str(Path(path).joinpath(filename))}) + file_list.append( + {"file_name": filename, "description": description, "path": str(Path(path).joinpath(filename))} + ) return file_list def save_markdown(self, content: str, path): diff --git a/qlib/finco/tpl/sl/workflow_config.yaml b/qlib/finco/tpl/sl/workflow_config.yaml index 71aead83f..362725dc1 100644 --- a/qlib/finco/tpl/sl/workflow_config.yaml +++ b/qlib/finco/tpl/sl/workflow_config.yaml @@ -10,13 +10,25 @@ data_handler_config: &data_handler_config fit_start_time: 2008-01-01 fit_end_time: 2014-12-31 instruments: *market + infer_processors: + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label port_analysis_config: &port_analysis_config strategy: class: TopkDropoutStrategy module_path: qlib.contrib.strategy kwargs: - model: - dataset: + signal: topk: 50 n_drop: 5 backtest: @@ -32,18 +44,11 @@ port_analysis_config: &port_analysis_config min_cost: 5 task: model: - class: LGBModel - module_path: qlib.contrib.model.gbdt + class: LinearModel + module_path: qlib.contrib.model.linear kwargs: - loss: mse - colsample_bytree: 0.8879 - learning_rate: 0.2 - subsample: 0.8789 - lambda_l1: 205.6999 - lambda_l2: 580.9768 - max_depth: 8 - num_leaves: 210 - num_threads: 20 + estimator: ridge + alpha: 0.05 dataset: class: DatasetH module_path: qlib.data.dataset @@ -65,7 +70,7 @@ task: - class: SigAnaRecord module_path: qlib.workflow.record_temp kwargs: - ana_long_short: False + ana_long_short: True ann_scaler: 252 - class: PortAnaRecord module_path: qlib.workflow.record_temp diff --git a/qlib/finco/utils.py b/qlib/finco/utils.py index 379ef6375..7cf314831 100644 --- a/qlib/finco/utils.py +++ b/qlib/finco/utils.py @@ -20,6 +20,7 @@ class SingletonBaseClass(metaclass=SingletonMeta): This class becomes necessary """ + # TODO: Add move this class to Qlib's general utils. @@ -42,4 +43,4 @@ def similarity(text1, text2): def random_string(length=10): letters = string.ascii_letters + string.digits - return ''.join(random.choice(letters) for i in range(length)) + return "".join(random.choice(letters) for i in range(length)) diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index cf8156727..15d0ac34a 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -1,67 +1,40 @@ import sys -import copy import shutil -from pathlib import Path from typing import List - -from qlib.finco.task import HighLevelPlanTask, SummarizeTask, TrainTask +from pathlib import Path +from qlib.finco.task import HighLevelPlanTask, SummarizeTask from qlib.finco.prompt_template import PromptTemplate, Template from qlib.finco.log import FinCoLog, LogColors -from qlib.finco.utils import similarity from qlib.finco.llm import APIBackend from qlib.finco.conf import Config from qlib.finco.knowledge import KnowledgeBase, Topic +from qlib.finco.context import WorkflowContextManager -class WorkflowContextManager: - """Context Manager stores the context of the workflow""" - - """All context are key value pairs which saves the input, output and status of the whole workflow""" - - def __init__(self) -> None: - self.context = {} - self.logger = FinCoLog() - - def set_context(self, key, value): - if key in self.context: - self.logger.warning("The key already exists in the context, the value will be overwritten") - self.context[key] = value - - def get_context(self, key): - # NOTE: if the key doesn't exist, return None. In the future, we may raise an error to detect abnormal behavior - if key not in self.context: - self.logger.warning("The key doesn't exist in the context") - return None - return self.context[key] - - def update_context(self, key, new_value): - # NOTE: if the key doesn't exist, return None. In the future, we may raise an error to detect abnormal behavior - if key not in self.context: - self.logger.warning("The key doesn't exist in the context") - self.context.update({key: new_value}) - - def get_all_context(self): - """return a deep copy of the context""" - """TODO: do we need to return a deep copy?""" - return copy.deepcopy(self.context) - - def retrieve(self, query: str) -> dict: - if query in self.context.keys(): - return {query: self.context.get(query)} - - # Note: retrieve information from context by string similarity maybe abandon in future - scores = {} - for k, v in self.context.items(): - scores.update({k: max(similarity(query, k), similarity(query, v))}) - max_score_key = max(scores, key=scores.get) - return {max_score_key: self.context.get(max_score_key)} - - def clear(self, reserve: list = None): - if reserve is None: - reserve = [] - - _context = {k: self.get_context(k) for k in reserve} - self.context = _context +# TODO: it is not necessary in current phase +# class TaskDAG: +# """ +# This is a Task manager. it maintains a graph and a stack stucture to manager the task +# The reason why the DGA relationship is maintained outside instead of inside the task is that +# - To make the creating of task simpler(user don't have to care about the relation-ship) +# - To manage the relation ship when poping and executing the tasks is relatively easier instead of scattering them everywhere +# """ +# def __init__(self) -> None: +# self._finished = [] +# self._stack = [] +# self._dag = defaultdict(list) # from id(object) -> list of id(object) +# +# def pop(self): +# return self._stack.pop(0) +# +# def push(self, task: Union[Task, List[Task]], parent: Optional[Task] = None): +# if isinstance(task, Task): +# task = [task] +# if parent is not None: +# self._dag +# +# def done(self) -> bool: +# return len(self._stack) == 0 class WorkflowManager: @@ -78,8 +51,7 @@ class WorkflowManager: self._confirm_and_rm() self.prompt_template = PromptTemplate() - self.context = WorkflowContextManager() - self.context.set_context("workspace", self._workspace) + self.context = WorkflowContextManager(workspace=self._workspace) self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China A csi300. Please help to use lightgbm model." def _confirm_and_rm(self): @@ -92,7 +64,8 @@ class WorkflowManager: f"If you do not need to delete {self._workspace}," f" please change the workspace dir or rename existing files\n" f"Are you sure you want to delete, yes(Y/y), no (N/n):", - color=LogColors.WHITE) + color=LogColors.WHITE, + ) ) if str(flag) not in ["Y", "y"]: sys.exit() @@ -150,10 +123,12 @@ class WorkflowManager: # TODO: sort the task list based on the priority of the task # task_list = sorted(task_list, key=lambda x: x.task_type) t = task_list.pop(0) - self.logger.info(f"Task finished: {[str(task) for task in task_finished]}", - f"Task in queue: {task_list_info}", - f"Executing task: {str(t)}", - title="Task") + self.logger.info( + f"Task finished: {[str(task) for task in task_finished]}", + f"Task in queue: {task_list_info}", + f"Executing task: {str(t)}", + title="Task", + ) t.assign_context_manager(self.context) res = t.execute() @@ -174,9 +149,10 @@ class LearnManager: self.epoch = 0 self.wm = WorkflowManager() - self.topics = [Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in - self.__DEFAULT_TOPICS] - self.knowledge_base = KnowledgeBase(workdir=Path.cwd().joinpath('knowledge')) + self.topics = [ + Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in self.__DEFAULT_TOPICS + ] + self.knowledge_base = KnowledgeBase(workdir=Path.cwd().joinpath("knowledge")) self.knowledge_base.execute_knowledge.add([]) self.knowledge_base.query(knowledge_type="infrastructure", content="resolve_path") @@ -209,14 +185,17 @@ class LearnManager: for task in task_finished: prompt_workflow_selection = self.wm.prompt_template.get(f"{self.__class__.__name__}_user").render( - summary=summary, brief=knowledge_of_topics, + summary=summary, + brief=knowledge_of_topics, task_finished=[str(t) for t in task_finished], - task=task.__class__.__name__, system=task.system.render(), user_prompt=user_prompt + task=task.__class__.__name__, + system=task.system.render(), + user_prompt=user_prompt, ) response = APIBackend().build_messages_and_create_chat_completion( user_prompt=prompt_workflow_selection, - system_prompt=self.wm.prompt_template.get(f"{self.__class__.__name__}_system").render() + system_prompt=self.wm.prompt_template.get(f"{self.__class__.__name__}_system").render(), ) # todo: response assertion diff --git a/scripts/finco/cmd.sh b/scripts/finco/cmd.sh index 06175863d..ef272f620 100644 --- a/scripts/finco/cmd.sh +++ b/scripts/finco/cmd.sh @@ -12,4 +12,4 @@ if [ -e $DIR/cridential.sh ]; then fi # run the command -python -m qlib.finco.cli "please help me build a low turnover strategy that focus more on longterm return" +python -m qlib.finco.cli "build an A-share stock market daily portfolio in quantitative investment and minimize the maximum drawdown." diff --git a/setup.py b/setup.py index 86d11dd61..37d0204f0 100644 --- a/setup.py +++ b/setup.py @@ -176,11 +176,12 @@ setup( ], "finco": [ # finco is not necessary for all Qlib users; So a single require section is used for it. - "openapi", + "openai", "pydantic", # Please add it to basic requirements after the design of pydantic is state. + "pydantic-settings", "python-dotenv", # I don't think this is necessary if we use pydantic. "fuzzywuzzy", - "python-Levenshtein" # not necessary but accelerate fuzzywuzzy calculation + "python-Levenshtein", # not necessary but accelerate fuzzywuzzy calculation ], }, include_package_data=True,