1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-04 11:30:57 +08:00

split task into workflow and task to make the strcture more clear

This commit is contained in:
Xu Yang
2023-05-31 11:45:25 +08:00
parent 421b1403b2
commit 3919678028
3 changed files with 102 additions and 95 deletions

View File

@@ -1,5 +1,5 @@
import fire
from qlib.finco.task import WorkflowManager
from qlib.finco.workflow import WorkflowManager
from dotenv import load_dotenv
from qlib import auto_init

View File

@@ -4,13 +4,10 @@ from pathlib import Path
from typing import Any, List
from qlib.log import get_module_logger
from qlib.typehint import Literal
from qlib.finco.conf import Config
from qlib.finco.llm import try_create_chat_completion
from qlib.finco.utils import parse_json
from jinja2 import Template
import abc
import copy
import logging
@@ -105,6 +102,7 @@ class WorkflowTask(Task):
"content": prompt_workflow_selection,
},
]
response = ""
response = try_create_chat_completion(messages=messages)
workflow = response.split(":")[1].strip().lower()
self.executed = True
@@ -167,30 +165,7 @@ class RLTask(PlanTask):
class ActionTask(Task):
def execute(self) -> Literal["fail", "success"]:
return "success"
"""Context Manager stores the context of the workflow"""
"""All context are key value pairs which saves the input, output and status of the whole workflow"""
class WorkflowContextManager():
def __init__(self) -> None:
self.context = {}
self.logger = get_module_logger("fincoWorkflowContextManager")
def set_context(self, key, value):
if key in self.context:
self.logger.warning("The key already exists in the context, the value will be overwritten")
self.context[key] = value
def get_context(self, key):
if key not in self.context:
self.logger.warning("The key doesn't exist in the context")
return None
return self.context[key]
"""return a deep copy of the context"""
"""TODO: do we need to return a deep copy?"""
def get_all_context(self):
return copy.deepcopy(self.context)
class SummarizeTask(Task):
def execution(self) -> Any:
@@ -215,70 +190,3 @@ class SummarizeTask(Task):
result.append({'postfix': postfix, 'content': content})
return result
class WorkflowManager:
"""This manange the whole task automation workflow including tasks and actions"""
def __init__(self, name="project", output_path=None) -> None:
if output_path is None:
self._output_path = Path.cwd() / name
else:
self._output_path = Path(output_path)
self._context = WorkflowContextManager()
"""Direct call set_context method of the context manager"""
def set_context(self, key, value):
self._context.set_context(key, value)
def get_context(self) -> WorkflowContextManager:
return self._context
def run(self, prompt: str) -> Path:
"""
The workflow manager is supposed to generate a codebase based on the prompt
Parameters
----------
prompt: str
the prompt user gives
Returns
-------
Path
The workflow manager is expected to produce output that includes a codebase containing generated code, results, and reports in a designated location.
The path is returned
The output path should follow a specific format:
- TODO: design
There is a summarized report where user can start from.
"""
# NOTE: The following items are not designed to make the workflow very flexible.
# - The generated tasks can't be changed after geting new information from the execution retuls.
# - But it is required in some cases, if we want to build a external dataset, it maybe have to plan like autogpt...
cfg = Config()
# NOTE: list may not be enough for general task list
self.set_context("user_prompt", prompt)
task_list = [WorkflowTask()]
while len(task_list):
"""task list is not long, so sort it is not a big problem"""
"""TODO: sort the task list based on the priority of the task"""
# task_list = sorted(task_list, key=lambda x: x.task_type)
t = task_list.pop(0)
t.assign_context_manager(self._context)
res = t.execute()
if not cfg.continous_mode:
res = t.interact()
if isinstance(t.task_type, WorkflowTask) or isinstance(t.task_type, PlanTask):
task_list.extend(res)
elif isinstance(t.task_type, ActionTask):
if res != "success":
...
# TODO: handle the unexpected execution Error
else:
raise NotImplementedError("Unsupported action type")
self.add_context(t.summarize())
return self._output_path

99
qlib/finco/workflow.py Normal file
View File

@@ -0,0 +1,99 @@
import copy
from pathlib import Path
from qlib.log import get_module_logger
from qlib.finco.conf import Config
from qlib.finco.utils import parse_json
from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask
"""Context Manager stores the context of the workflow"""
"""All context are key value pairs which saves the input, output and status of the whole workflow"""
class WorkflowContextManager():
def __init__(self) -> None:
self.context = {}
self.logger = get_module_logger("fincoWorkflowContextManager")
def set_context(self, key, value):
if key in self.context:
self.logger.warning("The key already exists in the context, the value will be overwritten")
self.context[key] = value
def get_context(self, key):
if key not in self.context:
self.logger.warning("The key doesn't exist in the context")
return None
return self.context[key]
"""return a deep copy of the context"""
"""TODO: do we need to return a deep copy?"""
def get_all_context(self):
return copy.deepcopy(self.context)
class WorkflowManager:
"""This manange the whole task automation workflow including tasks and actions"""
def __init__(self, name="project", output_path=None) -> None:
if output_path is None:
self._output_path = Path.cwd() / name
else:
self._output_path = Path(output_path)
self._context = WorkflowContextManager()
"""Direct call set_context method of the context manager"""
def set_context(self, key, value):
self._context.set_context(key, value)
def get_context(self) -> WorkflowContextManager:
return self._context
def run(self, prompt: str) -> Path:
"""
The workflow manager is supposed to generate a codebase based on the prompt
Parameters
----------
prompt: str
the prompt user gives
Returns
-------
Path
The workflow manager is expected to produce output that includes a codebase containing generated code, results, and reports in a designated location.
The path is returned
The output path should follow a specific format:
- TODO: design
There is a summarized report where user can start from.
"""
# NOTE: The following items are not designed to make the workflow very flexible.
# - The generated tasks can't be changed after geting new information from the execution retuls.
# - But it is required in some cases, if we want to build a external dataset, it maybe have to plan like autogpt...
cfg = Config()
# NOTE: list may not be enough for general task list
self.set_context("user_prompt", prompt)
task_list = [WorkflowTask()]
while len(task_list):
"""task list is not long, so sort it is not a big problem"""
"""TODO: sort the task list based on the priority of the task"""
# task_list = sorted(task_list, key=lambda x: x.task_type)
t = task_list.pop(0)
t.assign_context_manager(self._context)
res = t.execute()
if not cfg.continous_mode:
res = t.interact()
if isinstance(t, WorkflowTask) or isinstance(t, PlanTask):
task_list.extend(res)
elif isinstance(t, ActionTask):
if res != "success":
...
# TODO: handle the unexpected execution Error
else:
raise NotImplementedError("Unsupported action type")
self.add_context(t.summarize())
return self._output_path