From 1a523df0071a3af3c127bc21b626195b7e5af2fb Mon Sep 17 00:00:00 2001 From: Fivele-Li <128388363+Fivele-Li@users.noreply.github.com> Date: Wed, 14 Jun 2023 14:48:17 +0800 Subject: [PATCH] Optimize log and interact of FinCo (#1549) * use FinCoLog for a better interact experience * addition file changes * optimize format * optimize format --- qlib/finco/llm.py | 2 +- qlib/finco/log.py | 73 +++++++++++++++++++++++++++++++------ qlib/finco/task.py | 83 +++++++++++++++++++++++++----------------- qlib/finco/workflow.py | 11 +++++- 4 files changed, 122 insertions(+), 47 deletions(-) diff --git a/qlib/finco/llm.py b/qlib/finco/llm.py index b03a5d34c..d42cfec14 100644 --- a/qlib/finco/llm.py +++ b/qlib/finco/llm.py @@ -50,7 +50,7 @@ class APIBackend(Singleton): fcl = FinCoLog() response = self.try_create_chat_completion(messages=messages) fcl.log_message(messages) - fcl.info(response) + fcl.log_response(response) return response def try_create_chat_completion(self, max_retry=10, **kwargs): diff --git a/qlib/finco/log.py b/qlib/finco/log.py index b493ba87f..ca01778c4 100644 --- a/qlib/finco/log.py +++ b/qlib/finco/log.py @@ -1,19 +1,42 @@ """ This module will base on Qlib's logger module and provides some interactive functions. """ -from email import message_from_binary_file +import logging + from typing import Dict, List from qlib.finco.utils import Singleton from qlib.log import get_module_logger from contextlib import contextmanager -# a context manager, print liens before and after a function + +class LogColors: + """ + ANSI color codes for use in console output. + """ + RED = "\033[91m" + GREEN = "\033[92m" + YELLOW = "\033[93m" + BLUE = "\033[94m" + MAGENTA = "\033[95m" + CYAN = "\033[96m" + BOLD = "\033[1m" + END = "\033[0m" + WHITE = "\033[97m" + GRAY = "\033[90m" + BLACK = "\033[30m" + # TODO: Provide better interface to render text. (e.g. render(text, color.., style ..)) + + @contextmanager -def formating_log(logger, text="Interaction"): +def formatting_log(logger, title="Info"): + """ + a context manager, print liens before and after a function + """ + length = {"Start": 120, "Task": 120, "Info": 60}.get(title, 60) + color, bold = (LogColors.YELLOW, LogColors.BOLD) if title in ["Start", "Info", "Task"] else (LogColors.CYAN, "") logger.info("") - logger.info("=" * 20 + f" BEGIN:{text} " + "=" * 20) + logger.info(f"{color}{bold}{'-'} {title} {'-' * (length - len(title))}{LogColors.END}") yield - logger.info("=" * 20 + f" END: {text} " + "=" * 20) logger.info("") @@ -21,7 +44,16 @@ class FinCoLog(Singleton): # TODO: # - config to file logger and save it into workspace def __init__(self) -> None: - self.logger = get_module_logger("interactive") + # self.logger = get_module_logger("interactive") + self.logger = logging.Logger("interactive") + # TODO: merge these with Qlib's default logger. + # We can do the same thing by changing the default log dict of Qlib. + # Reference: https://github.com/microsoft/qlib/blob/main/qlib/config.py#L155 + + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("%(message)s")) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) def log_message(self, messages: List[Dict[str, str]]): """ @@ -36,13 +68,30 @@ class FinCoLog(Singleton): }, ] """ - with formating_log(self.logger): + with formatting_log(self.logger, "GPT Messages"): for m in messages: - self.logger.info(f"Role: {m['role']}") - self.logger.info(f"Content: {m['content']}") + self.logger.info( + f"{LogColors.MAGENTA}{LogColors.BOLD}Role:{LogColors.END} " + f"{LogColors.CYAN}{m['role']}{LogColors.END}\n" + + f"{LogColors.MAGENTA}{LogColors.BOLD}Content:{LogColors.END} " + f"{LogColors.CYAN}{m['content']}{LogColors.END}") + + def log_response(self, response: str): + with formatting_log(self.logger, "GPT Response"): + self.logger.info( + f"{LogColors.CYAN}{response}{LogColors.END}\n") # TODO: # It looks wierd if we only have logger - def info(self, *args, **kwargs): - with formating_log(self.logger, "info"): - self.logger.info(*args, **kwargs) + def info(self, *args, plain=False, title="Info"): + if plain: + return self.plain_info(*args) + with formatting_log(self.logger, title): + for arg in args: + self.logger.info(f"{LogColors.WHITE}{arg}{LogColors.END}") + + def plain_info(self, *args): + for arg in args: + # self.logger.info(arg) + self.logger.info( + f"{LogColors.YELLOW}{LogColors.BOLD}Info:{LogColors.END}{LogColors.WHITE}{arg}{LogColors.END}") diff --git a/qlib/finco/task.py b/qlib/finco/task.py index 3486e9a91..ba9decc2c 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -19,6 +19,7 @@ from qlib.workflow.record_temp import HFSignalRecord, SignalRecord from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer from qlib.utils import init_instance_by_config from qlib.workflow import R +from qlib.finco.log import FinCoLog class Task: @@ -42,14 +43,15 @@ class Task: self._context_manager = None self.prompt_template = PormptTemplate() self.executed = False - self.logger: logging.Logger = get_module_logger( - f"finco.{self.__class__.__name__}" - ) + # self.logger: logging.Logger = get_module_logger( + # f"finco.{self.__class__.__name__}" + # ) + self.logger = FinCoLog() def summarize(self) -> str: """After the execution of the task, it is supposed to generated some context about the execution""" """This function might be converted to abstract method in the future""" - self.logger.info("The method has nothing to summarize") + self.logger.info(f"{self.__class__.__name__}: The task has nothing to summarize", plain=True) def assign_context_manager(self, context_manager): """assign the workflow context manager to the task""" @@ -90,6 +92,14 @@ class Task: def user(self): return self.prompt_template.__getattribute__(self.__class__.__name__ + "_user") + @staticmethod + def confirm(prompt: str): + answer = input(prompt) + return answer + + def __str__(self): + return self.__class__.__name__ + class WorkflowTask(Task): """This task is supposed to be the first task of the workflow""" @@ -114,6 +124,10 @@ class WorkflowTask(Task): workflow = response.split(":")[1].strip().lower() self.executed = True self._context_manager.set_context("workflow", workflow) + answer = self.confirm(f"I select this workflow: {workflow}\n" + f"Are you sure you want to use? yes(Y/y), no(N/n):") + if str(answer) not in ["Y", "y"]: + return [] if workflow == "supervised learning": return [SLPlanTask()] elif workflow == "reinforcement learning": @@ -254,32 +268,36 @@ class RecorderTask(Task): ANALYZERS_DOCS=self.__ANALYZERS_DOCS, ), ) - - # it's better to move to another Task - workflow_config = ( - self._context_manager.get_context("workflow_config") - if self._context_manager.get_context("workflow_config") - else "workflow_config.yaml" - ) - workspace = self._context_manager.get_context("workspace") - with workspace.joinpath(workflow_config).open() as f: - workflow = yaml.safe_load(f) - - model = init_instance_by_config(workflow["task"]["model"]) - dataset = init_instance_by_config(workflow["task"]["dataset"]) - - with R.start(experiment_name="finCo"): - model.fit(dataset) - R.save_objects(trained_model=model) - - # prediction - recorder = R.get_recorder() - sr = SignalRecord(model, dataset, recorder) - sr.generate() - analysers = response.split(",") - if isinstance(analysers, list): - self.logger.info(f"selected analysers: {analysers}") + + answer = self.confirm(f"I select these analysers: {analysers}\nAre you sure you want to use? yes(Y/y), no(N/n):") + if str(answer) not in ["Y", "y"]: + analysers = [] + + if isinstance(analysers, list) and len(analysers): + self.logger.info(f"selected analysers: {analysers}", plain=True) + # it's better to move to another Task + workflow_config = ( + self._context_manager.get_context("workflow_config") + if self._context_manager.get_context("workflow_config") + else "workflow_config.yaml" + ) + workspace = self._context_manager.get_context("workspace") + with workspace.joinpath(workflow_config).open() as f: + workflow = yaml.safe_load(f) + + model = init_instance_by_config(workflow["task"]["model"]) + dataset = init_instance_by_config(workflow["task"]["dataset"]) + + with R.start(experiment_name="finCo"): + model.fit(dataset) + R.save_objects(trained_model=model) + + # prediction + recorder = R.get_recorder() + sr = SignalRecord(model, dataset, recorder) + sr.generate() + tasks = [] for analyser in analysers: if analyser in self.__ANALYZERS_PROJECT.keys(): @@ -578,7 +596,7 @@ class SummarizeTask(Task): if postfix in ["py", "log", "yaml"]: with open(file) as f: content = f.read() - self.logger.info(f"file to summarize: {file}") + self.logger.info(f"file to summarize: {file}", plain=True) # in case of too large file # TODO: Perhaps summarization method instead of truncation would be a better approach result.append( @@ -612,11 +630,10 @@ class SummarizeTask(Task): for filename in files: postfix = filename.split(".")[-1] if postfix in ["jpeg"]: - file_path = os.path.join(root, filename) - file_list.append(str(Path(file_path).relative_to(self.workspace))) + file_list.append(str(Path(self.workspace).joinpath(filename))) return file_list def save_markdown(self, content: str): with open(Path(self.workspace).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f: f.write(content) - self.logger.info(f"report has saved to {self.__DEFAULT_REPORT_NAME}") + self.logger.info(f"report has saved to {self.__DEFAULT_REPORT_NAME}", plain=True) diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index 13d31a845..20c0b6133 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -110,15 +110,23 @@ class WorkflowManager: self.set_context("user_prompt", self.default_user_prompt) else: self.set_context("user_prompt", prompt) + self.fco.info(f"user_prompt: {self.get_context().get_context('user_prompt')}", title="Start") # NOTE: list may not be enough for general task list task_list = [WorkflowTask(), RecorderTask(), SummarizeTask()] + task_finished = [] while len(task_list): - self.fco.info(f"Current Task List: {task_list}") + task_list_info = [str(task) for task in task_list] + # task list is not long, so sort it is not a big problem # TODO: sort the task list based on the priority of the task # task_list = sorted(task_list, key=lambda x: x.task_type) t = task_list.pop(0) + self.fco.info(f"Task finished: {[str(task) for task in task_finished]}", + f"Task in queue: {task_list_info}", + f"Executing task: {str(t)}", + title="Task") + t.assign_context_manager(self._context) res = t.execute() if not cfg.continous_mode: @@ -128,4 +136,5 @@ class WorkflowManager: task_list = res + task_list else: raise NotImplementedError(f"Unsupported Task type {t}") + task_finished.append(t) return self._workspace