diff --git a/qlib/finco/cli.py b/qlib/finco/cli.py index 0a6a32d74..e674b6af3 100644 --- a/qlib/finco/cli.py +++ b/qlib/finco/cli.py @@ -1,5 +1,5 @@ import fire -from qlib.finco.task import WorkflowManager +from qlib.finco.workflow import WorkflowManager from dotenv import load_dotenv from qlib import auto_init diff --git a/qlib/finco/task.py b/qlib/finco/task.py index e76de2c2d..f01c26355 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -4,13 +4,10 @@ from pathlib import Path from typing import Any, List from qlib.log import get_module_logger from qlib.typehint import Literal -from qlib.finco.conf import Config from qlib.finco.llm import try_create_chat_completion -from qlib.finco.utils import parse_json from jinja2 import Template import abc -import copy import logging @@ -105,6 +102,7 @@ class WorkflowTask(Task): "content": prompt_workflow_selection, }, ] + response = "" response = try_create_chat_completion(messages=messages) workflow = response.split(":")[1].strip().lower() self.executed = True @@ -167,30 +165,7 @@ class RLTask(PlanTask): class ActionTask(Task): def execute(self) -> Literal["fail", "success"]: return "success" - -"""Context Manager stores the context of the workflow""" -"""All context are key value pairs which saves the input, output and status of the whole workflow""" -class WorkflowContextManager(): - def __init__(self) -> None: - self.context = {} - self.logger = get_module_logger("fincoWorkflowContextManager") - - def set_context(self, key, value): - if key in self.context: - self.logger.warning("The key already exists in the context, the value will be overwritten") - self.context[key] = value - - def get_context(self, key): - if key not in self.context: - self.logger.warning("The key doesn't exist in the context") - return None - return self.context[key] - - """return a deep copy of the context""" - """TODO: do we need to return a deep copy?""" - def get_all_context(self): - return copy.deepcopy(self.context) - + class SummarizeTask(Task): def execution(self) -> Any: @@ -215,70 +190,3 @@ class SummarizeTask(Task): result.append({'postfix': postfix, 'content': content}) return result - -class WorkflowManager: - """This manange the whole task automation workflow including tasks and actions""" - - def __init__(self, name="project", output_path=None) -> None: - - if output_path is None: - self._output_path = Path.cwd() / name - else: - self._output_path = Path(output_path) - self._context = WorkflowContextManager() - - """Direct call set_context method of the context manager""" - def set_context(self, key, value): - self._context.set_context(key, value) - - def get_context(self) -> WorkflowContextManager: - return self._context - - def run(self, prompt: str) -> Path: - """ - The workflow manager is supposed to generate a codebase based on the prompt - - Parameters - ---------- - prompt: str - the prompt user gives - - Returns - ------- - Path - The workflow manager is expected to produce output that includes a codebase containing generated code, results, and reports in a designated location. - The path is returned - - The output path should follow a specific format: - - TODO: design - There is a summarized report where user can start from. - """ - - # NOTE: The following items are not designed to make the workflow very flexible. - # - The generated tasks can't be changed after geting new information from the execution retuls. - # - But it is required in some cases, if we want to build a external dataset, it maybe have to plan like autogpt... - - cfg = Config() - - # NOTE: list may not be enough for general task list - self.set_context("user_prompt", prompt) - task_list = [WorkflowTask()] - while len(task_list): - """task list is not long, so sort it is not a big problem""" - """TODO: sort the task list based on the priority of the task""" - # task_list = sorted(task_list, key=lambda x: x.task_type) - t = task_list.pop(0) - t.assign_context_manager(self._context) - res = t.execute() - if not cfg.continous_mode: - res = t.interact() - if isinstance(t.task_type, WorkflowTask) or isinstance(t.task_type, PlanTask): - task_list.extend(res) - elif isinstance(t.task_type, ActionTask): - if res != "success": - ... - # TODO: handle the unexpected execution Error - else: - raise NotImplementedError("Unsupported action type") - self.add_context(t.summarize()) - return self._output_path diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py new file mode 100644 index 000000000..901104c27 --- /dev/null +++ b/qlib/finco/workflow.py @@ -0,0 +1,99 @@ +import copy +from pathlib import Path + +from qlib.log import get_module_logger +from qlib.finco.conf import Config +from qlib.finco.utils import parse_json +from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask + + +"""Context Manager stores the context of the workflow""" +"""All context are key value pairs which saves the input, output and status of the whole workflow""" +class WorkflowContextManager(): + def __init__(self) -> None: + self.context = {} + self.logger = get_module_logger("fincoWorkflowContextManager") + + def set_context(self, key, value): + if key in self.context: + self.logger.warning("The key already exists in the context, the value will be overwritten") + self.context[key] = value + + def get_context(self, key): + if key not in self.context: + self.logger.warning("The key doesn't exist in the context") + return None + return self.context[key] + + """return a deep copy of the context""" + """TODO: do we need to return a deep copy?""" + def get_all_context(self): + return copy.deepcopy(self.context) + + +class WorkflowManager: + """This manange the whole task automation workflow including tasks and actions""" + + def __init__(self, name="project", output_path=None) -> None: + + if output_path is None: + self._output_path = Path.cwd() / name + else: + self._output_path = Path(output_path) + self._context = WorkflowContextManager() + + """Direct call set_context method of the context manager""" + def set_context(self, key, value): + self._context.set_context(key, value) + + def get_context(self) -> WorkflowContextManager: + return self._context + + def run(self, prompt: str) -> Path: + """ + The workflow manager is supposed to generate a codebase based on the prompt + + Parameters + ---------- + prompt: str + the prompt user gives + + Returns + ------- + Path + The workflow manager is expected to produce output that includes a codebase containing generated code, results, and reports in a designated location. + The path is returned + + The output path should follow a specific format: + - TODO: design + There is a summarized report where user can start from. + """ + + # NOTE: The following items are not designed to make the workflow very flexible. + # - The generated tasks can't be changed after geting new information from the execution retuls. + # - But it is required in some cases, if we want to build a external dataset, it maybe have to plan like autogpt... + + cfg = Config() + + # NOTE: list may not be enough for general task list + self.set_context("user_prompt", prompt) + task_list = [WorkflowTask()] + while len(task_list): + """task list is not long, so sort it is not a big problem""" + """TODO: sort the task list based on the priority of the task""" + # task_list = sorted(task_list, key=lambda x: x.task_type) + t = task_list.pop(0) + t.assign_context_manager(self._context) + res = t.execute() + if not cfg.continous_mode: + res = t.interact() + if isinstance(t, WorkflowTask) or isinstance(t, PlanTask): + task_list.extend(res) + elif isinstance(t, ActionTask): + if res != "success": + ... + # TODO: handle the unexpected execution Error + else: + raise NotImplementedError("Unsupported action type") + self.add_context(t.summarize()) + return self._output_path