From 6cb87ecfd195996bdc124dc8c372f2c34bd23808 Mon Sep 17 00:00:00 2001 From: Xu Yang Date: Mon, 3 Jul 2023 17:56:22 +0800 Subject: [PATCH 1/2] refine code to use qrun --- qlib/finco/data_cache_demo.py | 53 ++++++++++++++++++++++++++ qlib/finco/llm.py | 2 +- qlib/finco/prompt_template.yaml | 12 ++++-- qlib/finco/task.py | 67 ++++++++++++++++++++++++--------- 4 files changed, 113 insertions(+), 21 deletions(-) create mode 100644 qlib/finco/data_cache_demo.py diff --git a/qlib/finco/data_cache_demo.py b/qlib/finco/data_cache_demo.py new file mode 100644 index 000000000..eb000335d --- /dev/null +++ b/qlib/finco/data_cache_demo.py @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +""" + The motivation of this demo + - To show the data modules of Qlib is Serializable, users can dump processed data to disk to avoid duplicated data preprocessing +""" + +from copy import deepcopy +from pathlib import Path +import pickle +from pprint import pprint +import subprocess +import yaml +from qlib.log import TimeInspector + +from qlib import init +from qlib.data.dataset.handler import DataHandlerLP +from qlib.utils import init_instance_by_config + +# For general purpose, we use relative path +DIRNAME = Path(__file__).absolute().resolve().parent / "finco_workspace" + +if __name__ == "__main__": + init() + + config_path = Path("/home/xuyang/workspace/qlib/qlib/finco/finco_workspace/workflow_config.yaml") + + # 1) show original time + # with TimeInspector.logt("The original time without handler cache:"): + # subprocess.run(f"qrun {config_path}", shell=True) + + # 2) dump handler + task_config = yaml.safe_load(config_path.open()) + # hd_conf = task_config["task"]["dataset"]["kwargs"]["handler"] + # pprint(hd_conf) + # hd: DataHandlerLP = init_instance_by_config(hd_conf) + hd_path = DIRNAME / "handler.pkl" + # hd.to_pickle(hd_path, dump_all=True) + + # 3) create new task with handler cache + new_task_config = deepcopy(task_config) + new_task_config["task"]["dataset"]["kwargs"]["handler"] = f"file://{hd_path}" + new_task_config["sys"] = {"path": [str(config_path.parent.resolve())]} + new_task_path = DIRNAME / "workflow_config.yaml" + print("The location of the new task", new_task_path) + + # save new task + with new_task_path.open("w") as f: + yaml.safe_dump(new_task_config, f, indent=4, sort_keys=False) + + # 4) train model with new task + # with TimeInspector.logt("The time for task with handler cache:"): + # subprocess.run(f"qrun {new_task_path}", shell=True) diff --git a/qlib/finco/llm.py b/qlib/finco/llm.py index 379c010a6..08df6e061 100644 --- a/qlib/finco/llm.py +++ b/qlib/finco/llm.py @@ -62,7 +62,7 @@ class APIBackend(Singleton): try: response = self.create_chat_completion(**kwargs) return response - except (openai.error.RateLimitError, openai.error.Timeout) as e: + except (openai.error.RateLimitError, openai.error.Timeout, openai.error.APIError) as e: print(e) print(f"Retrying {i+1}th time...") time.sleep(1) diff --git a/qlib/finco/prompt_template.yaml b/qlib/finco/prompt_template.yaml index 72471f019..0e17304bb 100644 --- a/qlib/finco/prompt_template.yaml +++ b/qlib/finco/prompt_template.yaml @@ -126,7 +126,9 @@ HyperparameterActionTask_system : |- The user has provided the requirements, chose the predefined classes and made plan and reason to each component. You should strictly follow user's choice and you should provide the reason of your hyperparameter choices if exist and some suggestion if the user wants to finetune the hyperparameters after the hyperparameter. You only need to response the hyperparameters in the exact format in exsample below with no explanation or conversation. "Hyperparameters:", "Reason:", "Improve suggestion:" are key tags so always include them in response. - {% if target_module == "DataHandler" %} + {% if target_module == "Dataset" %} + Caution, if the user chose {qlib.data.dataset}-{DatasetH}, always remember to set hyperparameter: {segments}! + {% elif target_module == "DataHandler" %} Qlib has these processors {processor_name}-{hyperparameter kwargs}: {DropnaProcessor}-{['fields_group']},{DropnaLabel}-{['fields_group']},{CSRankNorm}-{['fields_group']},{ProcessInf}-{[]},{Processor}-{[]},{MinMaxNorm}-{['fit_start_time', 'fit_end_time', 'fields_group']},{CSZFillna}-{['fields_group']},{TanhProcess}-{[]},{CSZScoreNorm}-{['fields_group', 'method']},{RobustZScoreNorm}-{['fit_start_time', 'fit_end_time', 'fields_group', 'clip_outlier']},{FilterCol}-{['fields_group', 'col_list']},{HashStockFormat}-{[]},{ZScoreNorm}-{['fit_start_time', 'fit_end_time', 'fields_group']},{DropCol}-{['col_list']},{Fillna}-{['fields_group', 'fill_value']}. You can choose some of them to use in {infer_processors} or {learn_processors} if necessary and pick the kwargs of them. @@ -342,6 +344,9 @@ ConfigActionTask_system: |- {%for module_path, class_name in classes%}{% raw %}{{% endraw %}{{module_path}}{% raw %}}{% endraw %}-{% raw %}{{% endraw %}{{class_name}}{% raw %}}{% endraw %}.{% endfor %} and you have decided all the hyperparameters. {% endif %} + The predefined classes and user's hint are hard requirments, you should copy them to your answer with no modification to avoid errors! + "```yaml(.*)" and "```" are key tags in response, always include them in your response! + Default in user's hyperparameter means using default value in Qlib code. So always remember to avoid puting them in the config and delete this key in yaml string!!! You only output the target component part of the config, Don't output all the config file!!! @@ -352,6 +357,7 @@ ConfigActionTask_system: |- 4. each predefined class's hyperparameter to initialize the class You will response the YAML config with no explanation and interaction. + Most importantly, always make sure the yaml string you response can be converted to yaml object without any format issue! Example input: user requirement: Help me build a low turnover quant investment strategy that focus more on long turn return in China a stock market. I want to use a big LSTM model and add several MLP layer before the head. @@ -441,7 +447,7 @@ ConfigActionTask_system: |- target component: Backtest {% endif %} Example output: - """yaml{% if target_module == "Dataset" %} + ```yaml{% if target_module == "Dataset" %} dataset: class: DatasetH module_path: qlib.data.dataset @@ -511,7 +517,7 @@ ConfigActionTask_system: |- open_cost: 0.0005 close_cost: 0.0015 min_cost: 5 - {% endif %}""" + {% endif %}``` ConfigActionTask_user: |- user requirement: {{user_requirement}} diff --git a/qlib/finco/task.py b/qlib/finco/task.py index 363f19115..5e61c81d0 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -262,8 +262,27 @@ class TrainTask(Task): if confirm is False: return [] - command = f"qrun {workflow_path}" - self._output = subprocess.check_output(command, shell=True, cwd=workspace) + command = ["qrun", str(workflow_path)] + try: + # Run the command and capture the output + workspace = self._context_manager.get_context("workspace") + result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace)) + + except subprocess.CalledProcessError as e: + print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}") + real_error = e.stderr+e.stdout + if "model" in e.stdout.lower(): + return [HyperparameterActionTask("Model", regenerate=True, error=real_error), ConfigActionTask("Model"), YamlEditTask("Model"), TrainTask()] + elif "dataset" in e.stdout.lower() or "handler" in e.stdout.lower(): + return [HyperparameterActionTask("Dataset", regenerate=True, error=real_error), HyperparameterActionTask("DataHandler", regenerate=True, error=real_error), ConfigActionTask("Dataset"), ConfigActionTask("DataHandler"), YamlEditTask("Dataset"), YamlEditTask("DataHandler"), TrainTask()] + else: + ret_list = [] + for component in COMPONENT_LIST: + ret_list.append(HyperparameterActionTask(component, regenerate=True, error=real_error)) + ret_list.append(ConfigActionTask(component)) + ret_list.append(YamlEditTask(component)) + ret_list.append(TrainTask()) + return ret_list return [AnalysisTask()] @@ -403,11 +422,12 @@ class DifferentiatedComponentActionTask(ActionTask): class HyperparameterActionTask(ActionTask): - def __init__(self, component, regenerate=False, error=None) -> None: + def __init__(self, component, regenerate=False, error=None, error_type=None) -> None: super().__init__() self.target_component = component self.regenerate = regenerate self.error = error + self.error_type = error_type def execute(self): user_prompt = self._context_manager.get_context("user_prompt") @@ -444,7 +464,10 @@ class HyperparameterActionTask(ActionTask): ) former_messages = [] if self.regenerate: - user_prompt = f"your hyperparameter cannot be initialized, may be caused by wrong format of the value or wrong name or some value is not supported in Qlib.\nPlease rewrite the hyperparameters and answer with exact required format in system prompt and reply with no more explainations.\nThe error message: {self.error}. Please correct the former answer accordingly.\nHyperparameters, Reason and Improve suggestion should always be included." + if self.error_type == "yaml": + user_prompt = f"your yaml config generated from your hyperparameter is not in the right format.\n The Yaml string generated from the hyperparameters is not in the right format.\nPlease rewrite the hyperparameters and answer with exact required format in system prompt and reply with no more explainations.\nThe error message: {self.error}. Please correct the former answer accordingly.\nHyperparameters, Reason and Improve suggestion should always be included." + else: + user_prompt = f"your hyperparameter cannot be initialized, may be caused by wrong format of the value or wrong name or some value is not supported in Qlib.\nPlease rewrite the hyperparameters and answer with exact required format in system prompt and reply with no more explainations.\nThe error message: {self.error}. Please correct the former answer accordingly.\nHyperparameters, Reason and Improve suggestion should always be included." former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__][self.target_component][1:] response = APIBackend().build_messages_and_create_chat_completion( user_prompt, system_prompt, former_messages=former_messages @@ -472,11 +495,9 @@ class HyperparameterActionTask(ActionTask): class ConfigActionTask(ActionTask): - def __init__(self, component, reconfig=False, error=None) -> None: + def __init__(self, component) -> None: super().__init__() self.target_component = component - self.reconfig = reconfig - self.error = error def execute(self): user_prompt = self._context_manager.get_context("user_prompt") @@ -494,9 +515,9 @@ class ConfigActionTask(ActionTask): target_component_hyperparameters=target_component_hyperparameters ) former_messages = [] - if self.reconfig and user_prompt == self._context_manager.get_context("chat_history")[self.__class__.__name__][self.target_component][-2]["content"]: - user_prompt = f"your config cannot be converted to YAML, may be caused by wrong format. Please rewrite the yaml and answer with exact required format in system prompt and reply with no more explainations.\nerror message: {self.error}\n" - former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__][self.target_component][1:] + # if self.reconfig and user_prompt == self._context_manager.get_context("chat_history")[self.__class__.__name__][self.target_component][-2]["content"]: + # user_prompt = f"your config cannot be converted to YAML, may be caused by wrong format. Please rewrite the yaml and answer with exact required format in system prompt and reply with no more explainations.\nerror message: {self.error}\n" + # former_messages = self._context_manager.get_context("chat_history")[self.__class__.__name__][self.target_component][1:] response = APIBackend().build_messages_and_create_chat_completion( user_prompt, system_prompt, former_messages=former_messages ) @@ -509,10 +530,13 @@ class ConfigActionTask(ActionTask): yaml_config = yaml.safe_load(io.StringIO(config)) except yaml.YAMLError as e: self.logger.info(f"Yaml file is not in the correct format: {e}") - return_tasks = [HyperparameterActionTask(self.target_component, regenerate=True, error=str(e)), ConfigActionTask(self.target_component, reconfig=True, error=str(e))] + return_tasks = [HyperparameterActionTask(self.target_component, regenerate=True, error=str(e), error_type="yaml"), ConfigActionTask(self.target_component)] return return_tasks - if self.target_component == "DataHandler": + if self.target_component == "Dataset": + if 'handler' in yaml_config["dataset"]: + del yaml_config['dataset']['handler'] + elif self.target_component == "DataHandler": for processor in yaml_config['handler']['kwargs']['infer_processors']: if "kwargs" in processor and "fields_group" in processor["kwargs"]: del processor["kwargs"]['fields_group'] @@ -520,8 +544,12 @@ class ConfigActionTask(ActionTask): if "kwargs" in processor and "fields_group" in processor["kwargs"]: del processor["kwargs"]['fields_group'] - if 'freq' in yaml_config['handler']['kwargs'] and yaml_config['handler']['kwargs']['freq'] == '1d': - yaml_config['handler']['kwargs']['freq'] = "day" + if 'freq' in yaml_config['handler']['kwargs']: + yaml_config['handler']['kwargs']['freq'] = "day" # TODO hot fix freq because no data + elif self.target_component == "Record": + for record in yaml_config['record']: + if record['class'] == 'SigAnaRecord' and 'label_col' in record['kwargs']: + del record['kwargs']["label_col"] def remove_default(config): if isinstance(config, dict): @@ -688,12 +716,17 @@ class YamlEditTask(ActionTask): else: real_target_config_key = self.target_config_key - - # 3) replace the module assert isinstance(update_config, dict) and real_target_config_key in update_config, "The config file is not in the correct format" assert self.replace_key_value_recursive(target_config, real_target_config_key, update_config[real_target_config_key]), "Replace of the yaml file failed." - + + # TODO hotfix for the bug that the record signalrecord config is not updated + for record in target_config['task']['record']: + if record['class'] == 'SignalRecord': + if 'model' in record['kwargs']: + del record['kwargs']["model"] + if 'dataset' in record['kwargs']: + del record['kwargs']["dataset"] # 4) save the config file with self.original_config_location.open("w") as f: From ee5e5cfdd890003048324e4f5addef094b56d824 Mon Sep 17 00:00:00 2001 From: Xu Yang Date: Mon, 3 Jul 2023 17:57:13 +0800 Subject: [PATCH 2/2] remove useless code --- qlib/finco/data_cache_demo.py | 53 ----------------------------------- 1 file changed, 53 deletions(-) delete mode 100644 qlib/finco/data_cache_demo.py diff --git a/qlib/finco/data_cache_demo.py b/qlib/finco/data_cache_demo.py deleted file mode 100644 index eb000335d..000000000 --- a/qlib/finco/data_cache_demo.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. -""" - The motivation of this demo - - To show the data modules of Qlib is Serializable, users can dump processed data to disk to avoid duplicated data preprocessing -""" - -from copy import deepcopy -from pathlib import Path -import pickle -from pprint import pprint -import subprocess -import yaml -from qlib.log import TimeInspector - -from qlib import init -from qlib.data.dataset.handler import DataHandlerLP -from qlib.utils import init_instance_by_config - -# For general purpose, we use relative path -DIRNAME = Path(__file__).absolute().resolve().parent / "finco_workspace" - -if __name__ == "__main__": - init() - - config_path = Path("/home/xuyang/workspace/qlib/qlib/finco/finco_workspace/workflow_config.yaml") - - # 1) show original time - # with TimeInspector.logt("The original time without handler cache:"): - # subprocess.run(f"qrun {config_path}", shell=True) - - # 2) dump handler - task_config = yaml.safe_load(config_path.open()) - # hd_conf = task_config["task"]["dataset"]["kwargs"]["handler"] - # pprint(hd_conf) - # hd: DataHandlerLP = init_instance_by_config(hd_conf) - hd_path = DIRNAME / "handler.pkl" - # hd.to_pickle(hd_path, dump_all=True) - - # 3) create new task with handler cache - new_task_config = deepcopy(task_config) - new_task_config["task"]["dataset"]["kwargs"]["handler"] = f"file://{hd_path}" - new_task_config["sys"] = {"path": [str(config_path.parent.resolve())]} - new_task_path = DIRNAME / "workflow_config.yaml" - print("The location of the new task", new_task_path) - - # save new task - with new_task_path.open("w") as f: - yaml.safe_dump(new_task_config, f, indent=4, sort_keys=False) - - # 4) train model with new task - # with TimeInspector.logt("The time for task with handler cache:"): - # subprocess.run(f"qrun {new_task_path}", shell=True)