1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-05 20:11:08 +08:00
This commit is contained in:
zhupr
2020-09-28 14:43:09 +08:00
committed by you-n-g
parent c0b0920d73
commit 3c9f3acf79
5 changed files with 123 additions and 150 deletions

View File

@@ -154,10 +154,9 @@ Qlib provides a tool named `Estimator` to run the whole workflow automatically (
- Auto Correlation of forecasting signal (model prediction)
![Auto Correlation](http://fintech.msra.cn/images/analysis/analysis_model_auto_correlation.png?v=0.1)
- Portfolio analysis
- Backtest return
![Report](http://fintech.msra.cn/images/analysis/report.png)
![Report](http://fintech.msra.cn/images/analysis/report.png?v=0.1)
<!--
- Score IC
![Score IC](docs/_static/img/score_ic.png)

66
docs/FAQ/FAQ.rst Normal file
View File

@@ -0,0 +1,66 @@
Qlib FAQ
############
Qlib Frequently Asked Questions
================================
.. contents::
:depth: 1
:local:
:backlinks: none
------
1. RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase...
------------------------------------------------------------------------------------------------------------------------------------
.. code-block:: console
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
This is caused by the limitation of multiprocessing under windows OS. Please refer to `here <https://stackoverflow.com/a/24374798>`_ for more info.
**Solution**: To select a start method you use the ``D.features`` in the if __name__ == '__main__' clause of the main module. For example:
.. code-block:: python
import qlib
from qlib.data import D
if __name__ == "__main__":
qlib.init()
instruments = ["SH600000"]
fields = ["$close", "$change"]
df = D.features(instruments, fields, start_time='2010-01-01', end_time='2012-12-31')
print(df.head())
2. qlib.data.cache.QlibCacheException: It sees the key(...) of the redis lock has existed in your redis db now.
-----------------------------------------------------------------------------------------------------------------
It sees the key of the redis lock has existed in your redis db now. You can use the following command to clear your redis keys and rerun your commands
.. code-block:: console
$ redis-cli
> select 1
> flushdb
Also, feel free to post a new issue in our GitHub repository. We always look at each case individually and try to find a root cause.

View File

@@ -54,6 +54,11 @@ Document Structure
API <reference/api.rst>
.. toctree::
:maxdepth: 3
FAQ <FAQ/FAQ.rst>
.. toctree::
:maxdepth: 3
:caption: Change Log:

View File

@@ -35,6 +35,10 @@ from .base import Feature
from .ops import *
class QlibCacheException(RuntimeError):
pass
class MemCacheUnit(OrderedDict):
"""Memory Cache Unit."""
@@ -164,6 +168,21 @@ class CacheUtils(object):
except Exception as e:
get_module_logger("CacheUtils").warning(f"visit {cache_path} cache error: {e}")
@staticmethod
def acquire(lock, lock_name):
try:
lock.acquire()
except redis_lock.AlreadyAcquired:
raise QlibCacheException(
f"""It sees the key(lock:{repr(lock_name)[1:-1]}-wlock) of the redis lock has existed in your redis db now.
You can use the following command to clear your redis keys and rerun your commands:
$ redis-cli
> select {C.redis_task_db}
> del "lock:{repr(lock_name)[1:-1]}-wlock"
> quit
"""
)
@staticmethod
@contextlib.contextmanager
def reader_lock(redis_t, lock_name):
@@ -175,7 +194,7 @@ class CacheUtils(object):
try:
current_cache_readers = redis_t.get("%s-reader" % lock_name)
if current_cache_readers is None or int(current_cache_readers) == 0:
current_cache_wlock.acquire()
CacheUtils.acquire(current_cache_wlock, lock_name)
redis_t.incr("%s-reader" % lock_name)
finally:
current_cache_rlock.release()
@@ -197,7 +216,7 @@ class CacheUtils(object):
def writer_lock(redis_t, lock_name):
lock_name = f"{C.provider_uri}:{lock_name}"
current_cache_wlock = redis_lock.Lock(redis_t, "%s-wlock" % lock_name, id=CacheUtils.LOCK_ID)
current_cache_wlock.acquire()
CacheUtils.acquire(current_cache_wlock, lock_name)
try:
yield
finally:
@@ -275,15 +294,7 @@ class DatasetCache(BaseProviderCache):
HDF_KEY = "df"
def dataset(
self,
instruments,
fields,
start_time=None,
end_time=None,
freq="day",
disk_cache=1,
):
def dataset(self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=1):
"""Get feature dataset.
.. note:: Same interface as `dataset` method in dataset provider
@@ -309,30 +320,14 @@ class DatasetCache(BaseProviderCache):
"""
raise NotImplementedError("Implement this function to match your own cache mechanism")
def _dataset(
self,
instruments,
fields,
start_time=None,
end_time=None,
freq="day",
disk_cache=1,
):
def _dataset(self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=1):
"""Get feature dataset using cache.
Override this method to define how to get feature dataset corresponding to users' own cache mechanism.
"""
raise NotImplementedError("Implement this method if you want to use dataset feature cache")
def _dataset_uri(
self,
instruments,
fields,
start_time=None,
end_time=None,
freq="day",
disk_cache=1,
):
def _dataset_uri(self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=1):
"""Get a uri of feature dataset using cache.
specially:
disk_cache=1 means using data set cache and return the uri of cache file.
@@ -412,13 +407,7 @@ class DiskExpressionCache(ExpressionCache):
return True
def _expression(self, instrument, field, start_time=None, end_time=None, freq="day"):
_cache_uri = self._uri(
instrument=instrument,
field=field,
start_time=None,
end_time=None,
freq=freq,
)
_cache_uri = self._uri(instrument=instrument, field=field, start_time=None, end_time=None, freq=freq)
_instrument_dir = os.path.join(self.expr_cache_path, instrument.lower())
cache_path = os.path.join(_instrument_dir, _cache_uri)
# get calendar
@@ -490,12 +479,7 @@ class DiskExpressionCache(ExpressionCache):
# Make sure the cache runs right when the directory is deleted
# while running
meta = {
"info": {
"instrument": instrument,
"field": field,
"freq": freq,
"last_update": last_update,
},
"info": {"instrument": instrument, "field": field, "freq": freq, "last_update": last_update},
"meta": {"last_visit": time.time(), "visits": 1},
}
self.logger.debug(f"generating expression cache: {meta}")
@@ -557,11 +541,7 @@ class DiskExpressionCache(ExpressionCache):
remove_n = min(rght_etd, ele_n)
assert new_calendar[1] == whole_calendar[current_index]
data = self.provider.expression(
instrument,
field,
whole_calendar[current_index - remove_n],
new_calendar[-1],
freq,
instrument, field, whole_calendar[current_index - remove_n], new_calendar[-1], freq
)
with open(cp_cache_uri, "ab") as f:
data = np.array(data).astype("<f")
@@ -633,27 +613,14 @@ class DiskDatasetCache(DatasetCache):
df = pd.DataFrame(columns=fields)
return df
def _dataset(
self,
instruments,
fields,
start_time=None,
end_time=None,
freq="day",
disk_cache=0,
):
def _dataset(self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=0):
if disk_cache == 0:
# In this case, data_set cache is configured but will not be used.
return self.provider.dataset(instruments, fields, start_time, end_time, freq)
_cache_uri = self._uri(
instruments=instruments,
fields=fields,
start_time=None,
end_time=None,
freq=freq,
disk_cache=disk_cache,
instruments=instruments, fields=fields, start_time=None, end_time=None, freq=freq, disk_cache=disk_cache
)
cache_path = os.path.join(self.dtst_cache_path, _cache_uri)
@@ -676,24 +643,13 @@ class DiskDatasetCache(DatasetCache):
# cache unavailable, generate the cache
with CacheUtils.writer_lock(self.r, "dataset-%s" % _cache_uri):
features = self.gen_dataset_cache(
cache_path=cache_path,
instruments=instruments,
fields=fields,
freq=freq,
cache_path=cache_path, instruments=instruments, fields=fields, freq=freq
)
if not features.empty:
features = features.sort_index().loc(axis=0)[:, start_time:end_time]
return features
def _dataset_uri(
self,
instruments,
fields,
start_time=None,
end_time=None,
freq="day",
disk_cache=0,
):
def _dataset_uri(self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=0):
if disk_cache == 0:
# In this case, server only checks the expression cache.
# The client will load the cache data by itself.
@@ -703,12 +659,7 @@ class DiskDatasetCache(DatasetCache):
return ""
_cache_uri = self._uri(
instruments=instruments,
fields=fields,
start_time=None,
end_time=None,
freq=freq,
disk_cache=disk_cache,
instruments=instruments, fields=fields, start_time=None, end_time=None, freq=freq, disk_cache=disk_cache
)
cache_path = os.path.join(self.dtst_cache_path, _cache_uri)
@@ -720,12 +671,7 @@ class DiskDatasetCache(DatasetCache):
else:
# cache unavailable, generate the cache
with CacheUtils.writer_lock(self.r, "dataset-%s" % _cache_uri):
self.gen_dataset_cache(
cache_path=cache_path,
instruments=instruments,
fields=fields,
freq=freq,
)
self.gen_dataset_cache(cache_path=cache_path, instruments=instruments, fields=fields, freq=freq)
return _cache_uri
class IndexManager:
@@ -802,16 +748,16 @@ class DiskDatasetCache(DatasetCache):
def gen_dataset_cache(self, cache_path, instruments, fields, freq):
"""gen_dataset_cache
.. note:: This function does not consider the cache read write lock. Please
.. note:: This function does not consider the cache read write lock. Please
Aquire the lock outside this function
The format the cache contains 3 parts(followed by typical filename).
- index : cache/d41366901e25de3ec47297f12e2ba11d.index
- The content of the file may be in following format(pandas.Series)
.. code-block:: python
start end
1999-11-10 00:00:00 0 1
1999-11-11 00:00:00 1 2
@@ -819,7 +765,7 @@ class DiskDatasetCache(DatasetCache):
...
.. note:: The start is closed. The end is open!!!!!
- Each line contains two element <timestamp, end_index>
- It indicates the `end_index` of the data for `timestamp`
@@ -953,11 +899,7 @@ class DiskDatasetCache(DatasetCache):
)
data = self.provider.dataset(
instruments,
fields,
whole_calendar[current_index - rm_n_period],
new_calendar[-1],
freq,
instruments, fields, whole_calendar[current_index - rm_n_period], new_calendar[-1], freq
)
if not data.empty:
@@ -1010,32 +952,15 @@ class SimpleDatasetCache(DatasetCache):
def _uri(self, instruments, fields, start_time, end_time, freq, disk_cache=1, **kwargs):
instruments, fields, freq = self.normalize_uri_args(instruments, fields, freq)
local_cache_path = str(Path(self.local_cache_path).expanduser().resolve())
return hash_args(
instruments,
fields,
start_time,
end_time,
freq,
disk_cache,
local_cache_path,
)
return hash_args(instruments, fields, start_time, end_time, freq, disk_cache, local_cache_path)
def _dataset(
self,
instruments,
fields,
start_time=None,
end_time=None,
freq="day",
disk_cache=1,
):
def _dataset(self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=1):
if disk_cache == 0:
# In this case, data_set cache is configured but will not be used.
return self.provider.dataset(instruments, fields, start_time, end_time, freq)
os.makedirs(os.path.expanduser(self.local_cache_path), exist_ok=True)
cache_file = os.path.join(
self.local_cache_path,
self._uri(instruments, fields, start_time, end_time, freq, disk_cache=disk_cache),
self.local_cache_path, self._uri(instruments, fields, start_time, end_time, freq, disk_cache=disk_cache)
)
gen_flag = False
@@ -1065,15 +990,7 @@ class DatasetURICache(DatasetCache):
def _uri(self, instruments, fields, start_time, end_time, freq, disk_cache=1, **kwargs):
return hash_args(*self.normalize_uri_args(instruments, fields, freq), disk_cache)
def dataset(
self,
instruments,
fields,
start_time=None,
end_time=None,
freq="day",
disk_cache=0,
):
def dataset(self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=0):
if "local" in C.dataset_provider.lower():
# use LocalDatasetProvider
@@ -1081,15 +998,7 @@ class DatasetURICache(DatasetCache):
if disk_cache == 0:
# do not use data_set cache, load data from remote expression cache directly
return self.provider.dataset(
instruments,
fields,
start_time,
end_time,
freq,
disk_cache,
return_uri=False,
)
return self.provider.dataset(instruments, fields, start_time, end_time, freq, disk_cache, return_uri=False)
# use ClientDatasetProvider
feature_uri = self._uri(instruments, fields, None, None, freq, disk_cache=disk_cache)
@@ -1097,13 +1006,7 @@ class DatasetURICache(DatasetCache):
mnt_feature_uri = os.path.join(C.mount_path, C.dataset_cache_dir_name, feature_uri)
if value is None or expire or not os.path.exists(mnt_feature_uri):
df, uri = self.provider.dataset(
instruments,
fields,
start_time,
end_time,
freq,
disk_cache,
return_uri=True,
instruments, fields, start_time, end_time, freq, disk_cache, return_uri=True
)
# cache uri
MemCacheExpire.set_cache(H["f"], uri, uri)

View File

@@ -407,11 +407,11 @@ class DatasetProvider(object):
normalize_column_names = normalize_cache_fields(column_names)
data = dict()
# One process for one task, so that the memory will be freed quicker.
workers = min(C.kernels, len(instruments_d))
if C.maxtasksperchild is None:
p = Pool(processes=C.kernels)
p = Pool(processes=workers)
else:
p = Pool(processes=C.kernels, maxtasksperchild=C.maxtasksperchild)
p = Pool(processes=workers, maxtasksperchild=C.maxtasksperchild)
if isinstance(instruments_d, dict):
for inst, spans in instruments_d.items():
data[inst] = p.apply_async(
@@ -718,11 +718,11 @@ class LocalDatasetProvider(DatasetProvider):
return
start_time = cal[0]
end_time = cal[-1]
workers = min(C.kernels, len(instruments_d))
if C.maxtasksperchild is None:
p = Pool(processes=C.kernels)
p = Pool(processes=workers)
else:
p = Pool(processes=C.kernels, maxtasksperchild=C.maxtasksperchild)
p = Pool(processes=workers, maxtasksperchild=C.maxtasksperchild)
for inst in instruments_d:
p.apply_async(