1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 11:00:57 +08:00

Add analyser task and optimize interact (#1552)

* * optimize interact
* add AnalyserTask
* optimize logger format and add render feature

* format optimize
This commit is contained in:
Fivele-Li
2023-06-16 11:42:45 +08:00
committed by GitHub
parent a70386ad52
commit f12184cc0f
5 changed files with 184 additions and 136 deletions

View File

@@ -24,7 +24,7 @@ class Config(Singleton):
self.max_retry = int(os.getenv("MAX_RETRY")) if os.getenv("MAX_RETRY") is not None else None
self.continous_mode = (
self.continuous_mode = (
os.getenv("CONTINOUS_MODE") == "True" if os.getenv("CONTINOUS_MODE") is not None else False
)
self.debug_mode = os.getenv("DEBUG_MODE") == "True" if os.getenv("DEBUG_MODE") is not None else False

View File

@@ -5,7 +5,6 @@ import logging
from typing import Dict, List
from qlib.finco.utils import Singleton
from qlib.log import get_module_logger
from contextlib import contextmanager
@@ -19,12 +18,38 @@ class LogColors:
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
BOLD = "\033[1m"
END = "\033[0m"
WHITE = "\033[97m"
GRAY = "\033[90m"
BLACK = "\033[30m"
# TODO: Provide better interface to render text. (e.g. render(text, color.., style ..))
BOLD = "\033[1m"
ITALIC = "\033[3m"
END = "\033[0m"
@classmethod
def get_all_colors(cls):
names = dir(cls)
names = [name for name in names if not name.startswith("__") and not callable(getattr(cls, name))]
var_values = [getattr(cls, name) for name in names]
return var_values
def render(self, text: str, color: str = "", style: str = ""):
"""
render text by input color and style. It's not recommend that input text is already rendered.
"""
# This method is called too frequently, which is not good.
colors = self.get_all_colors()
# Perhaps color and font should be distinguished here.
if color:
assert color in colors, f"color should be in: {colors} but now is: {color}"
if style:
assert style in colors, f"style should be in: {colors} but now is: {style}"
text = f"{color}{text}{self.END}"
text = f"{style}{text}{self.END}"
return text
@contextmanager
@@ -32,8 +57,9 @@ def formatting_log(logger, title="Info"):
"""
a context manager, print liens before and after a function
"""
length = {"Start": 120, "Task": 120, "Info": 60}.get(title, 60)
color, bold = (LogColors.YELLOW, LogColors.BOLD) if title in ["Start", "Info", "Task"] else (LogColors.CYAN, "")
length = {"Start": 120, "Task": 120, "Info": 60, "Interact": 60, "End": 120}.get(title, 60)
color, bold = (LogColors.YELLOW, LogColors.BOLD) \
if title in ["Start", "Task", "Info", "Interact", "End"] else (LogColors.CYAN, "")
logger.info("")
logger.info(f"{color}{bold}{'-'} {title} {'-' * (length - len(title))}{LogColors.END}")
yield
@@ -44,7 +70,6 @@ class FinCoLog(Singleton):
# TODO:
# - config to file logger and save it into workspace
def __init__(self) -> None:
# self.logger = get_module_logger("interactive")
self.logger = logging.Logger("interactive")
# TODO: merge these with Qlib's default logger.
# We can do the same thing by changing the default log dict of Qlib.
@@ -74,7 +99,7 @@ class FinCoLog(Singleton):
f"{LogColors.MAGENTA}{LogColors.BOLD}Role:{LogColors.END} "
f"{LogColors.CYAN}{m['role']}{LogColors.END}\n"
+ f"{LogColors.MAGENTA}{LogColors.BOLD}Content:{LogColors.END} "
f"{LogColors.CYAN}{m['content']}{LogColors.END}")
f"{LogColors.CYAN}{m['content']}{LogColors.END}\n")
def log_response(self, response: str):
with formatting_log(self.logger, "GPT Response"):
@@ -92,6 +117,15 @@ class FinCoLog(Singleton):
def plain_info(self, *args):
for arg in args:
# self.logger.info(arg)
self.logger.info(
f"{LogColors.YELLOW}{LogColors.BOLD}Info:{LogColors.END}{LogColors.WHITE}{arg}{LogColors.END}")
def warning(self, *args):
for arg in args:
self.logger.warning(
f"{LogColors.BLUE}{LogColors.BOLD}Warning:{LogColors.END}{arg}")
def error(self, *args):
for arg in args:
self.logger.error(
f"{LogColors.RED}{LogColors.BOLD}Error:{LogColors.END}{arg}")

View File

@@ -53,7 +53,7 @@ SLPlanTask_user : |-
Please provide the 6 crucial components in Qlib (Dataset, DataHandler, Model, Record, Strategy, Backtest) ensureing the workflow can meet the user's requirements.
Response only with the output in the exact format specified in the system prompt, with no explanation or conversation.
RecorderTask_system : |-
AnalysisTask_system : |-
You are an expert system administrator.
Your task is to select the best analysis class based on user intent from this list:
{{ANALYZERS_list}}
@@ -63,7 +63,7 @@ RecorderTask_system : |-
Response only with the Analyser name provided above with no explanation or conversation. if there are more than
one analyser, separate them by ","
RecorderTask_user : |-
AnalysisTask_user : |-
{{user_prompt}},
The analyzers you select should separate by ",", such as: "HFAnalyzer", "SignalAnalyzer"

View File

@@ -3,15 +3,12 @@ import os
from pathlib import Path
import io
from typing import Any, List, Union
from jinja2 import Template
import ruamel.yaml as yaml
import abc
import re
import logging
import subprocess
import platform
from qlib.log import get_module_logger
from qlib.finco.llm import APIBackend
from qlib.finco.tpl import get_tpl_path
from qlib.finco.prompt_template import PormptTemplate
@@ -19,10 +16,12 @@ from qlib.workflow.record_temp import HFSignalRecord, SignalRecord
from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer
from qlib.utils import init_instance_by_config
from qlib.workflow import R
from qlib.finco.log import FinCoLog
from qlib.finco.log import FinCoLog, LogColors
from qlib.finco.conf import Config
COMPONENT_LIST = ["Dataset", "DataHandler", "Model", "Record", "Strategy", "Backtest"]
class Task:
"""
The user's intention, which was initially represented by a prompt, is achieved through a sequence of tasks.
@@ -44,9 +43,7 @@ class Task:
self._context_manager = None
self.prompt_template = PormptTemplate()
self.executed = False
# self.logger: logging.Logger = get_module_logger(
# f"finco.{self.__class__.__name__}"
# )
self.continuous = Config().continuous_mode
self.logger = FinCoLog()
def summarize(self) -> str:
@@ -74,14 +71,28 @@ class Task:
"""All sub classes should implement the execute method to determine the next task"""
raise NotImplementedError
@abc.abstractclassmethod
def interact(self) -> Any:
"""The user can interact with the task"""
"""All sub classes should implement the interact method to determine the next task"""
"""In continous mode, this method will not be called and the next task will be determined by the execution method only"""
raise NotImplementedError(
"The interact method is not implemented, but workflow not in continous mode"
)
def interact(self, prompt: str, **kwargs) -> Any:
"""
The user can interact with the task. This method only handle business in current task. It will return True
while continuous is True. This method will return user input if input cannot be parsed as 'yes' or 'no'.
@return True, False, str
"""
self.logger.info(title="Interact")
if self.continuous:
return True
try:
answer = input(prompt)
except KeyboardInterrupt:
self.logger.info("User has exited the program.")
exit()
if answer.lower().strip() in ["y", "yes"]:
return True
elif answer.lower().strip() in ["n", "no"]:
return False
else:
return answer
@property
def system(self):
@@ -93,11 +104,6 @@ class Task:
def user(self):
return self.prompt_template.__getattribute__(self.__class__.__name__ + "_user")
@staticmethod
def confirm(prompt: str):
answer = input(prompt)
return answer
def __str__(self):
return self.__class__.__name__
@@ -105,14 +111,10 @@ class Task:
class WorkflowTask(Task):
"""This task is supposed to be the first task of the workflow"""
def __init__(
self,
) -> None:
def __init__(self) -> None:
super().__init__()
def execute(
self,
) -> List[Task]:
def execute(self) -> List[Task]:
"""make the choice which main workflow (RL, SL) will be used"""
user_prompt = self._context_manager.get_context("user_prompt")
prompt_workflow_selection = self.user.render(user_prompt=user_prompt)
@@ -125,10 +127,16 @@ class WorkflowTask(Task):
workflow = response.split(":")[1].strip().lower()
self.executed = True
self._context_manager.set_context("workflow", workflow)
answer = self.confirm(f"I select this workflow: {workflow}\n"
f"Are you sure you want to use? yes(Y/y), no(N/n):")
if str(answer) not in ["Y", "y"]:
confirm = self.interact(
f"The workflow has been determined to be: "
f"{LogColors().render(workflow, color=LogColors.YELLOW, style=LogColors.BOLD)}\n"
f"Enter 'y' to authorise command,'s' to run self-feedback commands, "
f"'n' to exit program, or enter feedback for WorkflowTask: "
)
if confirm is False:
return []
if workflow == "supervised learning":
return [SLPlanTask()]
elif workflow == "reinforcement learning":
@@ -136,43 +144,18 @@ class WorkflowTask(Task):
else:
raise ValueError(f"The workflow: {workflow} is not supported")
def interact(self) -> Any:
assert self.executed == True, "The workflow task has not been executed yet"
## TODO use logger
self.logger.info(
f"The workflow has been determined to be ---{self._context_manager.get_context('workflow')}---"
)
self.logger.info(
"Enter 'y' to authorise command,'s' to run self-feedback commands, "
"'n' to exit program, or enter feedback for WorkflowTask"
)
try:
answer = input("You answer is:")
except KeyboardInterrupt:
self.logger.info("User has exited the program")
exit()
if answer.lower().strip() == "y":
return
else:
# TODO add self feedback
raise ValueError("The input cannot be interpreted as a valid input")
class PlanTask(Task):
pass
class SLPlanTask(PlanTask):
def __init__(
self,
) -> None:
def __init__(self,) -> None:
super().__init__()
def execute(self):
workflow = self._context_manager.get_context("workflow")
assert (
workflow == "supervised learning"
), "The workflow is not supervised learning"
assert (workflow == "supervised learning"), "The workflow is not supervised learning"
user_prompt = self._context_manager.get_context("user_prompt")
assert user_prompt is not None, "The user prompt is not provided"
@@ -223,7 +206,7 @@ class SLPlanTask(PlanTask):
class RLPlanTask(PlanTask):
def __init__(
self,
self,
) -> None:
super().__init__()
self.logger.error("The RL task is not implemented yet")
@@ -242,6 +225,51 @@ class RecorderTask(Task):
This Recorder task is responsible for analysing data such as index and distribution.
"""
def __init__(self):
super().__init__()
def execute(self):
workflow_config = (
self._context_manager.get_context("workflow_config")
if self._context_manager.get_context("workflow_config")
else "workflow_config.yaml"
)
workspace = self._context_manager.get_context("workspace")
workflow_path = workspace.joinpath(workflow_config)
with workflow_path.open() as f:
workflow = yaml.safe_load(f)
confirm = self.interact(f"I select this workflow file: "
f"{LogColors().render(workflow_path, color=LogColors.YELLOW, style=LogColors.BOLD)}\n"
f"{yaml.dump(workflow, default_flow_style=False)}"
f"Are you sure you want to use? yes(Y/y), no(N/n):")
if confirm is False:
return []
model = init_instance_by_config(workflow["task"]["model"])
dataset = init_instance_by_config(workflow["task"]["dataset"])
with R.start(experiment_name="finCo"):
model.fit(dataset)
R.save_objects(trained_model=model)
# prediction
recorder = R.get_recorder()
sr = SignalRecord(model, dataset, recorder)
sr.generate()
self._context_manager.set_context("model", model)
self._context_manager.set_context("dataset", dataset)
self._context_manager.set_context("recorder", recorder)
return [AnalysisTask()]
class AnalysisTask(Task):
"""
This Recorder task is responsible for analysing data such as index and distribution.
"""
__ANALYZERS_PROJECT = {
HFAnalyzer.__name__: HFSignalRecord,
SignalAnalyzer.__name__: SignalRecord,
@@ -250,12 +278,9 @@ class RecorderTask(Task):
HFAnalyzer.__name__: HFAnalyzer.__doc__,
SignalAnalyzer.__name__: SignalAnalyzer.__doc__,
}
# __ANALYZERS_PROJECT = {SignalAnalyzer.__name__: SignalRecord}
# __ANALYZERS_DOCS = {SignalAnalyzer.__name__: SignalAnalyzer.__doc__}
def __init__(self):
super().__init__()
self._output = None
def execute(self):
prompt = self.user.render(
@@ -263,52 +288,38 @@ class RecorderTask(Task):
)
be = APIBackend()
be.debug_mode = False
response = be.build_messages_and_create_chat_completion(
prompt,
self.system.render(
ANALYZERS_list=list(self.__ANALYZERS_DOCS.keys()),
ANALYZERS_DOCS=self.__ANALYZERS_DOCS,
),
)
analysers = response.split(",")
answer = self.confirm(f"I select these analysers: {analysers}\nAre you sure you want to use? yes(Y/y), no(N/n):")
if str(answer) not in ["Y", "y"]:
analysers = []
while True:
response = be.build_messages_and_create_chat_completion(
prompt,
self.system.render(
ANALYZERS_list=list(self.__ANALYZERS_DOCS.keys()),
ANALYZERS_DOCS=self.__ANALYZERS_DOCS,
),
)
analysers = response.split(",")
confirm = self.interact(f"I select these analysers: {analysers}\n"
f"Are you sure you want to use? yes(Y/y), no(N/n) or prompt:")
if confirm is False:
analysers = []
break
elif confirm is True:
break
else:
prompt = confirm
if isinstance(analysers, list) and len(analysers):
self.logger.info(f"selected analysers: {analysers}", plain=True)
# it's better to move to another Task
workflow_config = (
self._context_manager.get_context("workflow_config")
if self._context_manager.get_context("workflow_config")
else "workflow_config.yaml"
)
workspace = self._context_manager.get_context("workspace")
with workspace.joinpath(workflow_config).open() as f:
workflow = yaml.safe_load(f)
model = init_instance_by_config(workflow["task"]["model"])
dataset = init_instance_by_config(workflow["task"]["dataset"])
with R.start(experiment_name="finCo"):
model.fit(dataset)
R.save_objects(trained_model=model)
# prediction
recorder = R.get_recorder()
sr = SignalRecord(model, dataset, recorder)
sr.generate()
tasks = []
for analyser in analysers:
if analyser in self.__ANALYZERS_PROJECT.keys():
tasks.append(
self.__ANALYZERS_PROJECT.get(analyser)(
workspace=workspace,
model=model,
dataset=dataset,
recorder=recorder,
workspace=self._context_manager.get_context("workspace"),
model=self._context_manager.get_context("model"),
dataset=self._context_manager.get_context("dataset"),
recorder=self._context_manager.get_context("recorder"),
)
)
@@ -352,20 +363,22 @@ class CMDTask(ActionTask):
self.__class__.__name__, self._output.decode("ANSI")
)
class DifferentiatedComponentActionTask(ActionTask):
@property
def system(self):
return self.prompt_template.__getattribute__(self.__class__.__name__ + "_system_" + self.target_component)
@property
def user(self):
return self.prompt_template.__getattribute__(self.__class__.__name__ + "_user_" + self.target_component)
class ConfigActionTask(DifferentiatedComponentActionTask):
def __init__(self, component) -> None:
super().__init__()
self.target_component = component
def execute(self):
user_prompt = self._context_manager.get_context("user_prompt")
prompt_element_dict = dict()
@@ -378,7 +391,7 @@ class ConfigActionTask(DifferentiatedComponentActionTask):
] = self._context_manager.get_context(f"{component}_plan")
assert (
None not in prompt_element_dict.values()
None not in prompt_element_dict.values()
), "Some decision or plan is not set by plan maker"
config_prompt = self.user.render(
@@ -562,6 +575,8 @@ class SummarizeTask(Task):
user_prompt=prompt_workflow_selection, system_prompt=self.system.render()
)
self.save_markdown(content=response)
self.logger.info(f"Report has saved to {self.__DEFAULT_REPORT_NAME}", title="End")
return []
def summarize(self) -> str:
@@ -627,4 +642,3 @@ class SummarizeTask(Task):
def save_markdown(self, content: str):
with open(Path(self.workspace).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f:
f.write(content)
self.logger.info(f"report has saved to {self.__DEFAULT_REPORT_NAME}", plain=True)

View File

@@ -3,11 +3,8 @@ import copy
from pathlib import Path
import shutil
from qlib.log import get_module_logger
from qlib.finco.conf import Config
from qlib.finco.utils import parse_json
from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask, RecorderTask
from qlib.finco.log import FinCoLog
from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask, RecorderTask, AnalysisTask
from qlib.finco.log import FinCoLog, LogColors
class WorkflowContextManager:
@@ -17,7 +14,7 @@ class WorkflowContextManager:
def __init__(self) -> None:
self.context = {}
self.logger = get_module_logger("fincoWorkflowContextManager")
self.logger = FinCoLog()
def set_context(self, key, value):
if key in self.context:
@@ -47,6 +44,8 @@ class WorkflowManager:
"""This manange the whole task automation workflow including tasks and actions"""
def __init__(self, workspace=None) -> None:
self.logger = FinCoLog()
if workspace is None:
self._workspace = Path.cwd() / "finco_workspace"
else:
@@ -55,16 +54,18 @@ class WorkflowManager:
self._context = WorkflowContextManager()
self._context.set_context("workspace", self._workspace)
self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China a stock market. Please help to pick one third of the factors in Alpha360 and use lightGBM model."
self.fco = FinCoLog()
def _confirm_and_rm(self):
# if workspace exists, please confirm and remove it. Otherwise exit.
if self._workspace.exists():
self.logger.info(title="Interact")
flag = input(
f"Will be deleted: "
f"\n\t{self._workspace}"
f"\nIf you do not need to delete {self._workspace}, please change the workspace dir or rename existing files "
f"\nAre you sure you want to delete, yes(Y/y), no (N/n):"
LogColors().render(
f"Will be deleted: \n\t{self._workspace}\n"
f"If you do not need to delete {self._workspace},"
f" please change the workspace dir or rename existing files\n"
f"Are you sure you want to delete, yes(Y/y), no (N/n):",
color=LogColors.WHITE)
)
if str(flag) not in ["Y", "y"]:
sys.exit()
@@ -103,14 +104,12 @@ class WorkflowManager:
# - The generated tasks can't be changed after geting new information from the execution retuls.
# - But it is required in some cases, if we want to build a external dataset, it maybe have to plan like autogpt...
cfg = Config()
# NOTE: default user prompt might be changed in the future and exposed to the user
if prompt is None:
self.set_context("user_prompt", self.default_user_prompt)
else:
self.set_context("user_prompt", prompt)
self.fco.info(f"user_prompt: {self.get_context().get_context('user_prompt')}", title="Start")
self.logger.info(f"user_prompt: {self.get_context().get_context('user_prompt')}", title="Start")
# NOTE: list may not be enough for general task list
task_list = [WorkflowTask(), RecorderTask(), SummarizeTask()]
@@ -122,19 +121,20 @@ class WorkflowManager:
# TODO: sort the task list based on the priority of the task
# task_list = sorted(task_list, key=lambda x: x.task_type)
t = task_list.pop(0)
self.fco.info(f"Task finished: {[str(task) for task in task_finished]}",
f"Task in queue: {task_list_info}",
f"Executing task: {str(t)}",
title="Task")
self.logger.info(f"Task finished: {[str(task) for task in task_finished]}",
f"Task in queue: {task_list_info}",
f"Executing task: {str(t)}",
title="Task")
t.assign_context_manager(self._context)
res = t.execute()
if not cfg.continous_mode:
res = t.interact()
t.summarize()
if isinstance(t, (WorkflowTask, PlanTask, ActionTask, RecorderTask, SummarizeTask)):
task_list = res + task_list
else:
raise NotImplementedError(f"Unsupported Task type {t}")
task_finished.append(t)
self.logger.plain_info(f"{str(t)} finished.\n\n\n")
for _ in res:
if not isinstance(t, (WorkflowTask, PlanTask, ActionTask, RecorderTask, AnalysisTask, SummarizeTask)):
raise NotImplementedError(f"Unsupported Task type {_}")
task_list = res + task_list
return self._workspace