diff --git a/qlib/contrib/analyzer.py b/qlib/contrib/analyzer.py new file mode 100644 index 000000000..ef63679b6 --- /dev/null +++ b/qlib/contrib/analyzer.py @@ -0,0 +1,104 @@ +import logging +import matplotlib.pyplot as plt +from pathlib import Path +import numpy as np + +from qlib.utils import class_casting + +from ..data.dataset import DatasetH +from ..data.dataset.handler import DataHandlerLP +from ..log import get_module_logger +from ..contrib.eva.alpha import calc_ic, calc_long_short_return, calc_long_short_prec + +logger = get_module_logger("analysis", logging.INFO) + + +class AnalyzerTemp: + def __init__(self, workspace=None, **kwargs): + self.workspace = Path(workspace) if workspace else "./" + + def analyse(self, **kwargs): + """ + Analyse data index, distribution .etc + + Parameters + ---------- + + + Return + ------ + The handled data. + """ + raise NotImplementedError(f"Please implement the `analysis` method.") + + +class HFAnalyzer(AnalyzerTemp): + """ + This is the Signal Analysis class that generates the analysis results such as IC and IR. + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def analyse(self, pred=None, label=None): + long_pre, short_pre = calc_long_short_prec(pred.iloc[:, 0], label.iloc[:, 0], is_alpha=True) + ic, ric = calc_ic(pred.iloc[:, 0], label.iloc[:, 0]) + metrics = { + "IC": ic.mean(), + "ICIR": ic.mean() / ic.std(), + "Rank IC": ric.mean(), + "Rank ICIR": ric.mean() / ric.std(), + "Long precision": long_pre.mean(), + "Short precision": short_pre.mean(), + } + + long_short_r, long_avg_r = calc_long_short_return(pred.iloc[:, 0], label.iloc[:, 0]) + metrics.update( + { + "Long-Short Average Return": long_short_r.mean(), + "Long-Short Average Sharpe": long_short_r.mean() / long_short_r.std(), + } + ) + + table = [[k, v] for (k, v) in metrics.items()] + plt.table(cellText=table, loc="center") + plt.axis("off") + plt.savefig(self.workspace.joinpath("HFAnalyzerTable.jpeg")) + plt.clf() + + plt.scatter(np.arange(0, len(pred)), pred.iloc[:, 0]) + plt.scatter(np.arange(0, len(label)), label.iloc[:, 0]) + plt.title("HFAnalyzer") + plt.savefig(self.workspace.joinpath("HFAnalyzer.jpeg")) + + +class SignalAnalyzer(AnalyzerTemp): + """ + This is the Signal Analysis class that generates the analysis results such as IC and IR. + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def analyse(self, dataset=None, **kwargs): + + with class_casting(dataset, DatasetH): + params = dict(segments="test", col_set="label", data_key=DataHandlerLP.DK_R) + try: + # Assume the backend handler is DataHandlerLP + raw_label = dataset.prepare(**params) + except TypeError: + # The argument number is not right + del params["data_key"] + # The backend handler should be DataHandler + raw_label = dataset.prepare(**params) + except AttributeError as e: + # The data handler is initialized with `drop_raw=True`... + # So raw_label is not available + logger.warning(f"Exception: {e}") + raw_label = None + plt.hist(raw_label) + plt.title("SignalAnalyzer") + plt.savefig(self.workspace.joinpath("signalAnalysis.jpeg")) + + return raw_label diff --git a/qlib/finco/llm.py b/qlib/finco/llm.py index ce6f80c53..b03a5d34c 100644 --- a/qlib/finco/llm.py +++ b/qlib/finco/llm.py @@ -3,9 +3,9 @@ import time import openai import json from typing import Optional -from qlib.log import get_module_logger from qlib.finco.conf import Config from qlib.finco.utils import Singleton +from qlib.finco.log import FinCoLog class APIBackend(Singleton): @@ -47,7 +47,10 @@ class APIBackend(Singleton): "content": user_prompt, }, ] + fcl = FinCoLog() response = self.try_create_chat_completion(messages=messages) + fcl.log_message(messages) + fcl.info(response) return response def try_create_chat_completion(self, max_retry=10, **kwargs): diff --git a/qlib/finco/log.py b/qlib/finco/log.py new file mode 100644 index 000000000..b493ba87f --- /dev/null +++ b/qlib/finco/log.py @@ -0,0 +1,48 @@ +""" +This module will base on Qlib's logger module and provides some interactive functions. +""" +from email import message_from_binary_file +from typing import Dict, List +from qlib.finco.utils import Singleton +from qlib.log import get_module_logger +from contextlib import contextmanager + +# a context manager, print liens before and after a function +@contextmanager +def formating_log(logger, text="Interaction"): + logger.info("") + logger.info("=" * 20 + f" BEGIN:{text} " + "=" * 20) + yield + logger.info("=" * 20 + f" END: {text} " + "=" * 20) + logger.info("") + + +class FinCoLog(Singleton): + # TODO: + # - config to file logger and save it into workspace + def __init__(self) -> None: + self.logger = get_module_logger("interactive") + + def log_message(self, messages: List[Dict[str, str]]): + """ + messages is some info like this [ + { + "role": "system", + "content": system_prompt, + }, + { + "role": "user", + "content": user_prompt, + }, + ] + """ + with formating_log(self.logger): + for m in messages: + self.logger.info(f"Role: {m['role']}") + self.logger.info(f"Content: {m['content']}") + + # TODO: + # It looks wierd if we only have logger + def info(self, *args, **kwargs): + with formating_log(self.logger, "info"): + self.logger.info(*args, **kwargs) diff --git a/qlib/finco/task.py b/qlib/finco/task.py index dc6434bf6..4510a3d38 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -14,6 +14,10 @@ import platform from qlib.log import get_module_logger from qlib.finco.llm import APIBackend from qlib.finco.tpl import get_tpl_path +from qlib.workflow.record_temp import HFSignalRecord, SignalRecord +from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer +from qlib.utils import init_instance_by_config +from qlib.workflow import R class Task: @@ -247,6 +251,85 @@ class RLPlanTask(PlanTask): return [] +class RecorderTask(Task): + """ + This Recorder task is responsible for analysing data such as index and distribution. + """ + + __ANALYZERS_PROJECT = {HFAnalyzer.__name__: HFSignalRecord, SignalAnalyzer.__name__: SignalRecord} + __ANALYZERS_DOCS = {HFAnalyzer.__name__: HFAnalyzer.__doc__, SignalAnalyzer.__name__: SignalAnalyzer.__doc__} + # __ANALYZERS_PROJECT = {SignalAnalyzer.__name__: SignalRecord} + # __ANALYZERS_DOCS = {SignalAnalyzer.__name__: SignalAnalyzer.__doc__} + + __DEFAULT_WORKFLOW_SYSTEM_PROMPT = f""" + You are an expert system administrator. + Your task is to select the best analysis class based on user intent from this list: + {list(__ANALYZERS_DOCS.keys())} + Their description are: + {__ANALYZERS_DOCS} + + Response only with the Analyser name provided above with no explanation or conversation. if there are more than + one analyser, separate them by "," + + """ + + __DEFAULT_WORKFLOW_USER_PROMPT = """{{user_prompt}}, + The analyzers you select should separate by ",", such as: "HFAnalyzer", "SignalAnalyzer" + """ + + def __init__(self): + super().__init__() + self._output = None + + def execute(self): + prompt = Template(self.__DEFAULT_WORKFLOW_USER_PROMPT).render( + user_prompt=self._context_manager.get_context("user_prompt") + ) + be = APIBackend() + be.debug_mode = False + response = be.build_messages_and_create_chat_completion(prompt, self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT) + + # it's better to move to another Task + workflow_config = ( + self._context_manager.get_context("workflow_config") + if self._context_manager.get_context("workflow_config") + else "workflow_config.yaml" + ) + workspace = self._context_manager.get_context("workspace") + with workspace.joinpath(workflow_config).open() as f: + workflow = yaml.safe_load(f) + + model = init_instance_by_config(workflow["task"]["model"]) + dataset = init_instance_by_config(workflow["task"]["dataset"]) + + with R.start(experiment_name="finCo"): + model.fit(dataset) + R.save_objects(trained_model=model) + + # prediction + recorder = R.get_recorder() + sr = SignalRecord(model, dataset, recorder) + sr.generate() + + analysers = response.split(",") + if isinstance(analysers, list): + self.logger.info(f"selected analysers: {analysers}") + tasks = [] + for analyser in analysers: + if analyser in self.__ANALYZERS_PROJECT.keys(): + tasks.append( + self.__ANALYZERS_PROJECT.get(analyser)( + workspace=workspace, model=model, dataset=dataset, recorder=recorder + ) + ) + + for task in tasks: + resp = task.analyse() + self._context_manager.set_context(task.__class__.__name__, resp) + + return [] + + class ActionTask(Task): pass @@ -288,7 +371,8 @@ Example output: def summarize(self): if self._output is not None: # TODO: it will be overrides by later commands - self._context_manager.set_context(self.__class__.__name__, self._output.decode("utf8")) + # utf8 can't decode normally on Windows + self._context_manager.set_context(self.__class__.__name__, self._output.decode("ANSI")) class ConfigActionTask(ActionTask): @@ -597,6 +681,11 @@ class SummarizeTask(Task): your strategy has a relatively low Sharpe ratio. Here are a few suggestions: You can try diversifying your positions across different assets. + Images: + + ![HFAnalyzer](file:///D:/Codes/NLP/qlib/finco/finco_workspace/HFAnalyzer.jpeg) + + Example output 2: The output log shows the result of running `qlib` with `LinearModel` strategy on the Chinese stock market CSI 300 from 2008-01-01 to 2020-08-01, based on the Alpha158 data handler from 2015-01-01. The strategy involves using the @@ -622,9 +711,16 @@ class SummarizeTask(Task): The numbers in the report do not need to have too many significant figures. You can add subheadings and paragraphs in Markdown for readability. You can bold or use other formatting options to highlight keywords in the main text. + You should display images I offered in markdown using the appropriate image format. """ - __DEFAULT_WORKFLOW_USER_PROMPT = "Here is my information: '{{information}}'\n{{user_prompt}}" - __DEFAULT_USER_PROMPT = "Please summarize them and give me some advice." + __DEFAULT_WORKFLOW_USER_PROMPT = ( + "Here is my information: '{{information}}'\n" + "My intention is: {{user_prompt}}. Please provide me with a summary and " + "recommendation based on my intention and the information I have provided." + "There are some figures which absolute path are: {{figure_path}}, " + "You must display these images in markdown using the appropriate image format." + ) + __DEFAULT_USER_PROMPT = "Summarize the information I offered and give me some advice." # TODO: 2048 is close to exceed GPT token limit __MAX_LENGTH_OF_FILE = 2048 @@ -632,22 +728,29 @@ class SummarizeTask(Task): def __init__(self): super().__init__() + self.workspace = self.__DEFAULT_WORKSPACE def execute(self) -> Any: + workspace = self._context_manager.get_context("workspace") + if workspace is not None: + self.workspace = workspace + user_prompt = self._context_manager.get_context("user_prompt") user_prompt = user_prompt if user_prompt is not None else self.__DEFAULT_USER_PROMPT system_prompt = self.__DEFAULT_WORKFLOW_SYSTEM_PROMPT - workspace = self._context_manager.get_context("workspace") - workspace = workspace if workspace is not None else self.__DEFAULT_WORKSPACE + file_info = self.get_info_from_file(workspace) - context_info = self.get_info_from_context() + context_info = [] # too long context make response unstable. + figure_path = self.get_figure_path() information = context_info + file_info prompt_workflow_selection = Template(self.__DEFAULT_WORKFLOW_USER_PROMPT).render( - information=information, user_prompt=user_prompt + information=information, figure_path=figure_path, user_prompt=user_prompt ) - response = APIBackend().build_messages_and_create_chat_completion( + be = APIBackend() + be.debug_mode = False + response = be.build_messages_and_create_chat_completion( user_prompt=prompt_workflow_selection, system_prompt=system_prompt ) self.save_markdown(content=response) @@ -701,7 +804,18 @@ class SummarizeTask(Task): context.append({key: c[: self.__MAX_LENGTH_OF_FILE]}) return context + def get_figure_path(self): + file_list = [] + + for root, dirs, files in os.walk(Path(self.workspace)): + for filename in files: + postfix = filename.split(".")[-1] + if postfix in ["jpeg"]: + file_path = os.path.join("./", filename) + file_list.append(str(Path(file_path).relative_to(self.workspace))) + return file_list + def save_markdown(self, content: str): - with open(self.__DEFAULT_REPORT_NAME, "w") as f: + with open(Path(self.workspace).joinpath(self.__DEFAULT_REPORT_NAME), "w") as f: f.write(content) self.logger.info(f"report has saved to {self.__DEFAULT_REPORT_NAME}") diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index f380b23da..13d31a845 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -6,7 +6,8 @@ import shutil from qlib.log import get_module_logger from qlib.finco.conf import Config from qlib.finco.utils import parse_json -from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask +from qlib.finco.task import WorkflowTask, PlanTask, ActionTask, SummarizeTask, RecorderTask +from qlib.finco.log import FinCoLog class WorkflowContextManager: @@ -54,6 +55,7 @@ class WorkflowManager: self._context = WorkflowContextManager() self._context.set_context("workspace", self._workspace) self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China a stock market. I want to construct a new dataset covers longer history" + self.fco = FinCoLog() def _confirm_and_rm(self): # if workspace exists, please confirm and remove it. Otherwise exit. @@ -110,8 +112,9 @@ class WorkflowManager: self.set_context("user_prompt", prompt) # NOTE: list may not be enough for general task list - task_list = [WorkflowTask(), SummarizeTask()] + task_list = [WorkflowTask(), RecorderTask(), SummarizeTask()] while len(task_list): + self.fco.info(f"Current Task List: {task_list}") # task list is not long, so sort it is not a big problem # TODO: sort the task list based on the priority of the task # task_list = sorted(task_list, key=lambda x: x.task_type) @@ -121,7 +124,7 @@ class WorkflowManager: if not cfg.continous_mode: res = t.interact() t.summarize() - if isinstance(t, (WorkflowTask, PlanTask, ActionTask, SummarizeTask)): + if isinstance(t, (WorkflowTask, PlanTask, ActionTask, RecorderTask, SummarizeTask)): task_list = res + task_list else: raise NotImplementedError(f"Unsupported Task type {t}") diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index fdb3f6c92..331b54cc4 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -18,7 +18,7 @@ from ..utils import fill_placeholder, flatten_dict, class_casting, get_date_by_s from ..utils.time import Freq from ..utils.data import deepcopy_basic_type from ..contrib.eva.alpha import calc_ic, calc_long_short_return, calc_long_short_prec - +from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer logger = get_module_logger("workflow", logging.INFO) @@ -156,16 +156,20 @@ class RecordTemp: with class_casting(self, self.depend_cls): self.check(include_self=True) + def analyse(self): + raise NotImplementedError(f"Please implement the `analysis` method.") + class SignalRecord(RecordTemp): """ This is the Signal Record class that generates the signal prediction. This class inherits the ``RecordTemp`` class. """ - def __init__(self, model=None, dataset=None, recorder=None): + def __init__(self, model=None, dataset=None, recorder=None, workspace=None): super().__init__(recorder=recorder) self.model = model self.dataset = dataset + self.workspace = workspace @staticmethod def generate_label(dataset): @@ -204,6 +208,10 @@ class SignalRecord(RecordTemp): raw_label = self.generate_label(self.dataset) self.save(**{"label.pkl": raw_label}) + def analyse(self): + res = SignalAnalyzer(workspace=self.workspace).analyse(dataset=self.dataset) + return res + def list(self): return ["pred.pkl", "label.pkl"] @@ -245,8 +253,9 @@ class HFSignalRecord(SignalRecord): artifact_path = "hg_sig_analysis" depend_cls = SignalRecord - def __init__(self, recorder, **kwargs): + def __init__(self, recorder, workspace=None, **kwargs): super().__init__(recorder=recorder) + self.workspace = workspace def generate(self): pred = self.load("pred.pkl") @@ -280,6 +289,12 @@ class HFSignalRecord(SignalRecord): self.save(**objects) pprint(metrics) + def analyse(self): + pred = self.load("pred.pkl") + raw_label = self.load("label.pkl") + res = HFAnalyzer(workspace=self.workspace).analyse(pred=pred, label=raw_label) + return res + def list(self): return ["ic.pkl", "ric.pkl", "long_pre.pkl", "short_pre.pkl", "long_short_r.pkl", "long_avg_r.pkl"] diff --git a/tests/finco/test_sumarize.py b/tests/finco/test_sumarize.py index 2db916b0e..c2cc41f24 100644 --- a/tests/finco/test_sumarize.py +++ b/tests/finco/test_sumarize.py @@ -1,10 +1,14 @@ import unittest +import os +import shutil + from dotenv import load_dotenv # pydantic support load_dotenv, so load_dotenv will be deprecated in the future. from qlib.finco.task import SummarizeTask from qlib.finco.workflow import WorkflowContextManager -from qlib.finco.llm import try_create_chat_completion +from qlib.finco.llm import APIBackend +from qlib.finco.workflow import WorkflowManager load_dotenv(verbose=True, override=True) @@ -22,24 +26,41 @@ class TestSummarize(unittest.TestCase): "content": "How to write a perfect quant strategy.", }, ] - response = try_create_chat_completion(messages=messages) + response = APIBackend().try_create_chat_completion(messages=messages) print(response) def test_execution(self): task = SummarizeTask() context = WorkflowContextManager() - context.set_context("output_path", "../../examples/benchmarks/Linear") + context.set_context("workspace", "../../examples/benchmarks/Linear") context.set_context("user_prompt", "My main focus is on the performance of the strategy's return." "Please summarize the information and give me some advice.") task.assign_context_manager(context) - resp = task.execution() + resp = task.execute() print(resp) + def test_generate_batch_result(self): + wm = WorkflowManager() + + prompt = wm.default_user_prompt + # prompt = "" + + workdir = os.path.dirname(wm.get_context().get_context("workspace")) + summaries_path = os.path.join(workdir, "summaries") + + if not os.path.exists(summaries_path): + os.makedirs(summaries_path) + + for i in range(10): + wm.run(prompt) + if os.path.exists(f"{workdir}/finCoReport.md"): + shutil.move(f"{workdir}/finCoReport.md", f"{workdir}/summaries/finCoReport{i}.md") + def test_parse2txt(self): task = SummarizeTask() - resp = task.get_info_from_file('') + resp = task.get_info_from_file("") print(resp) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()