diff --git a/qlib/finco/conf.py b/qlib/finco/conf.py index 374690e44..e0e0068fb 100644 --- a/qlib/finco/conf.py +++ b/qlib/finco/conf.py @@ -24,7 +24,7 @@ class Config(Singleton): self.max_retry = int(os.getenv("MAX_RETRY")) if os.getenv("MAX_RETRY") is not None else None - self.continous_mode = ( + self.continuous_mode = ( os.getenv("CONTINOUS_MODE") == "True" if os.getenv("CONTINOUS_MODE") is not None else False ) self.debug_mode = os.getenv("DEBUG_MODE") == "True" if os.getenv("DEBUG_MODE") is not None else False diff --git a/qlib/finco/log.py b/qlib/finco/log.py index ca01778c4..03d88db11 100644 --- a/qlib/finco/log.py +++ b/qlib/finco/log.py @@ -5,7 +5,6 @@ import logging from typing import Dict, List from qlib.finco.utils import Singleton -from qlib.log import get_module_logger from contextlib import contextmanager @@ -19,12 +18,38 @@ class LogColors: BLUE = "\033[94m" MAGENTA = "\033[95m" CYAN = "\033[96m" - BOLD = "\033[1m" - END = "\033[0m" WHITE = "\033[97m" GRAY = "\033[90m" BLACK = "\033[30m" - # TODO: Provide better interface to render text. (e.g. render(text, color.., style ..)) + + BOLD = "\033[1m" + ITALIC = "\033[3m" + + END = "\033[0m" + + @classmethod + def get_all_colors(cls): + names = dir(cls) + names = [name for name in names if not name.startswith("__") and not callable(getattr(cls, name))] + var_values = [getattr(cls, name) for name in names] + return var_values + + def render(self, text: str, color: str = "", style: str = ""): + """ + render text by input color and style. It's not recommend that input text is already rendered. + """ + # This method is called too frequently, which is not good. + colors = self.get_all_colors() + # Perhaps color and font should be distinguished here. + if color: + assert color in colors, f"color should be in: {colors} but now is: {color}" + if style: + assert style in colors, f"style should be in: {colors} but now is: {style}" + + text = f"{color}{text}{self.END}" + text = f"{style}{text}{self.END}" + + return text @contextmanager @@ -32,8 +57,9 @@ def formatting_log(logger, title="Info"): """ a context manager, print liens before and after a function """ - length = {"Start": 120, "Task": 120, "Info": 60}.get(title, 60) - color, bold = (LogColors.YELLOW, LogColors.BOLD) if title in ["Start", "Info", "Task"] else (LogColors.CYAN, "") + length = {"Start": 120, "Task": 120, "Info": 60, "Interact": 60, "End": 120}.get(title, 60) + color, bold = (LogColors.YELLOW, LogColors.BOLD) \ + if title in ["Start", "Task", "Info", "Interact", "End"] else (LogColors.CYAN, "") logger.info("") logger.info(f"{color}{bold}{'-'} {title} {'-' * (length - len(title))}{LogColors.END}") yield @@ -44,7 +70,6 @@ class FinCoLog(Singleton): # TODO: # - config to file logger and save it into workspace def __init__(self) -> None: - # self.logger = get_module_logger("interactive") self.logger = logging.Logger("interactive") # TODO: merge these with Qlib's default logger. # We can do the same thing by changing the default log dict of Qlib. @@ -74,7 +99,7 @@ class FinCoLog(Singleton): f"{LogColors.MAGENTA}{LogColors.BOLD}Role:{LogColors.END} " f"{LogColors.CYAN}{m['role']}{LogColors.END}\n" + f"{LogColors.MAGENTA}{LogColors.BOLD}Content:{LogColors.END} " - f"{LogColors.CYAN}{m['content']}{LogColors.END}") + f"{LogColors.CYAN}{m['content']}{LogColors.END}\n") def log_response(self, response: str): with formatting_log(self.logger, "GPT Response"): @@ -92,6 +117,15 @@ class FinCoLog(Singleton): def plain_info(self, *args): for arg in args: - # self.logger.info(arg) self.logger.info( f"{LogColors.YELLOW}{LogColors.BOLD}Info:{LogColors.END}{LogColors.WHITE}{arg}{LogColors.END}") + + def warning(self, *args): + for arg in args: + self.logger.warning( + f"{LogColors.BLUE}{LogColors.BOLD}Warning:{LogColors.END}{arg}") + + def error(self, *args): + for arg in args: + self.logger.error( + f"{LogColors.RED}{LogColors.BOLD}Error:{LogColors.END}{arg}") diff --git a/qlib/finco/prompt_template.yaml b/qlib/finco/prompt_template.yaml index 299af0d79..a4a4942ed 100644 --- a/qlib/finco/prompt_template.yaml +++ b/qlib/finco/prompt_template.yaml @@ -53,7 +53,7 @@ SLPlanTask_user : |- Please provide the 6 crucial components in Qlib (Dataset, DataHandler, Model, Record, Strategy, Backtest) ensureing the workflow can meet the user's requirements. Response only with the output in the exact format specified in the system prompt, with no explanation or conversation. -RecorderTask_system : |- +AnalysisTask_system : |- You are an expert system administrator. Your task is to select the best analysis class based on user intent from this list: {{ANALYZERS_list}} @@ -63,7 +63,7 @@ RecorderTask_system : |- Response only with the Analyser name provided above with no explanation or conversation. if there are more than one analyser, separate them by "," -RecorderTask_user : |- +AnalysisTask_user : |- {{user_prompt}}, The analyzers you select should separate by ",", such as: "HFAnalyzer", "SignalAnalyzer" diff --git a/qlib/finco/task.py b/qlib/finco/task.py index 5eaf08a24..364fc5987 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -3,15 +3,12 @@ import os from pathlib import Path import io from typing import Any, List, Union -from jinja2 import Template import ruamel.yaml as yaml import abc import re -import logging import subprocess import platform -from qlib.log import get_module_logger from qlib.finco.llm import APIBackend from qlib.finco.tpl import get_tpl_path from qlib.finco.prompt_template import PormptTemplate @@ -19,10 +16,12 @@ from qlib.workflow.record_temp import HFSignalRecord, SignalRecord from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer from qlib.utils import init_instance_by_config from qlib.workflow import R -from qlib.finco.log import FinCoLog +from qlib.finco.log import FinCoLog, LogColors +from qlib.finco.conf import Config COMPONENT_LIST = ["Dataset", "DataHandler", "Model", "Record", "Strategy", "Backtest"] + class Task: """ The user's intention, which was initially represented by a prompt, is achieved through a sequence of tasks. @@ -44,9 +43,7 @@ class Task: self._context_manager = None self.prompt_template = PormptTemplate() self.executed = False - # self.logger: logging.Logger = get_module_logger( - # f"finco.{self.__class__.__name__}" - # ) + self.continuous = Config().continuous_mode self.logger = FinCoLog() def summarize(self) -> str: @@ -74,14 +71,28 @@ class Task: """All sub classes should implement the execute method to determine the next task""" raise NotImplementedError - @abc.abstractclassmethod - def interact(self) -> Any: - """The user can interact with the task""" - """All sub classes should implement the interact method to determine the next task""" - """In continous mode, this method will not be called and the next task will be determined by the execution method only""" - raise NotImplementedError( - "The interact method is not implemented, but workflow not in continous mode" - ) + def interact(self, prompt: str, **kwargs) -> Any: + """ + The user can interact with the task. This method only handle business in current task. It will return True + while continuous is True. This method will return user input if input cannot be parsed as 'yes' or 'no'. + @return True, False, str + """ + self.logger.info(title="Interact") + if self.continuous: + return True + + try: + answer = input(prompt) + except KeyboardInterrupt: + self.logger.info("User has exited the program.") + exit() + + if answer.lower().strip() in ["y", "yes"]: + return True + elif answer.lower().strip() in ["n", "no"]: + return False + else: + return answer @property def system(self): @@ -93,11 +104,6 @@ class Task: def user(self): return self.prompt_template.__getattribute__(self.__class__.__name__ + "_user") - @staticmethod - def confirm(prompt: str): - answer = input(prompt) - return answer - def __str__(self): return self.__class__.__name__ @@ -105,14 +111,10 @@ class Task: class WorkflowTask(Task): """This task is supposed to be the first task of the workflow""" - def __init__( - self, - ) -> None: + def __init__(self) -> None: super().__init__() - def execute( - self, - ) -> List[Task]: + def execute(self) -> List[Task]: """make the choice which main workflow (RL, SL) will be used""" user_prompt = self._context_manager.get_context("user_prompt") prompt_workflow_selection = self.user.render(user_prompt=user_prompt) @@ -125,10 +127,16 @@ class WorkflowTask(Task): workflow = response.split(":")[1].strip().lower() self.executed = True self._context_manager.set_context("workflow", workflow) - answer = self.confirm(f"I select this workflow: {workflow}\n" - f"Are you sure you want to use? yes(Y/y), no(N/n):") - if str(answer) not in ["Y", "y"]: + + confirm = self.interact( + f"The workflow has been determined to be: " + f"{LogColors().render(workflow, color=LogColors.YELLOW, style=LogColors.BOLD)}\n" + f"Enter 'y' to authorise command,'s' to run self-feedback commands, " + f"'n' to exit program, or enter feedback for WorkflowTask: " + ) + if confirm is False: return [] + if workflow == "supervised learning": return [SLPlanTask()] elif workflow == "reinforcement learning": @@ -136,43 +144,18 @@ class WorkflowTask(Task): else: raise ValueError(f"The workflow: {workflow} is not supported") - def interact(self) -> Any: - assert self.executed == True, "The workflow task has not been executed yet" - ## TODO use logger - self.logger.info( - f"The workflow has been determined to be ---{self._context_manager.get_context('workflow')}---" - ) - self.logger.info( - "Enter 'y' to authorise command,'s' to run self-feedback commands, " - "'n' to exit program, or enter feedback for WorkflowTask" - ) - try: - answer = input("You answer is:") - except KeyboardInterrupt: - self.logger.info("User has exited the program") - exit() - if answer.lower().strip() == "y": - return - else: - # TODO add self feedback - raise ValueError("The input cannot be interpreted as a valid input") - class PlanTask(Task): pass class SLPlanTask(PlanTask): - def __init__( - self, - ) -> None: + def __init__(self,) -> None: super().__init__() def execute(self): workflow = self._context_manager.get_context("workflow") - assert ( - workflow == "supervised learning" - ), "The workflow is not supervised learning" + assert (workflow == "supervised learning"), "The workflow is not supervised learning" user_prompt = self._context_manager.get_context("user_prompt") assert user_prompt is not None, "The user prompt is not provided" @@ -223,7 +206,7 @@ class SLPlanTask(PlanTask): class RLPlanTask(PlanTask): def __init__( - self, + self, ) -> None: super().__init__() self.logger.error("The RL task is not implemented yet") @@ -242,6 +225,51 @@ class RecorderTask(Task): This Recorder task is responsible for analysing data such as index and distribution. """ + def __init__(self): + super().__init__() + + def execute(self): + workflow_config = ( + self._context_manager.get_context("workflow_config") + if self._context_manager.get_context("workflow_config") + else "workflow_config.yaml" + ) + workspace = self._context_manager.get_context("workspace") + workflow_path = workspace.joinpath(workflow_config) + with workflow_path.open() as f: + workflow = yaml.safe_load(f) + + confirm = self.interact(f"I select this workflow file: " + f"{LogColors().render(workflow_path, color=LogColors.YELLOW, style=LogColors.BOLD)}\n" + f"{yaml.dump(workflow, default_flow_style=False)}" + f"Are you sure you want to use? yes(Y/y), no(N/n):") + if confirm is False: + return [] + + model = init_instance_by_config(workflow["task"]["model"]) + dataset = init_instance_by_config(workflow["task"]["dataset"]) + + with R.start(experiment_name="finCo"): + model.fit(dataset) + R.save_objects(trained_model=model) + + # prediction + recorder = R.get_recorder() + sr = SignalRecord(model, dataset, recorder) + sr.generate() + + self._context_manager.set_context("model", model) + self._context_manager.set_context("dataset", dataset) + self._context_manager.set_context("recorder", recorder) + + return [AnalysisTask()] + + +class AnalysisTask(Task): + """ + This Recorder task is responsible for analysing data such as index and distribution. + """ + __ANALYZERS_PROJECT = { HFAnalyzer.__name__: HFSignalRecord, SignalAnalyzer.__name__: SignalRecord, @@ -250,12 +278,9 @@ class RecorderTask(Task): HFAnalyzer.__name__: HFAnalyzer.__doc__, SignalAnalyzer.__name__: SignalAnalyzer.__doc__, } - # __ANALYZERS_PROJECT = {SignalAnalyzer.__name__: SignalRecord} - # __ANALYZERS_DOCS = {SignalAnalyzer.__name__: SignalAnalyzer.__doc__} def __init__(self): super().__init__() - self._output = None def execute(self): prompt = self.user.render( @@ -263,52 +288,38 @@ class RecorderTask(Task): ) be = APIBackend() be.debug_mode = False - response = be.build_messages_and_create_chat_completion( - prompt, - self.system.render( - ANALYZERS_list=list(self.__ANALYZERS_DOCS.keys()), - ANALYZERS_DOCS=self.__ANALYZERS_DOCS, - ), - ) - analysers = response.split(",") - answer = self.confirm(f"I select these analysers: {analysers}\nAre you sure you want to use? yes(Y/y), no(N/n):") - if str(answer) not in ["Y", "y"]: - analysers = [] + while True: + response = be.build_messages_and_create_chat_completion( + prompt, + self.system.render( + ANALYZERS_list=list(self.__ANALYZERS_DOCS.keys()), + ANALYZERS_DOCS=self.__ANALYZERS_DOCS, + ), + ) + analysers = response.split(",") + confirm = self.interact(f"I select these analysers: {analysers}\n" + f"Are you sure you want to use? yes(Y/y), no(N/n) or prompt:") + if confirm is False: + analysers = [] + break + elif confirm is True: + break + else: + prompt = confirm if isinstance(analysers, list) and len(analysers): self.logger.info(f"selected analysers: {analysers}", plain=True) - # it's better to move to another Task - workflow_config = ( - self._context_manager.get_context("workflow_config") - if self._context_manager.get_context("workflow_config") - else "workflow_config.yaml" - ) - workspace = self._context_manager.get_context("workspace") - with workspace.joinpath(workflow_config).open() as f: - workflow = yaml.safe_load(f) - - model = init_instance_by_config(workflow["task"]["model"]) - dataset = init_instance_by_config(workflow["task"]["dataset"]) - - with R.start(experiment_name="finCo"): - model.fit(dataset) - R.save_objects(trained_model=model) - - # prediction - recorder = R.get_recorder() - sr = SignalRecord(model, dataset, recorder) - sr.generate() tasks = [] for analyser in analysers: if analyser in self.__ANALYZERS_PROJECT.keys(): tasks.append( self.__ANALYZERS_PROJECT.get(analyser)( - workspace=workspace, - model=model, - dataset=dataset, - recorder=recorder, + workspace=self._context_manager.get_context("workspace"), + model=self._context_manager.get_context("model"), + dataset=self._context_manager.get_context("dataset"), + recorder=self._context_manager.get_context("recorder"), ) ) @@ -352,20 +363,22 @@ class CMDTask(ActionTask): self.__class__.__name__, self._output.decode("ANSI") ) + class DifferentiatedComponentActionTask(ActionTask): @property def system(self): return self.prompt_template.__getattribute__(self.__class__.__name__ + "_system_" + self.target_component) - + @property def user(self): return self.prompt_template.__getattribute__(self.__class__.__name__ + "_user_" + self.target_component) + class ConfigActionTask(DifferentiatedComponentActionTask): def __init__(self, component) -> None: super().__init__() self.target_component = component - + def execute(self): user_prompt = self._context_manager.get_context("user_prompt") prompt_element_dict = dict() @@ -378,7 +391,7 @@ class ConfigActionTask(DifferentiatedComponentActionTask): ] = self._context_manager.get_context(f"{component}_plan") assert ( - None not in prompt_element_dict.values() + None not in prompt_element_dict.values() ), "Some decision or plan is not set by plan maker" config_prompt = self.user.render( @@ -562,6 +575,8 @@ class SummarizeTask(Task): user_prompt=prompt_workflow_selection, system_prompt=self.system.render() ) self.save_markdown(content=response) + self.logger.info(f"Report has saved to {self.__DEFAULT_REPORT_NAME}", title="End") + return [] def summarize(self) -> str: @@ -627,4 +642,3 @@ class SummarizeTask(Task): def save_markdown(self, content: str): with open(Path(self.workspace).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f: f.write(content) - self.logger.info(f"report has saved to {self.__DEFAULT_REPORT_NAME}", plain=True) diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index 855bca530..631a0e951 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -3,11 +3,8 @@ import copy from pathlib import Path import shutil -from qlib.log import get_module_logger -from qlib.finco.conf import Config -from qlib.finco.utils import parse_json -from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask, RecorderTask -from qlib.finco.log import FinCoLog +from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask, RecorderTask, AnalysisTask +from qlib.finco.log import FinCoLog, LogColors class WorkflowContextManager: @@ -17,7 +14,7 @@ class WorkflowContextManager: def __init__(self) -> None: self.context = {} - self.logger = get_module_logger("fincoWorkflowContextManager") + self.logger = FinCoLog() def set_context(self, key, value): if key in self.context: @@ -47,6 +44,8 @@ class WorkflowManager: """This manange the whole task automation workflow including tasks and actions""" def __init__(self, workspace=None) -> None: + self.logger = FinCoLog() + if workspace is None: self._workspace = Path.cwd() / "finco_workspace" else: @@ -55,16 +54,18 @@ class WorkflowManager: self._context = WorkflowContextManager() self._context.set_context("workspace", self._workspace) self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China a stock market. Please help to pick one third of the factors in Alpha360 and use lightGBM model." - self.fco = FinCoLog() def _confirm_and_rm(self): # if workspace exists, please confirm and remove it. Otherwise exit. if self._workspace.exists(): + self.logger.info(title="Interact") flag = input( - f"Will be deleted: " - f"\n\t{self._workspace}" - f"\nIf you do not need to delete {self._workspace}, please change the workspace dir or rename existing files " - f"\nAre you sure you want to delete, yes(Y/y), no (N/n):" + LogColors().render( + f"Will be deleted: \n\t{self._workspace}\n" + f"If you do not need to delete {self._workspace}," + f" please change the workspace dir or rename existing files\n" + f"Are you sure you want to delete, yes(Y/y), no (N/n):", + color=LogColors.WHITE) ) if str(flag) not in ["Y", "y"]: sys.exit() @@ -103,14 +104,12 @@ class WorkflowManager: # - The generated tasks can't be changed after geting new information from the execution retuls. # - But it is required in some cases, if we want to build a external dataset, it maybe have to plan like autogpt... - cfg = Config() - # NOTE: default user prompt might be changed in the future and exposed to the user if prompt is None: self.set_context("user_prompt", self.default_user_prompt) else: self.set_context("user_prompt", prompt) - self.fco.info(f"user_prompt: {self.get_context().get_context('user_prompt')}", title="Start") + self.logger.info(f"user_prompt: {self.get_context().get_context('user_prompt')}", title="Start") # NOTE: list may not be enough for general task list task_list = [WorkflowTask(), RecorderTask(), SummarizeTask()] @@ -122,19 +121,20 @@ class WorkflowManager: # TODO: sort the task list based on the priority of the task # task_list = sorted(task_list, key=lambda x: x.task_type) t = task_list.pop(0) - self.fco.info(f"Task finished: {[str(task) for task in task_finished]}", - f"Task in queue: {task_list_info}", - f"Executing task: {str(t)}", - title="Task") + self.logger.info(f"Task finished: {[str(task) for task in task_finished]}", + f"Task in queue: {task_list_info}", + f"Executing task: {str(t)}", + title="Task") t.assign_context_manager(self._context) res = t.execute() - if not cfg.continous_mode: - res = t.interact() t.summarize() - if isinstance(t, (WorkflowTask, PlanTask, ActionTask, RecorderTask, SummarizeTask)): - task_list = res + task_list - else: - raise NotImplementedError(f"Unsupported Task type {t}") task_finished.append(t) + self.logger.plain_info(f"{str(t)} finished.\n\n\n") + + for _ in res: + if not isinstance(t, (WorkflowTask, PlanTask, ActionTask, RecorderTask, AnalysisTask, SummarizeTask)): + raise NotImplementedError(f"Unsupported Task type {_}") + task_list = res + task_list + return self._workspace