1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00
Files
qlib/examples/run_all_model.py
2020-11-23 15:10:14 +08:00

306 lines
11 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
import sys
import fire
import venv
import glob
import shutil
import tempfile
import statistics
from pathlib import Path
from subprocess import Popen, PIPE
from threading import Thread
from pprint import pprint
from urllib.parse import urlparse
from urllib.request import urlretrieve
import qlib
from qlib.config import REG_CN
from qlib.workflow import R
from qlib.workflow.cli import workflow
from qlib.utils import exists_qlib_data
# init qlib
provider_uri = "~/.qlib/qlib_data/cn_data"
if not exists_qlib_data(provider_uri):
print(f"Qlib data is not found in {provider_uri}")
sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts")))
from get_data import GetData
GetData().qlib_data(target_dir=provider_uri, region=REG_CN)
qlib.init(provider_uri=provider_uri, region=REG_CN)
class ExtendedEnvBuilder(venv.EnvBuilder):
"""
Thie class is modified based on https://docs.python.org/3/library/venv.html.
This builder installs setuptools and pip so that you can pip or
easy_install other packages into the created virtual environment.
:param nodist: If true, setuptools and pip are not installed into the
created virtual environment.
:param nopip: If true, pip is not installed into the created
virtual environment.
:param progress: If setuptools or pip are installed, the progress of the
installation can be monitored by passing a progress
callable. If specified, it is called with two
arguments: a string indicating some progress, and a
context indicating where the string is coming from.
The context argument can have one of three values:
'main', indicating that it is called from virtualize()
itself, and 'stdout' and 'stderr', which are obtained
by reading lines from the output streams of a subprocess
which is used to install the app.
If a callable is not specified, default progress
information is output to sys.stderr.
"""
def __init__(self, *args, **kwargs):
self.nodist = kwargs.pop("nodist", False)
self.nopip = kwargs.pop("nopip", False)
self.progress = kwargs.pop("progress", None)
self.verbose = kwargs.pop("verbose", False)
super().__init__(*args, **kwargs)
def post_setup(self, context):
"""
Set up any packages which need to be pre-installed into the
virtual environment being created.
:param context: The information for the virtual environment
creation request being processed.
"""
os.environ["VIRTUAL_ENV"] = context.env_dir
if not self.nodist:
self.install_setuptools(context)
# Can't install pip without setuptools
if not self.nopip and not self.nodist:
self.install_pip(context)
def reader(self, stream, context):
"""
Read lines from a subprocess' output stream and either pass to a progress
callable (if specified) or write progress information to sys.stderr.
"""
progress = self.progress
while True:
s = stream.readline()
if not s:
break
if progress is not None:
progress(s, context)
else:
if not self.verbose:
sys.stderr.write(".")
else:
sys.stderr.write(s.decode("utf-8"))
sys.stderr.flush()
stream.close()
def install_script(self, context, name, url):
_, _, path, _, _, _ = urlparse(url)
fn = os.path.split(path)[-1]
binpath = context.bin_path
distpath = os.path.join(binpath, fn)
# Download script into the virtual environment's binaries folder
urlretrieve(url, distpath)
progress = self.progress
if self.verbose:
term = "\n"
else:
term = ""
if progress is not None:
progress("Installing %s ...%s" % (name, term), "main")
else:
sys.stderr.write("Installing %s ...%s" % (name, term))
sys.stderr.flush()
# Install in the virtual environment
args = [context.env_exe, fn]
p = Popen(args, stdout=PIPE, stderr=PIPE, cwd=binpath)
t1 = Thread(target=self.reader, args=(p.stdout, "stdout"))
t1.start()
t2 = Thread(target=self.reader, args=(p.stderr, "stderr"))
t2.start()
p.wait()
t1.join()
t2.join()
if progress is not None:
progress("done.", "main")
else:
sys.stderr.write("done.\n")
# Clean up - no longer needed
os.unlink(distpath)
def install_setuptools(self, context):
"""
Install setuptools in the virtual environment.
:param context: The information for the virtual environment
creation request being processed.
"""
url = "https://bootstrap.pypa.io/ez_setup.py"
self.install_script(context, "setuptools", url)
# clear up the setuptools archive which gets downloaded
pred = lambda o: o.startswith("setuptools-") and o.endswith(".tar.gz")
files = filter(pred, os.listdir(context.bin_path))
for f in files:
f = os.path.join(context.bin_path, f)
os.unlink(f)
def install_pip(self, context):
"""
Install pip in the virtual environment.
:param context: The information for the virtual environment
creation request being processed.
"""
url = "https://bootstrap.pypa.io/get-pip.py"
self.install_script(context, "pip", url)
# function to calculate the mean and std of a list in the results dictionary
def cal_mean_std(results) -> dict:
mean_std = dict()
for fn in results:
mean_std[fn] = dict()
for metric in results[fn]:
mean = statistics.mean(results[fn][metric]) if len(results[fn][metric]) > 1 else results[fn][metric][0]
std = statistics.stdev(results[fn][metric]) if len(results[fn][metric]) > 1 else 0
mean_std[fn][metric] = [mean, std]
return mean_std
# function to get all the folders benchmark folder
def get_all_folders() -> dict:
folders = dict()
for f in os.scandir("benchmarks"):
path = Path("benchmarks") / f.name
if f.name != "TFT":
folders[f.name] = str(path.resolve())
return folders
# function to get all the files under the model folder
def get_all_files(folder_path) -> (str, str):
yaml_path = str(Path(f"{folder_path}") / "*.yaml")
req_path = str(Path(f"{folder_path}") / "*.txt")
return glob.glob(yaml_path)[0], glob.glob(req_path)[0]
# function to retrieve all the results
def get_all_results(folders) -> dict:
results = dict()
for fn in folders:
exp = R.get_exp(experiment_name=fn, create=False)
recorders = exp.list_recorders()
result = dict()
result["annualized_return_with_cost"] = list()
result["information_ratio_with_cost"] = list()
result["max_drawdown_with_cost"] = list()
for recorder_id in recorders:
recorder = R.get_recorder(recorder_id=recorder_id, experiment_name=fn)
metrics = recorder.list_metrics()
result["annualized_return_with_cost"].append(metrics["excess_return_with_cost.annualized_return"])
result["information_ratio_with_cost"].append(metrics["excess_return_with_cost.information_ratio"])
result["max_drawdown_with_cost"].append(metrics["excess_return_with_cost.max_drawdown"])
results[fn] = result
return results
# function to generate and save markdown table
def gen_and_save_md_table(metrics):
table = "| Model Name | Annualized Return | Information Ratio | Max Drawdown |\n"
table += "|---|---|---|---|\n"
for fn in metrics:
ar = metrics[fn]["annualized_return_with_cost"]
ir = metrics[fn]["information_ratio_with_cost"]
md = metrics[fn]["max_drawdown_with_cost"]
table += f"| {fn} | {ar[0]:9.4f}±{ar[1]:9.2f} | {ir[0]:9.4f}±{ir[1]:9.2f}| {md[0]:9.4f}±{md[1]:9.2f} |\n"
pprint(table)
with open("table.md", "w") as f:
f.write(table)
return table
# function to run the all the models
def run(times=1):
"""
Please be aware that this function can only work under Linux. MacOS and Windows will be supported in the future.
Any PR to enhance this method is highly welcomed.
"""
# get all folders
folders = get_all_folders()
# set up
compatible = True
if sys.version_info < (3, 3):
compatible = False
elif not hasattr(sys, "base_prefix"):
compatible = False
if not compatible:
raise ValueError("This script is only for use with " "Python 3.3 or later")
if os.name == "nt":
use_symlinks = False
else:
use_symlinks = True
builder = ExtendedEnvBuilder(
system_site_packages=False,
clear=False,
symlinks=use_symlinks,
upgrade=False,
nodist=False,
nopip=False,
verbose=False,
)
# run all the model for iterations
for fn in folders:
# create env
temp_dir = tempfile.mkdtemp()
env_path = Path(temp_dir).absolute()
sys.stderr.write(f"Creating Virtual Environment with path: {env_path}...\n")
builder.create(str(env_path))
python_path = env_path / "bin" / "python" # TODO: FIX ME!
sys.stderr.write("\n")
# get all files
sys.stderr.write("Retrieving files...\n")
yaml_path, req_path = get_all_files(folders[fn])
sys.stderr.write("\n")
# install requirements.txt
sys.stderr.write("Installing requirements.txt...\n")
os.system(f"{python_path} -m pip install -r {req_path}")
sys.stderr.write("\n")
# install qlib
sys.stderr.write("Installing qlib...\n")
os.system(f"{python_path} -m pip install --upgrade cython") # TODO: FIX ME!
os.system(f"{python_path} -m pip install -e git+https://github.com/you-n-g/qlib#egg=pyqlib") # TODO: FIX ME!
sys.stderr.write("\n")
# run workflow_by_config for multiple times
for i in range(times):
sys.stderr.write(f"Running the model: {fn} for iteration {i+1}...\n")
os.system(f"{python_path} {env_path / 'src/pyqlib/qlib/workflow/cli.py'} {yaml_path} {fn}")
sys.stderr.write("\n")
# remove env
sys.stderr.write(f"Deleting the environment: {env_path}...\n")
shutil.rmtree(env_path)
# getting all results
sys.stderr.write(f"Retrieving results...\n")
results = get_all_results(folders)
# calculating the mean and std
sys.stderr.write(f"Calculating the mean and std of results...\n")
results = cal_mean_std(results)
# generating md table
sys.stderr.write(f"Generating markdown table...\n")
gen_and_save_md_table(results)
if __name__ == "__main__":
rc = 1
try:
fire.Fire(run) # run all the model
rc = 0
except Exception as e:
print("Error: %s" % e, file=sys.stderr)
sys.exit(rc)