From ae0e0eca3d0911ca654efb66172483a245683bf8 Mon Sep 17 00:00:00 2001 From: hadrianl <137150224@qq.com> Date: Mon, 21 Dec 2020 18:17:20 +0800 Subject: [PATCH] better MemCacheUnit implement --- qlib/data/cache.py | 157 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 125 insertions(+), 32 deletions(-) diff --git a/qlib/data/cache.py b/qlib/data/cache.py index d53f578b1..a652018ff 100644 --- a/qlib/data/cache.py +++ b/qlib/data/cache.py @@ -13,6 +13,7 @@ import pickle import traceback import redis_lock import contextlib +import abc from pathlib import Path import numpy as np import pandas as pd @@ -39,33 +40,127 @@ class QlibCacheException(RuntimeError): pass -class MemCacheUnit(OrderedDict): +class MemCacheUnit(abc.ABC): """Memory Cache Unit.""" - # TODO: use min_heap to replace ordereddict for better performance - def __init__(self, *args, **kwargs): - self.size_limit = kwargs.pop("size_limit", None) - # limit_type: check size_limit type, length(call fun: len) or size(call fun: sys.getsizeof) + self.size_limit = kwargs.pop("size_limit", 0) self.limit_type = kwargs.pop("limit_type", "length") - super(MemCacheUnit, self).__init__(*args, **kwargs) - self._check_size_limit() + + assert self.limit_type in ["length", "sizeof"], ValueError( + "limit_type shoule be one of ['length', 'sizeof']" + ) + assert self.size_limit >= 0, ValueError( + "size_limit shoule not be negative.The default 0 means unlimited!" + ) + + # limit_flag: whether to popitem or not + self._limit_flag = 1 if self.size_limit > 0 else 0 + self._size = 0 + self.od = OrderedDict() def __setitem__(self, key, value): - super(MemCacheUnit, self).__setitem__(key, value) - self._check_size_limit() + # TODO: thread safe?__setitem__ failure might cause inconsistent size? + + # precalculate the size after od.__setitem__ + self._adjust_size(key, value) + + self.od.__setitem__(key, value) + + # move the key to end,make it latest + self.od.move_to_end(key) + + # pop the oldest items beyond size limit + while self._size * self._limit_flag > self.size_limit: + self.popitem(last=False) def __getitem__(self, key): - value = super(MemCacheUnit, self).__getitem__(key) - super(MemCacheUnit, self).__delitem__(key) - super(MemCacheUnit, self).__setitem__(key, value) - return value + v = self.od.__getitem__(key) + self.od.move_to_end(key) + return v - def _check_size_limit(self): - if self.size_limit is not None: - get_cur_size = lambda x: len(x) if self.limit_type == "length" else sum(map(sys.getsizeof, x.values())) - while get_cur_size(self) > self.size_limit: - self.popitem(last=False) + def __contains__(self, key): + return key in self.od + + def __len__(self): + return self.od.__len__() + + def __repr__(self): + return f"{self.__class__.__name__}\n{self.od.__repr__()}" + + def set_limit_size(self, limit): + self.size_limit = limit + + def set_limit(self, is_limited=True): + self._limit_flag = 1 if is_limited else 0 + + @property + def is_limited(self): + return bool(self._limit_flag) + + @property + def total_size(self): + return self._size + + def clear(self): + self._size = 0 + self.od.clear() + + @abc.abstractmethod + def _adjust_size(self, key, value): + raise NotImplementedError + + @abc.abstractmethod + def popitem(self, last=True): + raise NotImplementedError + + @abc.abstractmethod + def pop(self, key): + raise NotImplementedError + + +class MemCacheLengthUnit(MemCacheUnit): + def __init__(self, size_limit=0): + super().__init__(size_limit=size_limit, limit_type="length") + + def _adjust_size(self, key, value): + if key not in self.od: + self._size += 1 + + def popitem(self, last=True): + k, v = self.od.popitem(last=last) + self._size -= 1 + + return k, v + + def pop(self, key): + v = self.od.pop(key) + self._size -= 1 + + return v + + +class MemCacheSizeofUnit(MemCacheUnit): + def __init__(self, size_limit=0): + super().__init__(size_limit=size_limit, limit_type="sizeof") + + def _adjust_size(self, key, value): + if key in self.od: + self._size = self._size - sys.getsizeof(self.od[key]) + sys.getsizeof(value) + else: + self._size += sys.getsizeof(value) + + def popitem(self, last=True): + k, v = self.od.popitem(last=last) + self._size -= sys.getsizeof(v) + + return k, v + + def pop(self, key): + v = self.od.pop(key) + self._size -= sys.getsizeof(v) + + return v class MemCache: @@ -79,22 +174,20 @@ class MemCache: mem_cache_size_limit: cache max size. limit_type: length or sizeof; length(call fun: len), size(call fun: sys.getsizeof). """ - if limit_type not in ["length", "sizeof"]: + + size_limit = C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit + + if limit_type == "length": + self.__calendar_mem_cache = MemCacheLengthUnit(size_limit) + self.__instrument_mem_cache = MemCacheLengthUnit(size_limit) + self.__feature_mem_cache = MemCacheLengthUnit(size_limit) + elif limit_type == "sizeof": + self.__calendar_mem_cache = MemCacheSizeofUnit(size_limit) + self.__instrument_mem_cache = MemCacheSizeofUnit(size_limit) + self.__feature_mem_cache = MemCacheSizeofUnit(size_limit) + else: raise ValueError(f"limit_type must be length or sizeof, your limit_type is {limit_type}") - self.__calendar_mem_cache = MemCacheUnit( - size_limit=C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit, - limit_type=limit_type, - ) - self.__instrument_mem_cache = MemCacheUnit( - size_limit=C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit, - limit_type=limit_type, - ) - self.__feature_mem_cache = MemCacheUnit( - size_limit=C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit, - limit_type=limit_type, - ) - def __getitem__(self, key): if key == "c": return self.__calendar_mem_cache