1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-01 18:11:18 +08:00

Add recorder task and visualize (#1542)

* add recorder task

* add batch generate summarize report unittest.

* * add recorder to RecorderTask;
* add matplot figure to analyzer.py

* add image to markdown;

* Add some log

* update figure path.

---------

Co-authored-by: Young <afe.young@gmail.com>
Co-authored-by: Cadenza-Li <362237642@qq.com>
This commit is contained in:
Fivele-Li
2023-06-12 15:48:00 +08:00
committed by GitHub
parent ad7498e287
commit 1d88830b0d
7 changed files with 330 additions and 22 deletions

104
qlib/contrib/analyzer.py Normal file
View File

@@ -0,0 +1,104 @@
import logging
import matplotlib.pyplot as plt
from pathlib import Path
import numpy as np
from qlib.utils import class_casting
from ..data.dataset import DatasetH
from ..data.dataset.handler import DataHandlerLP
from ..log import get_module_logger
from ..contrib.eva.alpha import calc_ic, calc_long_short_return, calc_long_short_prec
logger = get_module_logger("analysis", logging.INFO)
class AnalyzerTemp:
def __init__(self, workspace=None, **kwargs):
self.workspace = Path(workspace) if workspace else "./"
def analyse(self, **kwargs):
"""
Analyse data index, distribution .etc
Parameters
----------
Return
------
The handled data.
"""
raise NotImplementedError(f"Please implement the `analysis` method.")
class HFAnalyzer(AnalyzerTemp):
"""
This is the Signal Analysis class that generates the analysis results such as IC and IR.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def analyse(self, pred=None, label=None):
long_pre, short_pre = calc_long_short_prec(pred.iloc[:, 0], label.iloc[:, 0], is_alpha=True)
ic, ric = calc_ic(pred.iloc[:, 0], label.iloc[:, 0])
metrics = {
"IC": ic.mean(),
"ICIR": ic.mean() / ic.std(),
"Rank IC": ric.mean(),
"Rank ICIR": ric.mean() / ric.std(),
"Long precision": long_pre.mean(),
"Short precision": short_pre.mean(),
}
long_short_r, long_avg_r = calc_long_short_return(pred.iloc[:, 0], label.iloc[:, 0])
metrics.update(
{
"Long-Short Average Return": long_short_r.mean(),
"Long-Short Average Sharpe": long_short_r.mean() / long_short_r.std(),
}
)
table = [[k, v] for (k, v) in metrics.items()]
plt.table(cellText=table, loc="center")
plt.axis("off")
plt.savefig(self.workspace.joinpath("HFAnalyzerTable.jpeg"))
plt.clf()
plt.scatter(np.arange(0, len(pred)), pred.iloc[:, 0])
plt.scatter(np.arange(0, len(label)), label.iloc[:, 0])
plt.title("HFAnalyzer")
plt.savefig(self.workspace.joinpath("HFAnalyzer.jpeg"))
class SignalAnalyzer(AnalyzerTemp):
"""
This is the Signal Analysis class that generates the analysis results such as IC and IR.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def analyse(self, dataset=None, **kwargs):
with class_casting(dataset, DatasetH):
params = dict(segments="test", col_set="label", data_key=DataHandlerLP.DK_R)
try:
# Assume the backend handler is DataHandlerLP
raw_label = dataset.prepare(**params)
except TypeError:
# The argument number is not right
del params["data_key"]
# The backend handler should be DataHandler
raw_label = dataset.prepare(**params)
except AttributeError as e:
# The data handler is initialized with `drop_raw=True`...
# So raw_label is not available
logger.warning(f"Exception: {e}")
raw_label = None
plt.hist(raw_label)
plt.title("SignalAnalyzer")
plt.savefig(self.workspace.joinpath("signalAnalysis.jpeg"))
return raw_label

View File

@@ -3,9 +3,9 @@ import time
import openai
import json
from typing import Optional
from qlib.log import get_module_logger
from qlib.finco.conf import Config
from qlib.finco.utils import Singleton
from qlib.finco.log import FinCoLog
class APIBackend(Singleton):
@@ -47,7 +47,10 @@ class APIBackend(Singleton):
"content": user_prompt,
},
]
fcl = FinCoLog()
response = self.try_create_chat_completion(messages=messages)
fcl.log_message(messages)
fcl.info(response)
return response
def try_create_chat_completion(self, max_retry=10, **kwargs):

48
qlib/finco/log.py Normal file
View File

@@ -0,0 +1,48 @@
"""
This module will base on Qlib's logger module and provides some interactive functions.
"""
from email import message_from_binary_file
from typing import Dict, List
from qlib.finco.utils import Singleton
from qlib.log import get_module_logger
from contextlib import contextmanager
# a context manager, print liens before and after a function
@contextmanager
def formating_log(logger, text="Interaction"):
logger.info("")
logger.info("=" * 20 + f" BEGIN:{text} " + "=" * 20)
yield
logger.info("=" * 20 + f" END: {text} " + "=" * 20)
logger.info("")
class FinCoLog(Singleton):
# TODO:
# - config to file logger and save it into workspace
def __init__(self) -> None:
self.logger = get_module_logger("interactive")
def log_message(self, messages: List[Dict[str, str]]):
"""
messages is some info like this [
{
"role": "system",
"content": system_prompt,
},
{
"role": "user",
"content": user_prompt,
},
]
"""
with formating_log(self.logger):
for m in messages:
self.logger.info(f"Role: {m['role']}")
self.logger.info(f"Content: {m['content']}")
# TODO:
# It looks wierd if we only have logger
def info(self, *args, **kwargs):
with formating_log(self.logger, "info"):
self.logger.info(*args, **kwargs)

View File

@@ -14,6 +14,10 @@ import platform
from qlib.log import get_module_logger
from qlib.finco.llm import APIBackend
from qlib.finco.tpl import get_tpl_path
from qlib.workflow.record_temp import HFSignalRecord, SignalRecord
from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer
from qlib.utils import init_instance_by_config
from qlib.workflow import R
class Task:
@@ -247,6 +251,85 @@ class RLPlanTask(PlanTask):
return []
class RecorderTask(Task):
"""
This Recorder task is responsible for analysing data such as index and distribution.
"""
__ANALYZERS_PROJECT = {HFAnalyzer.__name__: HFSignalRecord, SignalAnalyzer.__name__: SignalRecord}
__ANALYZERS_DOCS = {HFAnalyzer.__name__: HFAnalyzer.__doc__, SignalAnalyzer.__name__: SignalAnalyzer.__doc__}
# __ANALYZERS_PROJECT = {SignalAnalyzer.__name__: SignalRecord}
# __ANALYZERS_DOCS = {SignalAnalyzer.__name__: SignalAnalyzer.__doc__}
__DEFAULT_WORKFLOW_SYSTEM_PROMPT = f"""
You are an expert system administrator.
Your task is to select the best analysis class based on user intent from this list:
{list(__ANALYZERS_DOCS.keys())}
Their description are:
{__ANALYZERS_DOCS}
Response only with the Analyser name provided above with no explanation or conversation. if there are more than
one analyser, separate them by ","
"""
__DEFAULT_WORKFLOW_USER_PROMPT = """{{user_prompt}},
The analyzers you select should separate by ",", such as: "HFAnalyzer", "SignalAnalyzer"
"""
def __init__(self):
super().__init__()
self._output = None
def execute(self):
prompt = Template(self.__DEFAULT_WORKFLOW_USER_PROMPT).render(
user_prompt=self._context_manager.get_context("user_prompt")
)
be = APIBackend()
be.debug_mode = False
response = be.build_messages_and_create_chat_completion(prompt, self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT)
# it's better to move to another Task
workflow_config = (
self._context_manager.get_context("workflow_config")
if self._context_manager.get_context("workflow_config")
else "workflow_config.yaml"
)
workspace = self._context_manager.get_context("workspace")
with workspace.joinpath(workflow_config).open() as f:
workflow = yaml.safe_load(f)
model = init_instance_by_config(workflow["task"]["model"])
dataset = init_instance_by_config(workflow["task"]["dataset"])
with R.start(experiment_name="finCo"):
model.fit(dataset)
R.save_objects(trained_model=model)
# prediction
recorder = R.get_recorder()
sr = SignalRecord(model, dataset, recorder)
sr.generate()
analysers = response.split(",")
if isinstance(analysers, list):
self.logger.info(f"selected analysers: {analysers}")
tasks = []
for analyser in analysers:
if analyser in self.__ANALYZERS_PROJECT.keys():
tasks.append(
self.__ANALYZERS_PROJECT.get(analyser)(
workspace=workspace, model=model, dataset=dataset, recorder=recorder
)
)
for task in tasks:
resp = task.analyse()
self._context_manager.set_context(task.__class__.__name__, resp)
return []
class ActionTask(Task):
pass
@@ -288,7 +371,8 @@ Example output:
def summarize(self):
if self._output is not None:
# TODO: it will be overrides by later commands
self._context_manager.set_context(self.__class__.__name__, self._output.decode("utf8"))
# utf8 can't decode normally on Windows
self._context_manager.set_context(self.__class__.__name__, self._output.decode("ANSI"))
class ConfigActionTask(ActionTask):
@@ -597,6 +681,11 @@ class SummarizeTask(Task):
your strategy has a relatively low Sharpe ratio. Here are a few suggestions:
You can try diversifying your positions across different assets.
Images:
![HFAnalyzer](file:///D:/Codes/NLP/qlib/finco/finco_workspace/HFAnalyzer.jpeg)
Example output 2:
The output log shows the result of running `qlib` with `LinearModel` strategy on the Chinese stock market CSI 300
from 2008-01-01 to 2020-08-01, based on the Alpha158 data handler from 2015-01-01. The strategy involves using the
@@ -622,9 +711,16 @@ class SummarizeTask(Task):
The numbers in the report do not need to have too many significant figures.
You can add subheadings and paragraphs in Markdown for readability.
You can bold or use other formatting options to highlight keywords in the main text.
You should display images I offered in markdown using the appropriate image format.
"""
__DEFAULT_WORKFLOW_USER_PROMPT = "Here is my information: '{{information}}'\n{{user_prompt}}"
__DEFAULT_USER_PROMPT = "Please summarize them and give me some advice."
__DEFAULT_WORKFLOW_USER_PROMPT = (
"Here is my information: '{{information}}'\n"
"My intention is: {{user_prompt}}. Please provide me with a summary and "
"recommendation based on my intention and the information I have provided."
"There are some figures which absolute path are: {{figure_path}}, "
"You must display these images in markdown using the appropriate image format."
)
__DEFAULT_USER_PROMPT = "Summarize the information I offered and give me some advice."
# TODO: 2048 is close to exceed GPT token limit
__MAX_LENGTH_OF_FILE = 2048
@@ -632,22 +728,29 @@ class SummarizeTask(Task):
def __init__(self):
super().__init__()
self.workspace = self.__DEFAULT_WORKSPACE
def execute(self) -> Any:
workspace = self._context_manager.get_context("workspace")
if workspace is not None:
self.workspace = workspace
user_prompt = self._context_manager.get_context("user_prompt")
user_prompt = user_prompt if user_prompt is not None else self.__DEFAULT_USER_PROMPT
system_prompt = self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT
workspace = self._context_manager.get_context("workspace")
workspace = workspace if workspace is not None else self.__DEFAULT_WORKSPACE
file_info = self.get_info_from_file(workspace)
context_info = self.get_info_from_context()
context_info = [] # too long context make response unstable.
figure_path = self.get_figure_path()
information = context_info + file_info
prompt_workflow_selection = Template(self.__DEFAULT_WORKFLOW_USER_PROMPT).render(
information=information, user_prompt=user_prompt
information=information, figure_path=figure_path, user_prompt=user_prompt
)
response = APIBackend().build_messages_and_create_chat_completion(
be = APIBackend()
be.debug_mode = False
response = be.build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=system_prompt
)
self.save_markdown(content=response)
@@ -701,7 +804,18 @@ class SummarizeTask(Task):
context.append({key: c[: self.__MAX_LENGTH_OF_FILE]})
return context
def get_figure_path(self):
file_list = []
for root, dirs, files in os.walk(Path(self.workspace)):
for filename in files:
postfix = filename.split(".")[-1]
if postfix in ["jpeg"]:
file_path = os.path.join("./", filename)
file_list.append(str(Path(file_path).relative_to(self.workspace)))
return file_list
def save_markdown(self, content: str):
with open(self.__DEFAULT_REPORT_NAME, "w") as f:
with open(Path(self.workspace).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f:
f.write(content)
self.logger.info(f"report has saved to {self.__DEFAULT_REPORT_NAME}")

View File

@@ -6,7 +6,8 @@ import shutil
from qlib.log import get_module_logger
from qlib.finco.conf import Config
from qlib.finco.utils import parse_json
from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask
from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask, RecorderTask
from qlib.finco.log import FinCoLog
class WorkflowContextManager:
@@ -54,6 +55,7 @@ class WorkflowManager:
self._context = WorkflowContextManager()
self._context.set_context("workspace", self._workspace)
self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China a stock market. I want to construct a new dataset covers longer history"
self.fco = FinCoLog()
def _confirm_and_rm(self):
# if workspace exists, please confirm and remove it. Otherwise exit.
@@ -110,8 +112,9 @@ class WorkflowManager:
self.set_context("user_prompt", prompt)
# NOTE: list may not be enough for general task list
task_list = [WorkflowTask(), SummarizeTask()]
task_list = [WorkflowTask(), RecorderTask(), SummarizeTask()]
while len(task_list):
self.fco.info(f"Current Task List: {task_list}")
# task list is not long, so sort it is not a big problem
# TODO: sort the task list based on the priority of the task
# task_list = sorted(task_list, key=lambda x: x.task_type)
@@ -121,7 +124,7 @@ class WorkflowManager:
if not cfg.continous_mode:
res = t.interact()
t.summarize()
if isinstance(t, (WorkflowTask, PlanTask, ActionTask, SummarizeTask)):
if isinstance(t, (WorkflowTask, PlanTask, ActionTask, RecorderTask, SummarizeTask)):
task_list = res + task_list
else:
raise NotImplementedError(f"Unsupported Task type {t}")

View File

@@ -18,7 +18,7 @@ from ..utils import fill_placeholder, flatten_dict, class_casting, get_date_by_s
from ..utils.time import Freq
from ..utils.data import deepcopy_basic_type
from ..contrib.eva.alpha import calc_ic, calc_long_short_return, calc_long_short_prec
from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer
logger = get_module_logger("workflow", logging.INFO)
@@ -156,16 +156,20 @@ class RecordTemp:
with class_casting(self, self.depend_cls):
self.check(include_self=True)
def analyse(self):
raise NotImplementedError(f"Please implement the `analysis` method.")
class SignalRecord(RecordTemp):
"""
This is the Signal Record class that generates the signal prediction. This class inherits the ``RecordTemp`` class.
"""
def __init__(self, model=None, dataset=None, recorder=None):
def __init__(self, model=None, dataset=None, recorder=None, workspace=None):
super().__init__(recorder=recorder)
self.model = model
self.dataset = dataset
self.workspace = workspace
@staticmethod
def generate_label(dataset):
@@ -204,6 +208,10 @@ class SignalRecord(RecordTemp):
raw_label = self.generate_label(self.dataset)
self.save(**{"label.pkl": raw_label})
def analyse(self):
res = SignalAnalyzer(workspace=self.workspace).analyse(dataset=self.dataset)
return res
def list(self):
return ["pred.pkl", "label.pkl"]
@@ -245,8 +253,9 @@ class HFSignalRecord(SignalRecord):
artifact_path = "hg_sig_analysis"
depend_cls = SignalRecord
def __init__(self, recorder, **kwargs):
def __init__(self, recorder, workspace=None, **kwargs):
super().__init__(recorder=recorder)
self.workspace = workspace
def generate(self):
pred = self.load("pred.pkl")
@@ -280,6 +289,12 @@ class HFSignalRecord(SignalRecord):
self.save(**objects)
pprint(metrics)
def analyse(self):
pred = self.load("pred.pkl")
raw_label = self.load("label.pkl")
res = HFAnalyzer(workspace=self.workspace).analyse(pred=pred, label=raw_label)
return res
def list(self):
return ["ic.pkl", "ric.pkl", "long_pre.pkl", "short_pre.pkl", "long_short_r.pkl", "long_avg_r.pkl"]

View File

@@ -1,10 +1,14 @@
import unittest
import os
import shutil
from dotenv import load_dotenv
# pydantic support load_dotenv, so load_dotenv will be deprecated in the future.
from qlib.finco.task import SummarizeTask
from qlib.finco.workflow import WorkflowContextManager
from qlib.finco.llm import try_create_chat_completion
from qlib.finco.llm import APIBackend
from qlib.finco.workflow import WorkflowManager
load_dotenv(verbose=True, override=True)
@@ -22,24 +26,41 @@ class TestSummarize(unittest.TestCase):
"content": "How to write a perfect quant strategy.",
},
]
response = try_create_chat_completion(messages=messages)
response = APIBackend().try_create_chat_completion(messages=messages)
print(response)
def test_execution(self):
task = SummarizeTask()
context = WorkflowContextManager()
context.set_context("output_path", "../../examples/benchmarks/Linear")
context.set_context("workspace", "../../examples/benchmarks/Linear")
context.set_context("user_prompt", "My main focus is on the performance of the strategy's return."
"Please summarize the information and give me some advice.")
task.assign_context_manager(context)
resp = task.execution()
resp = task.execute()
print(resp)
def test_generate_batch_result(self):
wm = WorkflowManager()
prompt = wm.default_user_prompt
# prompt = ""
workdir = os.path.dirname(wm.get_context().get_context("workspace"))
summaries_path = os.path.join(workdir, "summaries")
if not os.path.exists(summaries_path):
os.makedirs(summaries_path)
for i in range(10):
wm.run(prompt)
if os.path.exists(f"{workdir}/finCoReport.md"):
shutil.move(f"{workdir}/finCoReport.md", f"{workdir}/summaries/finCoReport{i}.md")
def test_parse2txt(self):
task = SummarizeTask()
resp = task.get_info_from_file('')
resp = task.get_info_from_file("")
print(resp)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()