1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-02 02:21:18 +08:00

Merge pull request #1532 from microsoft/xuyang1/add-plan-and-config-task-implementation

add the initial version of plan and config task implementation
This commit is contained in:
Xu Yang
2023-06-01 11:20:04 +08:00
committed by GitHub
5 changed files with 282 additions and 72 deletions

View File

@@ -21,6 +21,6 @@ class Config():
self.azure_api_version = os.getenv("AZURE_API_VERSION")
self.model = os.getenv("MODEL") or ("gpt-35-turbo" if self.use_azure else "gpt-3.5-turbo")
self.max_retry = os.getenv("MAX_RETRY")
self.max_retry = int(os.getenv("MAX_RETRY")) if os.getenv("MAX_RETRY") is not None else None
self.continous_mode = os.getenv("CONTINOUS_MODE") == "True" if os.getenv("CONTINOUS_MODE") is not None else False

View File

@@ -26,7 +26,7 @@ def try_create_chat_completion(max_retry=10, **kwargs):
except openai.error.RateLimitError as e:
print(e)
print(f"Retrying {i+1}th time...")
time.sleep(1)
time.sleep(2)
continue
raise Exception(f"Failed to create chat completion after {max_retry} retries.")

View File

@@ -2,14 +2,13 @@ import os
from pathlib import Path
from typing import Any, List
from qlib.log import get_module_logger
from qlib.typehint import Literal
from qlib.finco.llm import try_create_chat_completion
from jinja2 import Template
import abc
import re
import logging
from qlib.log import get_module_logger
from qlib.finco.utils import build_messages_and_create_chat_completion
class Task():
"""
@@ -28,89 +27,90 @@ class Task():
- Edit Task: it is supposed to edit the code base directly.
"""
## all subclass should implement this method to determine task type
@abc.abstractclassmethod
def __init__(self) -> None:
self._context_manager = None
self.executed = False
self.logger : logging.Logger = get_module_logger(f"finco.{self.__class__.__name__}")
def summarize(self) -> str:
"""After the execution of the task, it is supposed to generated some context about the execution"""
raise NotImplementedError
"""This function might be converted to abstract method in the future"""
self.logger.info("The method has nothing to summarize")
"""assign the workflow context manager to the task"""
"""then all tasks can use this context manager to share the same context"""
def assign_context_manager(self, context_manager):
...
"""assign the workflow context manager to the task"""
"""then all tasks can use this context manager to share the same context"""
self._context_manager = context_manager
def save_chat_history_to_context_manager(self, user_input, response):
chat_history = self._context_manager.get_context("chat_history")
if chat_history is None:
chat_history = []
chat_history.append({"role": "user", "content": user_input})
chat_history.append({"role": "assistant", "content": response})
self._context_manager.update_context("chat_history", chat_history)
def execution(self, **kwargs) -> Any:
@abc.abstractclassmethod
def execute(self, **kwargs) -> Any:
"""The execution results of the task"""
"""All sub classes should implement the execute method to determine the next task"""
raise NotImplementedError
@abc.abstractclassmethod
def interact(self) -> Any:
"""The user can interact with the task"""
"""All sub classes should implement the interact method to determine the next task"""
"""In continous mode, this method will not be called and the next task will be determined by the execution method only"""
raise NotImplementedError("The interact method is not implemented, but workflow not in continous mode")
class WorkflowTask(Task):
"""This task is supposed to be the first task of the workflow"""
def __init__(self,) -> None:
super().__init__()
self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT = """
Your task is to determine the workflow in Qlib (supervised learning or reinforcemtn learning) ensureing the workflow can meet the user's requirements.
Your task is to determine the workflow in Qlib (supervised learning or reinforcement learning) ensuring the workflow can meet the user's requirements.
The user will provide the requirements, you will provide only the output the choice in exact format specified below with no explanation or conversation.
The user will provide the requirements, you will provide only the output the choice in exact format specified below with no explanation or conversation.
Example input 1:
Help me build a build a low turnover quant investment strategy that focus more on long turn return in China a stock market.
Example input 1:
Help me build a low turnover quant investment strategy that focus more on long turn return in China a stock market.
Example output 1:
workflow: supervised learning
Example output 1:
workflow: supervised learning
Example input 2:
Help me build a build a pipeline to determine the best selling point of a stock in a day or half a day in USA stock market.
Example input 2:
Help me build a pipeline to determine the best selling point of a stock in a day or half a day in USA stock market.
Example output 2:
workflow: reinforcemtn learning
Example output 2:
workflow: reinforcement learning
"""
self.__DEFAULT_WORKFLOW_USER_PROMPT = (
"User input: '{{user_prompt}}'\n"
"Please provide the workflow in Qlib (supervised learning or reinforcemtn learning) ensureing the workflow can meet the user's requirements.\n"
"Please provide the workflow in Qlib (supervised learning or reinforcement learning) ensureing the workflow can meet the user's requirements.\n"
"Response only with the output in the exact format specified in the system prompt, with no explanation or conversation.\n"
)
self.__DEFAULT_USER_PROMPT = "Please help me build a low turnover strategy that focus more on longterm return in China a stock market."
self.logger = get_module_logger("fincoWorkflowTask", level=logging.INFO)
"""make the choice which main workflow (RL, SL) will be used"""
def execute(self,) -> List[Task]:
"""make the choice which main workflow (RL, SL) will be used"""
user_prompt = self._context_manager.get_context("user_prompt")
user_prompt = user_prompt if user_prompt is not None else self.__DEFAULT_USER_PROMPT
system_prompt = self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT
prompt_workflow_selection = Template(
self.__DEFAULT_WORKFLOW_USER_PROMPT
).render(user_prompt=user_prompt)
messages = [
{
"role": "system",
"content": system_prompt,
},
{
"role": "user",
"content": prompt_workflow_selection,
},
]
response = ""
response = try_create_chat_completion(messages=messages)
response = build_messages_and_create_chat_completion(prompt_workflow_selection, system_prompt)
self.save_chat_history_to_context_manager(prompt_workflow_selection, response)
# TODO: use the above line instead of the following line before release!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# response = 'workflow: supervised learning'
workflow = response.split(":")[1].strip().lower()
self.executed = True
self._context_manager.set_context("workflow", workflow)
if workflow == "supervised learning":
return [SLTask()]
return [SLPlanTask()]
elif workflow == "reinforcement learning":
return [RLTask()]
return [RLPlanTask()]
else:
raise ValueError(f"The workflow: {workflow} is not supported")
@@ -137,24 +137,85 @@ class WorkflowTask(Task):
class PlanTask(Task):
def execute(self, prompt) -> List[Task]:
return []
pass
class SLTask(PlanTask):
class SLPlanTask(PlanTask):
def __init__(self,) -> None:
super().__init__()
self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT = """
Your task is to determine the 5 crucial components in Qlib (Dataset, Model, Record, Strategy, Backtest) ensuring the workflow can meet the user's requirements.
def exeute(self):
For each component, you first point out whether to use default module in Qlib or implement the new module (Default or Personized). Default module means the class has already be implemented by Qlib which can be found in document and source code. Default class can be directed called from config file without additional implementation. Personized module means new python class is implemented and called from config file. You should always provide the reason of your choice.
The user will provide the requirements, you will provide only the output the choice in exact format specified below with no explanation or conversation. You only response 5 components in the order of dataset, model, record, strategy, backtest with no other addition.
Example input:
Help me build a low turnover quant investment strategy that focus more on long turn return in China a stock market. I have some data in csv format and I want to merge them with the data in Qlib.
Example output:
components:
- Dataset: (Personized) I will implement a CustomDataset inherited from qlib.data.dataset and exposed a api to load user's csv file. I will check the format of user's data and align them with Qlib data. Because it is a suitable dataset to get a long turn return in China A stock market.
- Model: (Default) I will use LGBModel in qlib.contrib.model.gbdt and choose more robust hyperparameters to focus on long-term return. Because tree model is more stable than NN models and is more unlikely to be over converged.
- Record: (Default) I will use SignalRecord in qlib.workflow.record_temp and SigAnaRecord in qlib.workflow.record_temp to save all the signals and the analysis results. Because user needs to check the metrics to determine whether the system meets the requirements.
- Strategy: (Default) I will use TopkDropoutStrategy in qlib.contrib.strategy. Because it is a more robust strategy which saves turnover fee.
- Backtest: (Default) I will use the default backtest module in Qlib. Because it can tell the user a more real performance result of the model we build.
"""
return a list of interested tasks
Copy the template project maybe a part of the task
"""
return []
self.__DEFAULT_WORKFLOW_USER_PROMPT = (
"User input: '{{user_prompt}}'\n"
"Please provide the 5 crucial components in Qlib (dataset, model, record, strategy, backtest) ensureing the workflow can meet the user's requirements.\n"
"Response only with the output in the exact format specified in the system prompt, with no explanation or conversation.\n"
)
def execute(self):
    """Ask the LLM to plan the five Qlib components and queue follow-up tasks.

    The method reads ``workflow`` and ``user_prompt`` from the shared context
    manager, renders the user prompt template, sends it together with the
    system prompt to the chat-completion helper and records the exchange in
    the chat history. The reply is then searched for one line per component
    (Dataset, Model, Record, Strategy, Backtest); for each line found, the
    decision and the plan text are stored in the context as
    ``<Component>_decision`` and ``<Component>_plan``.

    Returns:
        A list of tasks: one ``ConfigActionTask`` per parsed component, followed
        by an ``ImplementActionTask`` when the decision is "Personized".
        Components whose line cannot be found are logged and skipped.

    Raises:
        AssertionError: if the workflow is not "supervised learning", if no
            user prompt is set, or if a parsed decision is neither "Default"
            nor "Personized".
    """
    workflow = self._context_manager.get_context("workflow")
    assert workflow == "supervised learning", "The workflow is not supervised learning"

    user_prompt = self._context_manager.get_context("user_prompt")
    assert user_prompt is not None, "The user prompt is not provided"

    system_prompt = self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT
    prompt_plan_all = Template(self.__DEFAULT_WORKFLOW_USER_PROMPT).render(user_prompt=user_prompt)
    response = build_messages_and_create_chat_completion(prompt_plan_all, system_prompt)
    self.save_chat_history_to_context_manager(prompt_plan_all, response)

    if "components" not in response:
        self.logger.warning("The response is not in the correct format, which probably means the answer is not correct")

    # Raw strings keep the patterns identical while avoiding the invalid "\("
    # escape of a normal string literal. The first four components must be
    # followed by a newline; Backtest is expected to be the last line.
    component_patterns = {
        "Dataset": re.compile(r"Dataset: \((.*?)\) (.*?)\n"),
        "Model": re.compile(r"Model: \((.*?)\) (.*?)\n"),
        "Record": re.compile(r"Record: \((.*?)\) (.*?)\n"),
        "Strategy": re.compile(r"Strategy: \((.*?)\) (.*?)\n"),
        "Backtest": re.compile(r"Backtest: \((.*?)\) (.*?)$"),
    }

    new_task = []
    for name, pattern in component_patterns.items():
        res = pattern.search(response)
        if not res:
            self.logger.error(f"The search for {name} decision failed")
            continue

        decision, plan = res.group(1), res.group(2)
        self._context_manager.set_context(f"{name}_decision", decision)
        self._context_manager.set_context(f"{name}_plan", plan)
        assert decision in ["Default", "Personized"]

        # Every component needs a config; a personalized one additionally
        # needs an implementation step. list.append() accepts exactly one
        # item, so the two tasks are appended separately.
        new_task.append(ConfigActionTask(name))
        if decision == "Personized":
            # NOTE(review): this passes the component name to
            # ImplementActionTask; confirm its constructor accepts it.
            new_task.append(ImplementActionTask(name))
    return new_task
class RLTask(PlanTask):
class RLPlanTask(PlanTask):
def __init__(self,) -> None:
super().__init__()
def exeute(self):
self.logger.error("The RL task is not implemented yet")
exit()
def execute(self):
"""
return a list of interested tasks
Copy the template project maybe a part of the task
@@ -163,9 +224,116 @@ class RLTask(PlanTask):
class ActionTask(Task):
def execute(self) -> Literal["fail", "success"]:
return "success"
pass
class ConfigActionTask(ActionTask):
    """Action task that asks the LLM to write the yaml config of one component."""

    def __init__(self, component) -> None:
        """Remember the target component and define the prompts used by ``execute``.

        Args:
            component: name of the component whose config should be written;
                it is rendered into the prompt as ``target_component`` and
                used as the prefix of the context keys written by ``execute``.
        """
        super().__init__()
        self.target_componet = component

        # System prompt: describes the job and gives one worked example.
        self.__DEFAULT_CONFIG_ACTION_SYSTEM_PROMPT = """
Your task is to write the config of target component in Qlib(Dataset, Model, Record, Strategy, Backtest).
Config means the yaml file in Qlib. You can find the default config in qlib/contrib/config_template. You can also find the config in Qlib document.
The user has provided the requirements and made plan and reason to each component. You should strictly follow user's plan and you should provide the reason of your hyperparameter choices if exist and some suggestion if user wants to finetune the hyperparameters after the config. Default means you should only use classes in Qlib without any other new code while Personized has no such restriction. class in Qlib means Qlib has implemented the class and you can find it in Qlib document or source code.
You only need to write the config of the target component in the exact format specified below with no explanation or conversation.
Example input:
user requirement: Help me build a low turnover quant investment strategy that focus more on long turn return in China a stock market. I have some data in csv format and I want to merge them with the data in Qlib.
user plan:
- Dataset: (Personized) I will implement a CustomDataset imherited from qlib.data.dataset and exposed a api to load user's csv file. I will check the format of user's data and align them with Qlib data. Because it is a suitable dataset to get a long turn return in China A stock market.
- Model: (Default) I will use LGBModel in qlib.contrib.model.gbdt and choose more robust hyperparameters to focus on long-term return. Because tree model is more stable than NN models and is more unlikely to be over converged.
- Record: (Default) I will use SignalRecord in qlib.workflow.record_temp and SigAnaRecord in qlib.workflow.record_temp to save all the signals and the analysis results. Because user needs to check the metrics to determine whether the system meets the requirements.
- Strategy: (Default) I will use TopkDropoutStrategy in qlib.contrib.strategy. Because it is a more robust strategy which saves turnover fee.
- Backtest: (Default) I will use the default backtest module in Qlib. Because it can tell the user a more real performance result of the model we build.
target component: Model
Example output:
Config:
```yaml
model:
class: LGBModel
module_path: qlib.contrib.model.gbdt
kwargs:
loss: mse
colsample_bytree: 0.8879
learning_rate: 0.2
subsample: 0.8789
lambda_l1: 205.6999
lambda_l2: 580.9768
max_depth: 8
num_leaves: 210
num_threads: 20
```
Reason: I choose the hyperparameters above because they are the default hyperparameters in Qlib and they are more robust than other hyperparameters.
Improve suggestion: You can try to tune the num_leaves in range [100, 300], max_depth in [5, 10], learning_rate in [0.01, 1] and other hyperparameters in the config. Since you're trying to get a long tern return, if you have enough computation resource, you can try to use a larger num_leaves and max_depth and a smaller learning_rate.
"""

        # Jinja2 template of the user message; filled from the context in execute().
        self.__CONFIG_ACTION_SYSTEM_PROMPT_TEMPLATE = """
user requirement: {{user_requirement}}
user plan:
- Dataset: ({{dataset_decision}}) {{dataset_plan}}
- Model: ({{model_decision}}) {{model_plan}}
- Record: ({{record_decision}}) {{record_plan}}
- Strategy: ({{strategy_decision}}) {{strategy_plan}}
- Backtest: ({{backtest_decision}}) {{backtest_plan}}
target component: {{target_component}}
"""

    def execute(self):
        """Request the config of the target component and store the parsed reply.

        Reads ``user_prompt`` plus every ``<Component>_decision`` and
        ``<Component>_plan`` from the context manager, renders them into the
        user message, queries the chat-completion helper and records the
        exchange in the chat history. The reply is split into its ``Config``,
        ``Reason`` and ``Improve suggestion`` sections, and the yaml code fence
        is extracted from the config section. The three results are written to
        the context as ``<target>_config``, ``<target>_reason`` and
        ``<target>_improve_suggestion``.

        Returns:
            An empty list.

        Raises:
            AssertionError: if a decision or plan is missing from the context,
                or if the reply does not follow the expected format.
        """
        ctx = self._context_manager
        user_prompt = ctx.get_context("user_prompt")

        # Collect "<Component>_decision" / "<Component>_plan" for all components.
        plan_fields = {}
        for component in ("Dataset", "Model", "Record", "Strategy", "Backtest"):
            for suffix in ("decision", "plan"):
                field = f"{component}_{suffix}"
                plan_fields[field] = ctx.get_context(field)
        assert None not in plan_fields.values(), "Some decision or plan is not set by plan maker"

        # The template placeholders are the lower-cased context keys.
        template_args = {field.lower(): value for field, value in plan_fields.items()}
        config_prompt = Template(self.__CONFIG_ACTION_SYSTEM_PROMPT_TEMPLATE).render(
            user_requirement=user_prompt,
            target_component=self.target_componet,
            **template_args,
        )

        response = build_messages_and_create_chat_completion(
            config_prompt, self.__DEFAULT_CONFIG_ACTION_SYSTEM_PROMPT
        )
        self.save_chat_history_to_context_manager(config_prompt, response)

        sections = re.search(r"Config:(.*)Reason:(.*)Improve suggestion:(.*)", response, re.S)
        assert sections is not None and len(sections.groups()) == 3, "The response of config action task is not in the correct format"
        config_section, reason, improve_suggestion = sections.groups()

        yaml_block = re.search(r"```yaml(.*)```", config_section, re.S)
        assert yaml_block is not None, "The config part of config action task response is not in the correct format"
        config = yaml_block.group(1)

        target = self.target_componet
        ctx.set_context(f"{target}_config", config)
        ctx.set_context(f"{target}_reason", reason)
        ctx.set_context(f"{target}_improve_suggestion", improve_suggestion)
        return []
class ImplementActionTask(ActionTask):
    """Action task for a component that needs a new implementation.

    The implementation step is still a placeholder: ``execute`` performs no
    work and returns no follow-up tasks.
    """

    def __init__(self, component=None) -> None:
        """Create the task, optionally bound to a component.

        Args:
            component: name of the component to implement. It defaults to
                ``None`` so that constructing the task without arguments keeps
                working, while callers that pass the component name no longer
                fail with a ``TypeError``.
        """
        super().__init__()
        # Attribute spelling mirrors ConfigActionTask.target_componet.
        self.target_componet = component

    def execute(self):
        """
        return a list of interested tasks
        Copy the template project maybe a part of the task
        """
        return []
class SummarizeTask(Task):
__DEFAULT_OUTPUT_PATH = "./"

View File

@@ -1,4 +1,7 @@
import json
from qlib.finco.llm import try_create_chat_completion
from qlib.finco.conf import Config
from qlib.log import get_module_logger
def parse_json(response):
try:
@@ -6,4 +9,27 @@ def parse_json(response):
except json.decoder.JSONDecodeError:
pass
raise Exception(f"Failed to parse response: {response}, please report it or help us to fix it.")
raise Exception(f"Failed to parse response: {response}, please report it or help us to fix it.")
def build_messages_and_create_chat_completion(user_prompt, system_prompt=None):
    """Wrap the prompts into chat messages and request a chat completion.

    Args:
        user_prompt: content of the ``user`` message.
        system_prompt: content of the ``system`` message. When ``None``, the
            ``system_prompt`` attribute of ``Config()`` is used; if the config
            has no such attribute, a warning is logged and a built-in default
            prompt is used instead.

    Returns:
        Whatever ``try_create_chat_completion`` returns for the two messages.
    """
    cfg = Config()
    # TODO: system prompt should always be provided. In development stage we can use default value
    if system_prompt is None:
        unset = object()  # sentinel: tells "attribute missing" apart from any real value
        system_prompt = getattr(cfg, "system_prompt", unset)
        if system_prompt is unset:
            get_module_logger("finco").warning("system_prompt is not set, using default value.")
            system_prompt = "You are an AI assistant who helps to answer user's questions about finance."

    role_contents = (("system", system_prompt), ("user", user_prompt))
    messages = [{"role": role, "content": content} for role, content in role_contents]
    return try_create_chat_completion(messages=messages)

View File

@@ -6,44 +6,55 @@ from qlib.finco.conf import Config
from qlib.finco.utils import parse_json
from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask
"""Context Manager stores the context of the workflow"""
"""All context are key value pairs which saves the input, output and status of the whole workflow"""
class WorkflowContextManager():
class WorkflowContextManager:
"""Context Manager stores the context of the workflow"""
"""All context are key value pairs which saves the input, output and status of the whole workflow"""
def __init__(self) -> None:
self.context = {}
self.logger = get_module_logger("fincoWorkflowContextManager")
def set_context(self, key, value):
if key in self.context:
self.logger.warning("The key already exists in the context, the value will be overwritten")
self.logger.warning(
"The key already exists in the context, the value will be overwritten"
)
self.context[key] = value
def get_context(self, key):
    """Return the value stored under ``key``.

    A missing key is not an error for now: a warning is logged and ``None``
    is returned. This may be turned into an exception in the future to
    detect abnormal behavior.
    """
    try:
        return self.context[key]
    except KeyError:
        self.logger.warning("The key doesn't exist in the context")
        return None
"""return a deep copy of the context"""
"""TODO: do we need to return a deep copy?"""
def update_context(self, key, new_value):
    """Store ``new_value`` under ``key`` in the context.

    Updating a key that is not in the context yet only logs a warning; the
    value is stored anyway. This may be turned into an exception in the
    future to detect abnormal behavior.
    """
    key_is_new = key not in self.context
    if key_is_new:
        self.logger.warning("The key doesn't exist in the context")
    self.context[key] = new_value
def get_all_context(self):
    """Return a deep copy of the whole context dictionary.

    TODO: do we need to return a deep copy?
    """
    snapshot = copy.deepcopy(self.context)
    return snapshot
class WorkflowManager:
"""This manange the whole task automation workflow including tasks and actions"""
def __init__(self, name="project", output_path=None) -> None:
if output_path is None:
self._output_path = Path.cwd() / name
else:
self._output_path = Path(output_path)
self._context = WorkflowContextManager()
self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China a stock market."
"""Direct call set_context method of the context manager"""
def set_context(self, key, value):
"""Direct call set_context method of the context manager"""
self._context.set_context(key, value)
def get_context(self) -> WorkflowContextManager:
@@ -75,18 +86,24 @@ class WorkflowManager:
cfg = Config()
# NOTE: default user prompt might be changed in the future and exposed to the user
if prompt is None:
self.set_context("user_prompt", self.default_user_prompt)
else:
self.set_context("user_prompt", prompt)
# NOTE: list may not be enough for general task list
self.set_context("user_prompt", prompt)
task_list = [WorkflowTask()]
while len(task_list):
"""task list is not long, so sort it is not a big problem"""
"""TODO: sort the task list based on the priority of the task"""
# task list is not long, so sorting it is not a big problem
# TODO: sort the task list based on the priority of the task
# task_list = sorted(task_list, key=lambda x: x.task_type)
t = task_list.pop(0)
t.assign_context_manager(self._context)
res = t.execute()
if not cfg.continous_mode:
res = t.interact()
t.summarize()
if isinstance(t, WorkflowTask) or isinstance(t, PlanTask):
task_list.extend(res)
elif isinstance(t, ActionTask):
@@ -95,5 +112,4 @@ class WorkflowManager:
# TODO: handle the unexpected execution Error
else:
raise NotImplementedError("Unsupported action type")
self.add_context(t.summarize())
return self._output_path