mirror of
https://github.com/microsoft/qlib.git
synced 2026-07-02 18:40:58 +08:00
Merge pull request #1528 from microsoft/xuyang1/refine_task_and_implement_workflow_task_as_example
Xuyang1/refine task and implement workflow task as example
This commit is contained in:
@@ -1,13 +1,15 @@
|
||||
import fire
|
||||
from qlib.finco.task import WorkflowManager
|
||||
from dotenv import load_dotenv
|
||||
from qlib import auto_init
|
||||
|
||||
|
||||
def main(prompt):
|
||||
def main(prompt=None):
|
||||
load_dotenv(verbose=True, override=True)
|
||||
wm = WorkflowManager()
|
||||
wm.run(prompt)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
auto_init()
|
||||
fire.Fire(main)
|
||||
|
||||
@@ -13,7 +13,7 @@ class Config():
|
||||
def __init__(self):
|
||||
self.use_azure = os.getenv("USE_AZURE") == "True"
|
||||
self.temperature = 0.5 if os.getenv("TEMPERATURE") is None else float(os.getenv("TEMPERATURE"))
|
||||
self.max_tokens = 8000 if os.getenv("MAX_TOKENS") is None else int(os.getenv("MAX_TOKENS"))
|
||||
self.max_tokens = 800 if os.getenv("MAX_TOKENS") is None else int(os.getenv("MAX_TOKENS"))
|
||||
|
||||
self.openai_api_key = os.getenv("OPENAI_API_KEY")
|
||||
self.use_azure = os.getenv("USE_AZURE") == "True"
|
||||
@@ -21,4 +21,6 @@ class Config():
|
||||
self.azure_api_version = os.getenv("AZURE_API_VERSION")
|
||||
self.model = os.getenv("MODEL") or ("gpt-35-turbo" if self.use_azure else "gpt-3.5-turbo")
|
||||
|
||||
self.max_retry = os.getenv("MAX_RETRY")
|
||||
self.max_retry = os.getenv("MAX_RETRY")
|
||||
|
||||
self.continous_mode = os.getenv("CONTINOUS_MODE") == "True" if os.getenv("CONTINOUS_MODE") is not None else False
|
||||
@@ -1,3 +1,4 @@
|
||||
import time
|
||||
import openai
|
||||
from typing import Optional
|
||||
from qlib.finco.conf import Config
|
||||
@@ -25,6 +26,7 @@ def try_create_chat_completion(max_retry=10, **kwargs):
|
||||
except openai.error.RateLimitError as e:
|
||||
print(e)
|
||||
print(f"Retrying {i+1}th time...")
|
||||
time.sleep(1)
|
||||
continue
|
||||
raise Exception(f"Failed to create chat completion after {max_retry} retries.")
|
||||
|
||||
@@ -56,7 +58,8 @@ def create_chat_completion(
|
||||
model=cfg.model,
|
||||
messages=messages,
|
||||
)
|
||||
return response
|
||||
resp = response.choices[0].message["content"]
|
||||
return resp
|
||||
|
||||
if __name__ == "__main__":
|
||||
create_chat_completion()
|
||||
@@ -2,12 +2,22 @@ import os
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, List
|
||||
from qlib.log import get_module_logger
|
||||
from qlib.typehint import Literal
|
||||
from qlib.finco.conf import Config
|
||||
from qlib.finco.llm import try_create_chat_completion
|
||||
from qlib.finco.utils import parse_json
|
||||
from jinja2 import Template
|
||||
|
||||
import abc
|
||||
import copy
|
||||
import logging
|
||||
|
||||
|
||||
class Task:
|
||||
class Task():
|
||||
"""
|
||||
The user's intention, which was initially represented by a prompt, is achieved through a sequence of tasks.
|
||||
This class doesn't have to be abstract, but it is abstract in the sense that it is not supposed to be instantiated directly because it doesn't have any implementation.
|
||||
|
||||
Some thoughts:
|
||||
- Do we have to split create a new concept of Action besides Task?
|
||||
@@ -21,34 +31,131 @@ class Task:
|
||||
- Edit Task: it is supposed to edit the code base directly.
|
||||
"""
|
||||
|
||||
def __init__(self, context=None) -> None:
|
||||
pass
|
||||
|
||||
## all subclass should implement this method to determine task type
|
||||
@abc.abstractclassmethod
|
||||
def __init__(self) -> None:
|
||||
self._context_manager = None
|
||||
self.executed = False
|
||||
|
||||
def summarize(self) -> str:
|
||||
"""After the execution of the task, it is supposed to generated some context about the execution"""
|
||||
return ""
|
||||
raise NotImplementedError
|
||||
|
||||
def update_context(self, latest_context):
|
||||
"""assign the workflow context manager to the task"""
|
||||
"""then all tasks can use this context manager to share the same context"""
|
||||
def assign_context_manager(self, context_manager):
|
||||
...
|
||||
self._context_manager = context_manager
|
||||
|
||||
def execution(self) -> Any:
|
||||
def execution(self, **kwargs) -> Any:
|
||||
"""The execution results of the task"""
|
||||
pass
|
||||
raise NotImplementedError
|
||||
|
||||
def interact(self) -> Any:
|
||||
"""The user can interact with the task"""
|
||||
"""All sub classes should implement the interact method to determine the next task"""
|
||||
"""In continous mode, this method will not be called and the next task will be determined by the execution method only"""
|
||||
raise NotImplementedError("The interact method is not implemented, but workflow not in continous mode")
|
||||
|
||||
class WorkflowTask(Task):
|
||||
"""This task is supposed to be the first task of the workflow"""
|
||||
def __init__(self,) -> None:
|
||||
super().__init__()
|
||||
self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT = """
|
||||
Your task is to determine the workflow in Qlib (supervised learning or reinforcemtn learning) ensureing the workflow can meet the user's requirements.
|
||||
|
||||
The user will provide the requirements, you will provide only the output the choice in exact format specified below with no explanation or conversation.
|
||||
|
||||
Example input 1:
|
||||
Help me build a build a low turnover quant investment strategy that focus more on long turn return in China a stock market.
|
||||
|
||||
Example output 1:
|
||||
workflow: supervised learning
|
||||
|
||||
Example input 2:
|
||||
Help me build a build a pipeline to determine the best selling point of a stock in a day or half a day in USA stock market.
|
||||
|
||||
Example output 2:
|
||||
workflow: reinforcemtn learning
|
||||
"""
|
||||
|
||||
self.__DEFAULT_WORKFLOW_USER_PROMPT = (
|
||||
"User input: '{{user_prompt}}'\n"
|
||||
"Please provide the workflow in Qlib (supervised learning or reinforcemtn learning) ensureing the workflow can meet the user's requirements.\n"
|
||||
"Response only with the output in the exact format specified in the system prompt, with no explanation or conversation.\n"
|
||||
)
|
||||
self.__DEFAULT_USER_PROMPT = "Please help me build a low turnover strategy that focus more on longterm return in China a stock market."
|
||||
self.logger = get_module_logger("fincoWorkflowTask", level=logging.INFO)
|
||||
|
||||
"""make the choice which main workflow (RL, SL) will be used"""
|
||||
def execute(self,) -> List[Task]:
|
||||
user_prompt = self._context_manager.get_context("user_prompt")
|
||||
user_prompt = user_prompt if user_prompt is not None else self.__DEFAULT_USER_PROMPT
|
||||
system_prompt = self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT
|
||||
prompt_workflow_selection = Template(
|
||||
self.__DEFAULT_WORKFLOW_USER_PROMPT
|
||||
).render(user_prompt=user_prompt)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": system_prompt,
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": prompt_workflow_selection,
|
||||
},
|
||||
]
|
||||
response = try_create_chat_completion(messages=messages)
|
||||
workflow = response.split(":")[1].strip().lower()
|
||||
self.executed = True
|
||||
self._context_manager.set_context("workflow", workflow)
|
||||
if workflow == "supervised learning":
|
||||
return [SLTask()]
|
||||
elif workflow == "reinforcement learning":
|
||||
return [RLTask()]
|
||||
else:
|
||||
raise ValueError(f"The workflow: {workflow} is not supported")
|
||||
|
||||
def interact(self) -> Any:
|
||||
assert self.executed == True, "The workflow task has not been executed yet"
|
||||
## TODO use logger
|
||||
self.logger.info(
|
||||
f"The workflow has been determined to be ---{self._context_manager.get_context('workflow')}---"
|
||||
)
|
||||
self.logger.info(
|
||||
"Enter 'y' to authorise command,'s' to run self-feedback commands, "
|
||||
"'n' to exit program, or enter feedback for WorkflowTask"
|
||||
)
|
||||
try:
|
||||
answer = input("You answer is:")
|
||||
except KeyboardInterrupt:
|
||||
self.logger.info("User has exited the program")
|
||||
exit()
|
||||
if answer.lower().strip() == "y":
|
||||
return
|
||||
else:
|
||||
# TODO add self feedback
|
||||
raise ValueError("The input cannot be interpreted as a valid input")
|
||||
|
||||
|
||||
class PlanTask(Task):
|
||||
def execute(self) -> List[Task]:
|
||||
def execute(self, prompt) -> List[Task]:
|
||||
return []
|
||||
|
||||
|
||||
class WorkflowTask(PlanTask):
|
||||
"""make the choice which main workflow (RL, SL) will be used"""
|
||||
|
||||
def execute(self):
|
||||
...
|
||||
|
||||
|
||||
class SLTask(PlanTask):
|
||||
def __init__(self,) -> None:
|
||||
super().__init__()
|
||||
|
||||
def exeute(self):
|
||||
"""
|
||||
return a list of interested tasks
|
||||
Copy the template project maybe a part of the task
|
||||
"""
|
||||
return []
|
||||
|
||||
class RLTask(PlanTask):
|
||||
def __init__(self,) -> None:
|
||||
super().__init__()
|
||||
def exeute(self):
|
||||
"""
|
||||
return a list of interested tasks
|
||||
@@ -60,6 +167,29 @@ class SLTask(PlanTask):
|
||||
class ActionTask(Task):
|
||||
def execute(self) -> Literal["fail", "success"]:
|
||||
return "success"
|
||||
|
||||
"""Context Manager stores the context of the workflow"""
|
||||
"""All context are key value pairs which saves the input, output and status of the whole workflow"""
|
||||
class WorkflowContextManager():
|
||||
def __init__(self) -> None:
|
||||
self.context = {}
|
||||
self.logger = get_module_logger("fincoWorkflowContextManager")
|
||||
|
||||
def set_context(self, key, value):
|
||||
if key in self.context:
|
||||
self.logger.warning("The key already exists in the context, the value will be overwritten")
|
||||
self.context[key] = value
|
||||
|
||||
def get_context(self, key):
|
||||
if key not in self.context:
|
||||
self.logger.warning("The key doesn't exist in the context")
|
||||
return None
|
||||
return self.context[key]
|
||||
|
||||
"""return a deep copy of the context"""
|
||||
"""TODO: do we need to return a deep copy?"""
|
||||
def get_all_context(self):
|
||||
return copy.deepcopy(self.context)
|
||||
|
||||
|
||||
class SummarizeTask(Task):
|
||||
@@ -95,13 +225,14 @@ class WorkflowManager:
|
||||
self._output_path = Path.cwd() / name
|
||||
else:
|
||||
self._output_path = Path(output_path)
|
||||
self._context = []
|
||||
self._context = WorkflowContextManager()
|
||||
|
||||
def add_context(self, task_res):
|
||||
self._context.append(task_res)
|
||||
"""Direct call set_context method of the context manager"""
|
||||
def set_context(self, key, value):
|
||||
self._context.set_context(key, value)
|
||||
|
||||
def get_context(self):
|
||||
"""TODO: context manger?"""
|
||||
def get_context(self) -> WorkflowContextManager:
|
||||
return self._context
|
||||
|
||||
def run(self, prompt: str) -> Path:
|
||||
"""
|
||||
@@ -127,16 +258,23 @@ class WorkflowManager:
|
||||
# - The generated tasks can't be changed after geting new information from the execution retuls.
|
||||
# - But it is required in some cases, if we want to build a external dataset, it maybe have to plan like autogpt...
|
||||
|
||||
cfg = Config()
|
||||
|
||||
# NOTE: list may not be enough for general task list
|
||||
task_list = [WorkflowTask(prompt)]
|
||||
self.set_context("user_prompt", prompt)
|
||||
task_list = [WorkflowTask()]
|
||||
while len(task_list):
|
||||
# task_list.ap
|
||||
"""task list is not long, so sort it is not a big problem"""
|
||||
"""TODO: sort the task list based on the priority of the task"""
|
||||
# task_list = sorted(task_list, key=lambda x: x.task_type)
|
||||
t = task_list.pop(0)
|
||||
t.update_context(self.get_context())
|
||||
t.assign_context_manager(self._context)
|
||||
res = t.execute()
|
||||
if isinstance(t, PlanTask):
|
||||
if not cfg.continous_mode:
|
||||
res = t.interact()
|
||||
if isinstance(t.task_type, WorkflowTask) or isinstance(t.task_type, PlanTask):
|
||||
task_list.extend(res)
|
||||
elif isinstance(t, ActionTask):
|
||||
elif isinstance(t.task_type, ActionTask):
|
||||
if res != "success":
|
||||
...
|
||||
# TODO: handle the unexpected execution Error
|
||||
|
||||
9
qlib/finco/utils.py
Normal file
9
qlib/finco/utils.py
Normal file
@@ -0,0 +1,9 @@
|
||||
import json
|
||||
|
||||
def parse_json(response):
|
||||
try:
|
||||
return json.loads(response)
|
||||
except json.decoder.JSONDecodeError:
|
||||
pass
|
||||
|
||||
raise Exception(f"Failed to parse response: {response}, please report it or help us to fix it.")
|
||||
Reference in New Issue
Block a user