mirror of
https://github.com/microsoft/qlib.git
synced 2026-07-04 11:30:57 +08:00
Merge remote-tracking branch 'qlib/main' into qlib_register_ops
This commit is contained in:
@@ -13,6 +13,7 @@ import pickle
|
||||
import traceback
|
||||
import redis_lock
|
||||
import contextlib
|
||||
import abc
|
||||
from pathlib import Path
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
@@ -39,36 +40,100 @@ class QlibCacheException(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class MemCacheUnit(OrderedDict):
|
||||
class MemCacheUnit(abc.ABC):
|
||||
"""Memory Cache Unit."""
|
||||
|
||||
# TODO: use min_heap to replace ordereddict for better performance
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.size_limit = kwargs.pop("size_limit", None)
|
||||
# limit_type: check size_limit type, length(call fun: len) or size(call fun: sys.getsizeof)
|
||||
self.limit_type = kwargs.pop("limit_type", "length")
|
||||
super(MemCacheUnit, self).__init__(*args, **kwargs)
|
||||
self._check_size_limit()
|
||||
self.size_limit = kwargs.pop("size_limit", 0)
|
||||
self._size = 0
|
||||
self.od = OrderedDict()
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
super(MemCacheUnit, self).__setitem__(key, value)
|
||||
self._check_size_limit()
|
||||
# TODO: thread safe?__setitem__ failure might cause inconsistent size?
|
||||
|
||||
def __getitem__(self, key):
|
||||
value = super(MemCacheUnit, self).__getitem__(key)
|
||||
super(MemCacheUnit, self).__delitem__(key)
|
||||
super(MemCacheUnit, self).__setitem__(key, value)
|
||||
return value
|
||||
# precalculate the size after od.__setitem__
|
||||
self._adjust_size(key, value)
|
||||
|
||||
def _check_size_limit(self):
|
||||
if self.size_limit is not None:
|
||||
get_cur_size = lambda x: len(x) if self.limit_type == "length" else sum(map(sys.getsizeof, x.values()))
|
||||
while get_cur_size(self) > self.size_limit:
|
||||
self.od.__setitem__(key, value)
|
||||
|
||||
# move the key to end,make it latest
|
||||
self.od.move_to_end(key)
|
||||
|
||||
if self.limited:
|
||||
# pop the oldest items beyond size limit
|
||||
while self._size > self.size_limit:
|
||||
self.popitem(last=False)
|
||||
|
||||
def __getitem__(self, key):
|
||||
v = self.od.__getitem__(key)
|
||||
self.od.move_to_end(key)
|
||||
return v
|
||||
|
||||
class MemCache(object):
|
||||
def __contains__(self, key):
|
||||
return key in self.od
|
||||
|
||||
def __len__(self):
|
||||
return self.od.__len__()
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.__class__.__name__}<size_limit:{self.size_limit if self.limited else 'no limit'} total_size:{self._size}>\n{self.od.__repr__()}"
|
||||
|
||||
def set_limit_size(self, limit):
|
||||
self.size_limit = limit
|
||||
|
||||
@property
|
||||
def limited(self):
|
||||
"""whether memory cache is limited"""
|
||||
return self.size_limit > 0
|
||||
|
||||
@property
|
||||
def total_size(self):
|
||||
return self._size
|
||||
|
||||
def clear(self):
|
||||
self._size = 0
|
||||
self.od.clear()
|
||||
|
||||
def popitem(self, last=True):
|
||||
k, v = self.od.popitem(last=last)
|
||||
self._size -= self._get_value_size(v)
|
||||
|
||||
return k, v
|
||||
|
||||
def pop(self, key):
|
||||
v = self.od.pop(key)
|
||||
self._size -= self._get_value_size(v)
|
||||
|
||||
return v
|
||||
|
||||
def _adjust_size(self, key, value):
|
||||
if key in self.od:
|
||||
self._size -= self._get_value_size(self.od[key])
|
||||
|
||||
self._size += self._get_value_size(value)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _get_value_size(self, value):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class MemCacheLengthUnit(MemCacheUnit):
|
||||
def __init__(self, size_limit=0):
|
||||
super().__init__(size_limit=size_limit)
|
||||
|
||||
def _get_value_size(self, value):
|
||||
return 1
|
||||
|
||||
|
||||
class MemCacheSizeofUnit(MemCacheUnit):
|
||||
def __init__(self, size_limit=0):
|
||||
super().__init__(size_limit=size_limit)
|
||||
|
||||
def _get_value_size(self, value):
|
||||
return sys.getsizeof(value)
|
||||
|
||||
|
||||
class MemCache:
|
||||
"""Memory cache."""
|
||||
|
||||
def __init__(self, mem_cache_size_limit=None, limit_type="length"):
|
||||
@@ -79,21 +144,19 @@ class MemCache(object):
|
||||
mem_cache_size_limit: cache max size.
|
||||
limit_type: length or sizeof; length(call fun: len), size(call fun: sys.getsizeof).
|
||||
"""
|
||||
if limit_type not in ["length", "sizeof"]:
|
||||
|
||||
size_limit = C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit
|
||||
|
||||
if limit_type == "length":
|
||||
klass = MemCacheLengthUnit
|
||||
elif limit_type == "sizeof":
|
||||
klass = MemCacheSizeofUnit
|
||||
else:
|
||||
raise ValueError(f"limit_type must be length or sizeof, your limit_type is {limit_type}")
|
||||
|
||||
self.__calendar_mem_cache = MemCacheUnit(
|
||||
size_limit=C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit,
|
||||
limit_type=limit_type,
|
||||
)
|
||||
self.__instrument_mem_cache = MemCacheUnit(
|
||||
size_limit=C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit,
|
||||
limit_type=limit_type,
|
||||
)
|
||||
self.__feature_mem_cache = MemCacheUnit(
|
||||
size_limit=C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit,
|
||||
limit_type=limit_type,
|
||||
)
|
||||
self.__calendar_mem_cache = klass(size_limit)
|
||||
self.__instrument_mem_cache = klass(size_limit)
|
||||
self.__feature_mem_cache = klass(size_limit)
|
||||
|
||||
def __getitem__(self, key):
|
||||
if key == "c":
|
||||
@@ -140,7 +203,7 @@ class MemCacheExpire:
|
||||
return value, expire
|
||||
|
||||
|
||||
class CacheUtils(object):
|
||||
class CacheUtils:
|
||||
LOCK_ID = "QLIB"
|
||||
|
||||
@staticmethod
|
||||
@@ -224,7 +287,7 @@ class CacheUtils(object):
|
||||
current_cache_wlock.release()
|
||||
|
||||
|
||||
class BaseProviderCache(object):
|
||||
class BaseProviderCache:
|
||||
"""Provider cache base class"""
|
||||
|
||||
def __init__(self, provider):
|
||||
|
||||
Reference in New Issue
Block a user