API参考
在这里,您可以找到所有``Qlib``接口。
数据
提供者
- class qlib.data.data.ProviderBackendMixin
此辅助类旨在使基于存储后端的提供程序更方便。如果该提供程序不依赖于后端存储,则不必继承此类。
- class qlib.data.data.CalendarProvider
日历提供者基类
提供日历数据。
- calendar(start_time=None, end_time=None, freq='day', future=False)
获取特定市场在给定时间范围内的日历。
- 参数:
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率,可用:年/季度/月/周/日。
future (bool) -- 是否包括未来交易日。
- 返回:
日历列表
- 返回类型:
list
- locate_index(start_time: Timestamp | str, end_time: Timestamp | str, freq: str, future: bool = False)
在某个频率下的日历中定位开始时间索引和结束时间索引。
- 参数:
start_time (pd.Timestamp) -- 时间范围的开始。
end_time (pd.Timestamp) -- 时间范围的结束。
freq (str) -- 时间频率,可用:年/季度/月/周/日。
future (bool) -- 是否包括未来交易日。
- 返回:
pd.Timestamp -- 实际开始时间。
pd.Timestamp -- 实际结束时间。
int -- 开始时间的索引。
int -- 结束时间的索引。
- load_calendar(freq, future)
从文件加载原始日历时间戳。
- 参数:
freq (str) -- 读取日历文件的频率。
future (bool)
- 返回:
时间戳列表
- 返回类型:
list
- class qlib.data.data.InstrumentProvider
仪器提供者基类
提供仪器数据。
- static instruments(market: List | str = 'all', filter_pipe: List | None = None)
获取基础市场的一般配置字典,添加几个动态过滤器。
- 参数:
market (Union[List, str]) -- str: 市场/行业/指数简称,例如 all/sse/szse/sse50/csi300/csi500。list: ["ID1", "ID2"]。股票列表
filter_pipe (list) -- 动态过滤器列表。
- 返回:
dict (if isinstance(market, str)) -- 股票池配置的字典。
{market => 基础市场名称, filter_pipe => 过滤器列表}
例子:
{'market': 'csi500', 'filter_pipe': [{'filter_type': 'ExpressionDFilter', 'rule_expression': '$open<40', 'filter_start_time': None, 'filter_end_time': None, 'keep': False}, {'filter_type': 'NameDFilter', 'name_rule_re': 'SH[0-9]{4}55', 'filter_start_time': None, 'filter_end_time': None}]}
list (if isinstance(market, list)) -- 直接返回原始列表。注意:这将使工具在更多情况下兼容。用户代码将更简单。
- abstract list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据特定的股票池配置列出工具。
- 参数:
instruments (dict) -- stockpool 配置。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
as_list (bool) -- 将乐器作为列表或字典返回。
- 返回:
乐器列表或字典及时间跨度
- 返回类型:
dict or list
- class qlib.data.data.FeatureProvider
特性提供者类
提供功能数据。
- abstract feature(instrument, field, start_time, end_time, freq)
获取功能数据。
- 参数:
instrument (str) -- 某种仪器。
field (str) -- 某个特定领域的特征。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率,可用:年/季度/月/周/日。
- 返回:
某一特征的数据
- 返回类型:
pd.Series
- class qlib.data.data.PITProvider
- abstract period_feature(instrument, field, start_index: int, end_index: int, cur_time: Timestamp, period: int | None = None) Series
获取介于 start_index 和 end_index 之间的历史时期数据序列
- 参数:
start_index (int) -- start_index 是相对于 cur_time 的最新周期的相对索引。
end_index (int) -- end_index 通常是相对于 cur_time 的最新周期的索引,在大多数情况下,start_index 和 end_index 将是非正值。例如,start_index == -3,end_index == 0,当前周期索引为 cur_idx,则将检索 [start_index + cur_idx, end_index + cur_idx] 之间的数据。
period (int) -- 用于查询特定时期。时期在Qlib中用整数表示。(例如,202001可能代表2020年的第一季度)注意:period`将覆盖`start_index`和`end_index。
- 返回:
索引将是整数,用于指示数据的周期。一个典型的例子是 TODO
- 返回类型:
pd.Series
- 抛出:
FileNotFoundError -- 如果查询的数据不存在,将引发此异常。
- class qlib.data.data.ExpressionProvider
表达式提供者类
提供表达数据。
- __init__()
- abstract expression(instrument, field, start_time=None, end_time=None, freq='day') Series
获取表达数据。
expression 的责任 - 解析 field 并加载相应的数据。- 在加载数据时,应处理数据的时间依赖性。get_expression_instance 通常在此方法中使用。
- 参数:
instrument (str) -- 某种仪器。
field (str) -- 某个特定领域的特征。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率,可用:年/季度/月/周/日。
- 返回:
某个表达式的数据 数据有两种格式 1) 带有日期时间索引的表达式 2) 带有整数索引的表达式 - 因为日期时间不如
- 返回类型:
pd.Series
- class qlib.data.data.DatasetProvider
数据集提供者类
提供数据集数据。
- abstract dataset(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])
获取数据集数据。
- 参数:
instruments (list or dict) -- 乐器列表/字典或股票池配置字典。
fields (list) -- 特征实例列表。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) -- 每个仪器上执行的操作
- 返回:
一个带有<instrument, datetime>索引的pandas数据框。
- 返回类型:
pd.DataFrame
- static get_instruments_d(instruments, freq)
解析不同类型的输入工具以输出instruments_d。错误格式的输入工具将导致异常。
- static get_column_names(fields)
从输入字段获取列名
- static dataset_processor(instruments_d, column_names, start_time, end_time, freq, inst_processors=[])
加载和处理数据,返回数据集。- 默认使用多核方法。
- static inst_calculator(inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[])
计算**一个**仪器的表达式,返回一个df结果。如果表达式之前已经计算过,则从缓存中加载。
返回值:一个数据框,索引为“datetime”,以及其他数据列。
- class qlib.data.data.LocalCalendarProvider(remote=False, backend={})
本地日历数据提供者类
从本地数据源提供日历数据。
- __init__(remote=False, backend={})
- load_calendar(freq, future)
从文件加载原始日历时间戳。
- 参数:
freq (str) -- 读取日历文件的频率。
future (bool)
- 返回:
时间戳列表
- 返回类型:
list
- class qlib.data.data.LocalInstrumentProvider(backend={})
本地仪器数据提供者类
从本地数据源提供仪器数据。
- __init__(backend={}) None
- list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据特定的股票池配置列出工具。
- 参数:
instruments (dict) -- stockpool 配置。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
as_list (bool) -- 将乐器作为列表或字典返回。
- 返回:
乐器列表或字典及时间跨度
- 返回类型:
dict or list
- class qlib.data.data.LocalFeatureProvider(remote=False, backend={})
本地特征数据提供者类
从本地数据源提供特征数据。
- __init__(remote=False, backend={})
- feature(instrument, field, start_index, end_index, freq)
获取功能数据。
- 参数:
instrument (str) -- 某种仪器。
field (str) -- 某个特定领域的特征。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率,可用:年/季度/月/周/日。
- 返回:
某一特征的数据
- 返回类型:
pd.Series
- class qlib.data.data.LocalPITProvider
- period_feature(instrument, field, start_index, end_index, cur_time, period=None)
获取介于 start_index 和 end_index 之间的历史时期数据序列
- 参数:
start_index (int) -- start_index 是相对于 cur_time 的最新周期的相对索引。
end_index (int) -- end_index 通常是相对于 cur_time 的最新周期的索引,在大多数情况下,start_index 和 end_index 将是非正值。例如,start_index == -3,end_index == 0,当前周期索引为 cur_idx,则将检索 [start_index + cur_idx, end_index + cur_idx] 之间的数据。
period (int) -- 用于查询特定时期。时期在Qlib中用整数表示。(例如,202001可能代表2020年的第一季度)注意:period`将覆盖`start_index`和`end_index。
- 返回:
索引将是整数,用于指示数据的周期。一个典型的例子是 TODO
- 返回类型:
pd.Series
- 抛出:
FileNotFoundError -- 如果查询的数据不存在,将引发此异常。
- class qlib.data.data.LocalExpressionProvider(time2idx=True)
本地表达式数据提供者类
从本地数据源提供表达式数据。
- __init__(time2idx=True)
- expression(instrument, field, start_time=None, end_time=None, freq='day')
获取表达数据。
expression 的责任 - 解析 field 并加载相应的数据。- 在加载数据时,应处理数据的时间依赖性。get_expression_instance 通常在此方法中使用。
- 参数:
instrument (str) -- 某种仪器。
field (str) -- 某个特定领域的特征。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率,可用:年/季度/月/周/日。
- 返回:
某个表达式的数据 数据有两种格式 1) 带有日期时间索引的表达式 2) 带有整数索引的表达式 - 因为日期时间不如
- 返回类型:
pd.Series
- class qlib.data.data.LocalDatasetProvider(align_time: bool = True)
本地数据集数据提供者类
从本地数据源提供数据集数据。
- __init__(align_time: bool = True)
- 参数:
align_time (bool) -- 我们是否会将时间与日历对齐,因为在某些数据集中频率是灵活的,无法对齐。对于具有固定频率且共享日历的数据,将数据与日历对齐将提供以下好处 - 将查询对齐到相同参数,以便缓存可以共享。
- dataset(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])
获取数据集数据。
- 参数:
instruments (list or dict) -- 乐器列表/字典或股票池配置字典。
fields (list) -- 特征实例列表。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) -- 每个仪器上执行的操作
- 返回:
一个带有<instrument, datetime>索引的pandas数据框。
- 返回类型:
pd.DataFrame
- static multi_cache_walker(instruments, fields, start_time=None, end_time=None, freq='day')
此方法用于为客户端准备表达式缓存。然后客户端将自行从表达式缓存中加载数据。
- static cache_walker(inst, start_time, end_time, freq, column_names)
如果某个仪器的表达式之前没有被计算过,请计算并将其写入表达式缓存。
- class qlib.data.data.ClientCalendarProvider
客户端日历数据提供者类
作为客户端,通过请求服务器来提供日历数据。
- __init__()
- calendar(start_time=None, end_time=None, freq='day', future=False)
获取特定市场在给定时间范围内的日历。
- 参数:
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率,可用:年/季度/月/周/日。
future (bool) -- 是否包括未来交易日。
- 返回:
日历列表
- 返回类型:
list
- class qlib.data.data.ClientInstrumentProvider
客户端仪器数据提供者类
作为客户端,通过向服务器请求数据来提供仪器数据。
- __init__()
- list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据特定的股票池配置列出工具。
- 参数:
instruments (dict) -- stockpool 配置。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
as_list (bool) -- 将乐器作为列表或字典返回。
- 返回:
乐器列表或字典及时间跨度
- 返回类型:
dict or list
- class qlib.data.data.ClientDatasetProvider
客户端数据集数据提供者类
作为客户端,通过向服务器请求数据来提供数据集。
- __init__()
- dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=0, return_uri=False, inst_processors=[])
获取数据集数据。
- 参数:
instruments (list or dict) -- 乐器列表/字典或股票池配置字典。
fields (list) -- 特征实例列表。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
freq (str) -- 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) -- 每个仪器上执行的操作
- 返回:
一个带有<instrument, datetime>索引的pandas数据框。
- 返回类型:
pd.DataFrame
- class qlib.data.data.BaseProvider
本地提供者类是一组接口,允许用户访问数据。由于PITD未公开向用户展示,因此未包含在接口中。
与旧的 qlib 提供程序保持兼容。
- features(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=None, inst_processors=[])
- 参数:
disk_cache (int) -- 是否跳过(0)/使用(1)/替换(2)磁盘缓存
此函数将尝试使用具有关键字 disk_cache 的缓存方法,如果由于 DatasetD 实例是提供者类而引发类型错误,则将使用提供者方法。
- class qlib.data.data.LocalProvider
- features_uri(instruments, fields, start_time, end_time, freq, disk_cache=1)
返回生成的特征/数据集缓存的URI
- 参数:
disk_cache
instruments
fields
start_time
end_time
freq
- class qlib.data.data.ClientProvider
客户端提供者
作为客户端请求服务器数据。可以提出请求:
日历:直接响应日历列表
仪器(无过滤器):直接响应一个仪器列表/字典
仪器(带过滤器):返回仪器的列表/字典
功能:响应缓存URI
一般工作流程描述如下:当用户使用客户端提供者提出请求时,客户端提供者将连接服务器并发送请求。客户端将开始等待响应。响应将立即生成,指示缓存是否可用。等待过程只有在客户端收到响应并且`feature_available`为真时才会终止。BUG:每次我们请求某些数据时都需要连接到服务器,等待响应并断开连接。我们无法在一个连接中进行一系列请求。您可以参考https://python-socketio.readthedocs.io/en/latest/client.html获取python-socketIO客户端的文档。
- __init__()
- qlib.data.data.register_all_wrappers(C)
过滤器
- class qlib.data.filter.BaseDFilter
动态仪器过滤器抽象类
用户可以重写此类以构建自己的过滤器
重写__init__以输入过滤规则
重写filter_main以使用规则过滤仪器
- __init__()
- static from_config(config)
从配置字典构建实例。
- 参数:
config (dict) -- 配置参数字典。
- abstract to_config()
从配置字典构建实例。
- 返回:
返回配置参数字典。
- 返回类型:
dict
- class qlib.data.filter.SeriesDFilter(fstart_time=None, fend_time=None, keep=False)
动态仪器过滤器抽象类,用于过滤一系列特定特征
过滤器应提供参数:
过滤开始时间
过滤结束时间
过滤规则
重写__init__以分配特定规则来过滤系列。
重写_getFilterSeries以使用规则过滤系列并获取{inst => series}的字典,或重写filter_main以获得更高级的系列过滤规则
- __init__(fstart_time=None, fend_time=None, keep=False)
- 过滤器基类的初始化函数。
根据fstart_time和fend_time指定的特定规则在特定时间段内过滤一组仪器。
- 参数:
fstart_time (str) -- 过滤规则开始过滤仪器的时间。
fend_time (str) -- 停止过滤乐器的过滤规则的时间。
keep (bool) -- 是否保留在过滤时间范围内不存在特征的乐器。
- filter_main(instruments, start_time=None, end_time=None)
实现此方法以过滤乐器。
- 参数:
instruments (dict) -- 输入要过滤的乐器。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
- 返回:
过滤后的乐器,与输入乐器结构相同。
- 返回类型:
dict
- class qlib.data.filter.NameDFilter(name_rule_re, fstart_time=None, fend_time=None)
命名动态乐器过滤器
根据规定的名称格式过滤乐器。
需要一个名称规则的正则表达式。
- __init__(name_rule_re, fstart_time=None, fend_time=None)
名称过滤器类的初始化函数
- 参数:
name_rule_re (str) -- 名称规则的正则表达式。
- static from_config(config)
从配置字典构建实例。
- 参数:
config (dict) -- 配置参数字典。
- to_config()
从配置字典构建实例。
- 返回:
返回配置参数字典。
- 返回类型:
dict
- class qlib.data.filter.ExpressionDFilter(rule_expression, fstart_time=None, fend_time=None, keep=False)
表达动态仪器滤波器
根据特定表达式筛选仪器。
需要一个指示特定特征字段的表达规则。
示例
基本特征过滤器 : 规则表达式 = '$close/$open>5'
横截面特征过滤器 : 规则表达式 = '$rank($close)<10'
时间序列特征过滤器:rule_expression = '$Ref($close, 3)>100'
- __init__(rule_expression, fstart_time=None, fend_time=None, keep=False)
表达式过滤器类的初始化函数
- 参数:
fstart_time (str) -- filter the feature starting from this time.
fend_time (str) -- 在此时间之前过滤结束的功能。
rule_expression (str) -- 规则的输入表达式。
- static from_config(config)
从配置字典构建实例。
- 参数:
config (dict) -- 配置参数字典。
- to_config()
从配置字典构建实例。
- 返回:
返回配置参数字典。
- 返回类型:
dict
类
- class qlib.data.base.Expression
表达式基类
Expression旨在处理以下格式的数据计算,每个仪器的数据具有二维。
特征
时间:可以是观测时间或周期时间。
期间时间是为时间点数据库设计的。例如,期间时间可能是2014Q4,其值可以被多次观察(由于修订,不同时间可能观察到不同的值)。
- load(instrument, start_index, end_index, *args)
加载功能 此功能负责根据表达式引擎加载特征/表达式。
具体实现将分为两个部分:
缓存数据,处理错误。
这一部分由所有表达式共享,并在表达式中实现
根据特定表达式处理和计算数据。
这一部分在每个表达式中不同,并在每个表达式中实现
表达式引擎由不同的数据共享。不同的数据将为 args 提供不同的额外信息。
- 参数:
instrument (str) -- 仪器代码。
start_index (str) -- 特征开始索引 [在日历中]。
end_index (str) -- 特征结束索引 [在日历中]。
information (*args may contain following)
data (2) if is used in PIT) -- 频率: str 特征频率。
arguments (it contains following) -- 频率: str 特征频率。
data -- cur_pit: 它是为时间点数据设计的。 period: int 这用于查询特定的时间段。 在Qlib中,时间段用int表示。(例如,202001可能代表2020年的第一季度)
arguments -- cur_pit: 它是为时间点数据设计的。 period: int 这用于查询特定的时间段。 在Qlib中,时间段用int表示。(例如,202001可能代表2020年的第一季度)
- 返回:
特征系列: 系列的索引是日历索引
- 返回类型:
pd.Series
- abstract get_longest_back_rolling()
获取特征访问的历史数据的最长长度
这旨在首先获取计算特定范围内特征所需的数据范围。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需的历史数据长度。
- abstract get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内,我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的*叶特征*。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- class qlib.data.base.Feature(name=None)
静态表达式
这种特征将从提供者加载数据
- __init__(name=None)
- get_longest_back_rolling()
获取特征访问的历史数据的最长长度
这旨在首先获取计算特定范围内特征所需的数据范围。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需的历史数据长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内,我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的*叶特征*。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- class qlib.data.base.PFeature(name=None)
- class qlib.data.base.ExpressionOps
操作符表达式
这种特征将使用操作符进行动态特征构建。
操作符
- class qlib.data.ops.ElemOperator(feature)
逐元素操作符
- 参数:
feature (Expression) -- 特征实例
- 返回:
特征操作输出
- 返回类型:
- __init__(feature)
- get_longest_back_rolling()
获取特征访问的历史数据的最长长度
这旨在首先获取计算特定范围内特征所需的数据范围。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需的历史数据长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内,我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的*叶特征*。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- class qlib.data.ops.ChangeInstrument(instrument, feature)
更改工具操作员 在某些情况下,可能希望在计算时更改为另一个工具,例如,计算某只股票相对于市场指数的贝塔值。这将需要将特征的计算从股票(原始工具)更改为指数(参考工具):param instrument:即,SH000300(CSI300指数)或^GPSC(SP500指数)。:type instrument:新工具,后续操作应在其上执行。:param feature::type feature:要为新工具计算的特征。
- 返回:
特征操作输出
- 返回类型:
- __init__(instrument, feature)
- load(instrument, start_index, end_index, *args)
加载功能 此功能负责根据表达式引擎加载特征/表达式。
具体实现将分为两个部分:
缓存数据,处理错误。
这一部分由所有表达式共享,并在表达式中实现
根据特定表达式处理和计算数据。
这一部分在每个表达式中不同,并在每个表达式中实现
表达式引擎由不同的数据共享。不同的数据将为 args 提供不同的额外信息。
- 参数:
instrument (str) -- 仪器代码。
start_index (str) -- 特征开始索引 [在日历中]。
end_index (str) -- 特征结束索引 [在日历中]。
information (*args may contain following)
data (2) if is used in PIT) -- 频率: str 特征频率。
arguments (it contains following) -- 频率: str 特征频率。
data -- cur_pit: 它是为时间点数据设计的。 period: int 这用于查询特定的时间段。 在Qlib中,时间段用int表示。(例如,202001可能代表2020年的第一季度)
arguments -- cur_pit: 它是为时间点数据设计的。 period: int 这用于查询特定的时间段。 在Qlib中,时间段用int表示。(例如,202001可能代表2020年的第一季度)
- 返回:
特征系列: 系列的索引是日历索引
- 返回类型:
pd.Series
- class qlib.data.ops.NpElemOperator(feature, func)
Numpy 元素级操作符
- 参数:
feature (Expression) -- 特征实例
func (str) -- numpy 特征操作方法
- 返回:
特征操作输出
- 返回类型:
- __init__(feature, func)
- class qlib.data.ops.Abs(feature)
特征绝对值
- 参数:
feature (Expression) -- 特征实例
- 返回:
具有绝对输出的特征实例
- 返回类型:
- __init__(feature)
- class qlib.data.ops.Sign(feature)
特征符号
- 参数:
feature (Expression) -- 特征实例
- 返回:
具有符号的特征实例
- 返回类型:
- __init__(feature)
- class qlib.data.ops.Log(feature)
特征对数
- 参数:
feature (Expression) -- 特征实例
- 返回:
具有对数的特征实例
- 返回类型:
- __init__(feature)
- class qlib.data.ops.Mask(feature, instrument)
特征掩码
- 参数:
feature (Expression) -- 特征实例
instrument (str) -- 仪器掩码
- 返回:
具有掩码仪器的特征实例
- 返回类型:
- __init__(feature, instrument)
- class qlib.data.ops.Not(feature)
非运算符
- 参数:
feature (Expression) -- 特征实例
- 返回:
特征逐元素非输出
- 返回类型:
- __init__(feature)
- class qlib.data.ops.PairOperator(feature_left, feature_right)
成对运算符
- 参数:
feature_left (Expression) -- 特征实例或数值
feature_right (Expression) -- 特征实例或数值
- 返回:
两个特征的操作输出
- 返回类型:
- __init__(feature_left, feature_right)
- get_longest_back_rolling()
获取特征访问的历史数据的最长长度
这旨在首先获取计算特定范围内特征所需的数据范围。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需的历史数据长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内,我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的*叶特征*。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- class qlib.data.ops.NpPairOperator(feature_left, feature_right, func)
Numpy成对运算符
- 参数:
feature_left (Expression) -- 特征实例或数值
feature_right (Expression) -- 特征实例或数值
func (str) -- 运算符函数
- 返回:
两个特征的操作输出
- 返回类型:
- __init__(feature_left, feature_right, func)
- class qlib.data.ops.Power(feature_left, feature_right)
幂运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
feature_left中的基数提升到feature_right中的指数
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Add(feature_left, feature_right)
加法运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
两个特征的和
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Sub(feature_left, feature_right)
减法运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
两个特征的差
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Mul(feature_left, feature_right)
乘法运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
两个特征的积
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Div(feature_left, feature_right)
除法运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
两个特征的商
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Greater(feature_left, feature_right)
大于运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
从输入的两个特征中提取的较大元素
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Less(feature_left, feature_right)
小于运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
从输入的两个特征中提取的较小元素
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Gt(feature_left, feature_right)
大于运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
布尔系列指示 左 > 右
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Ge(feature_left, feature_right)
大于等于运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
布尔系列指示 左 >= 右
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Lt(feature_left, feature_right)
小于运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
布尔系列指示 左 < 右
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Le(feature_left, feature_right)
小于等于运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
布尔序列指示 left <= right
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Eq(feature_left, feature_right)
等于运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
布尔序列指示 left == right
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Ne(feature_left, feature_right)
不等于运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
布尔序列指示 left != right
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.And(feature_left, feature_right)
与运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
两个特征逐行比较 & 输出
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Or(feature_left, feature_right)
或运算符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- 返回:
两个特征逐行比较 | 输出
- 返回类型:
- __init__(feature_left, feature_right)
- class qlib.data.ops.If(condition, feature_left, feature_right)
如果运算符
- 参数:
condition (Expression) -- 具有布尔值作为条件的特征实例
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
- __init__(condition, feature_left, feature_right)
- get_longest_back_rolling()
获取特征访问的历史数据的最长长度
这旨在首先获取计算特定范围内特征所需的数据范围。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需的历史数据长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内,我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的*叶特征*。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- class qlib.data.ops.Rolling(feature, N, func)
滚动操作符 在pandas中,滚动和扩展的含义是相同的。当窗口设置为0时,操作符的行为应遵循`expanding`,否则遵循`rolling`
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
func (str) -- 滚动方法
- 返回:
滚动输出
- 返回类型:
- __init__(feature, N, func)
- get_longest_back_rolling()
获取特征访问的历史数据的最长长度
这旨在首先获取计算特定范围内特征所需的数据范围。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需的历史数据长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内,我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的*叶特征*。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- class qlib.data.ops.Ref(feature, N)
特征参考
- 参数:
feature (Expression) -- 特征实例
N (int) -- N = 0,检索第一条数据;N > 0,检索N个周期之前的数据;N < 0,未来数据
- 返回:
具有目标参考的特征实例
- 返回类型:
- __init__(feature, N)
- get_longest_back_rolling()
获取特征访问的历史数据的最长长度
这旨在首先获取计算特定范围内特征所需的数据范围。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需的历史数据长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内,我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的*叶特征*。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- class qlib.data.ops.Mean(feature, N)
滚动均值 (MA)
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动平均的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Sum(feature, N)
滚动总和
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动总和的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Std(feature, N)
滚动标准差
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动标准差的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Var(feature, N)
滚动方差
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动方差的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Skew(feature, N)
滚动偏度
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动偏度的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Kurt(feature, N)
滚动峰度
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动峰度的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Max(feature, N)
滚动最大值
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动最大值的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.IdxMax(feature, N)
滚动最大值索引
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动最大值索引的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Min(feature, N)
滚动最小值
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动最小值的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.IdxMin(feature, N)
滚动最小值索引
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动最小值索引的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Quantile(feature, N, qscore)
滚动分位数
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动分位数的特征实例
- 返回类型:
- __init__(feature, N, qscore)
- class qlib.data.ops.Med(feature, N)
滚动中位数
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动中位数的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Mad(feature, N)
滚动平均绝对偏差
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动平均绝对偏差的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Rank(feature, N)
滚动排名(百分位数)
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动排名的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Count(feature, N)
滚动计数
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有非NaN元素数量的滚动计数特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Delta(feature, N)
滚动增量
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有滚动窗口中结束减去开始的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Slope(feature, N)
滚动斜率 此操作符计算 idx 和 feature 之间的斜率。 (例如 [<feature_t1>, <feature_t2>, <feature_t3>] 和 [1, 2, 3])
使用示例: - "Slope($close, %d)/$close"
# TODO: # 一些用户可能希望成对滚动,如 Slope(A, B, N)
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有给定窗口的线性回归斜率的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Rsquare(feature, N)
滚动 R 值平方
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有给定窗口的线性回归 r 值平方的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.Resi(feature, N)
滚动回归残差
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有给定窗口的回归残差的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.WMA(feature, N)
滚动加权移动平均
- 参数:
feature (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有加权移动平均输出的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.EMA(feature, N)
滚动指数移动平均 (EMA)
- 参数:
feature (Expression) -- 特征实例
N (int, float) -- 滚动窗口大小
- 返回:
具有给定窗口的回归 r 值平方的特征实例
- 返回类型:
- __init__(feature, N)
- class qlib.data.ops.PairRolling(feature_left, feature_right, N, func)
配对滚动操作符
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有两个输入特征滚动输出的特征实例
- 返回类型:
- __init__(feature_left, feature_right, N, func)
- get_longest_back_rolling()
获取特征访问的历史数据的最长长度
这旨在首先获取计算特定范围内特征所需的数据范围。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需的历史数据长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内,我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的*叶特征*。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- class qlib.data.ops.Corr(feature_left, feature_right, N)
滚动相关性
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有两个输入特征滚动相关性的特征实例
- 返回类型:
- __init__(feature_left, feature_right, N)
- class qlib.data.ops.Cov(feature_left, feature_right, N)
滚动协方差
- 参数:
feature_left (Expression) -- 特征实例
feature_right (Expression) -- 特征实例
N (int) -- 滚动窗口大小
- 返回:
具有两个输入特征滚动最大值的特征实例
- 返回类型:
- __init__(feature_left, feature_right, N)
- class qlib.data.ops.TResample(feature, freq, func)
- __init__(feature, freq, func)
将数据重新采样到目标频率。使用 pandas 的 resample 函数。
重新采样后时间戳将位于时间跨度的开始。
- 参数:
feature (Expression) -- 计算特征的表达式
freq (str) -- 它将被传递到重采样方法中,以根据给定频率进行重采样
func (method) -- 获取重采样值的方法,一些表达式被频繁使用
- class qlib.data.ops.OpsWrapper
操作包装器
- __init__()
- register(ops_list: List[Type[ExpressionOps] | dict])
注册操作符
- 参数:
ops_list (List[Union[Type[ExpressionOps], dict]]) --
如果类型(ops_list)是 List[Type[ExpressionOps]],则 ops_list 的每个元素表示操作符类,应该是 ExpressionOps 的子类。
如果类型(ops_list)是 List[dict],则 ops_list 的每个元素表示操作符的配置,格式如下:
{ "class": class_name, "module_path": path, }注意:class 应该是操作符的类名,module_path 应该是一个 Python 模块或文件路径。
- qlib.data.ops.register_all_ops(C)
注册所有操作符
缓存
- class qlib.data.cache.MemCacheUnit(*args, **kwargs)
内存缓存单元。
- __init__(*args, **kwargs)
- property limited
内存缓存是否有限
- class qlib.data.cache.MemCache(mem_cache_size_limit=None, limit_type='length')
内存缓存。
- __init__(mem_cache_size_limit=None, limit_type='length')
- 参数:
mem_cache_size_limit -- 缓存最大大小。
limit_type -- 长度或大小;长度(调用函数:len),大小(调用函数:sys.getsizeof)。
- class qlib.data.cache.ExpressionCache(provider)
表达式缓存机制基类。
此类用于包装具有自定义表达式缓存机制的表达式提供者。
备注
重写 _uri 和 _expression 方法以创建您自己的表达式缓存机制。
- expression(instrument, field, start_time, end_time, freq)
获取表达式数据。
备注
与表达式提供者中的 expression 方法相同的接口
- update(cache_uri: str | Path, freq: str = 'day')
将表达式缓存更新为最新日历。
重写此方法以定义如何根据用户自己的缓存机制更新表达式缓存。
- 参数:
cache_uri (str or Path) -- 表达式缓存文件的完整 URI(包括目录路径)。
freq (str)
- 返回:
0(成功更新)/ 1(无需更新)/ 2(更新失败)。
- 返回类型:
int
- class qlib.data.cache.DatasetCache(provider)
数据集缓存机制基类。
此类用于包装具有自定义数据集缓存机制的数据集提供者。
备注
重写 _uri 和 _dataset 方法以创建您自己的数据集缓存机制。
- dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=1, inst_processors=[])
获取特征数据集。
备注
与数据集提供者中的 dataset 方法相同的接口
备注
服务器使用 redis_lock 确保不会触发读写冲突,但客户端读取者不被考虑。
- update(cache_uri: str | Path, freq: str = 'day')
将数据集缓存更新为最新日历。
重写此方法以定义如何根据用户自己的缓存机制更新数据集缓存。
- 参数:
cache_uri (str or Path) -- 数据集缓存文件的完整 URI(包括目录路径)。
freq (str)
- 返回:
0(成功更新)/ 1(无需更新)/ 2(更新失败)
- 返回类型:
int
- static cache_to_origin_data(data, fields)
缓存数据到原始数据
- 参数:
data -- pd.DataFrame,缓存数据。
fields -- 特征字段。
- 返回:
pd.DataFrame。
- static normalize_uri_args(instruments, fields, freq)
规范化 URI 参数
- class qlib.data.cache.DiskExpressionCache(provider, **kwargs)
为服务器准备的缓存机制。
- __init__(provider, **kwargs)
- gen_expression_cache(expression_data, cache_path, instrument, field, freq, last_update)
使用bin文件保存类似特征数据。
- update(sid, cache_uri, freq: str = 'day')
将表达式缓存更新为最新日历。
重写此方法以定义如何根据用户自己的缓存机制更新表达式缓存。
- 参数:
cache_uri (str or Path) -- 表达式缓存文件的完整 URI(包括目录路径)。
freq (str)
- 返回:
0(成功更新)/ 1(无需更新)/ 2(更新失败)。
- 返回类型:
int
- class qlib.data.cache.DiskDatasetCache(provider, **kwargs)
为服务器准备的缓存机制。
- __init__(provider, **kwargs)
- classmethod read_data_from_cache(cache_path: str | Path, start_time, end_time, fields)
read_cache_from
此函数可以从磁盘缓存数据集读取数据。
- 参数:
cache_path
start_time
end_time
fields -- 数据集缓存的字段顺序是排序的。因此,请重新排列列以保持一致性。
- 返回:
- class IndexManager(cache_path: str | Path)
类中未考虑锁。请在代码外部考虑锁。此类是磁盘数据的代理。
- __init__(cache_path: str | Path)
- gen_dataset_cache(cache_path: str | Path, instruments, fields, freq, inst_processors=[])
备注
此函数未考虑缓存读写锁。请在此函数外部获取锁。
缓存的格式包含3个部分(后跟典型文件名)。
索引:cache/d41366901e25de3ec47297f12e2ba11d.index
文件的内容可能采用以下格式(pandas.Series)
start end 1999-11-10 00:00:00 0 1 1999-11-11 00:00:00 1 2 1999-11-12 00:00:00 2 3 ...
备注
开始是关闭的。结束是开放的!!!!!
每行包含两个元素 <start_index, end_index>,以时间戳作为索引。
它表示数据的 start_index`(包含)和 `end_index`(不包含)对应于 `timestamp
元数据:cache/d41366901e25de3ec47297f12e2ba11d.meta
数据 : cache/d41366901e25de3ec47297f12e2ba11d
这是一个按日期时间排序的hdf文件
- 参数:
cache_path -- 存储缓存的路径。
instruments -- 存储缓存的工具。
fields -- 存储缓存的字段。
freq -- 存储缓存的频率。
inst_processors -- 仪器处理器。
:返回类型 pd.DataFrame;返回的 DataFrame 的字段与函数的参数一致。
- update(cache_uri, freq: str = 'day')
将数据集缓存更新为最新日历。
重写此方法以定义如何根据用户自己的缓存机制更新数据集缓存。
- 参数:
cache_uri (str or Path) -- 数据集缓存文件的完整 URI(包括目录路径)。
freq (str)
- 返回:
0(成功更新)/ 1(无需更新)/ 2(更新失败)
- 返回类型:
int
存储
- class qlib.data.storage.storage.BaseStorage
- class qlib.data.storage.storage.CalendarStorage(freq: str, future: bool, **kwargs)
CalendarStorage 的方法和 List 同名方法的行为保持一致
- __init__(freq: str, future: bool, **kwargs)
- property data: Iterable[str]
获取所有数据
- 抛出:
ValueError -- 如果数据(存储)不存在,抛出 ValueError
- index(value: str) int
- 抛出:
ValueError -- 如果数据(存储)不存在,抛出 ValueError
- class qlib.data.storage.storage.InstrumentStorage(market: str, freq: str, **kwargs)
- __init__(market: str, freq: str, **kwargs)
- property data: Dict[str, List[Tuple[str, str]]]
获取所有数据
- 抛出:
ValueError -- 如果数据(存储)不存在,抛出 ValueError
- update([E, ]**F) None. Update D from mapping/iterable E and F.
备注
如果 E 存在并且有 .keys() 方法,执行: for k in E: D[k] = E[k]
如果 E 存在但缺少 .keys() 方法,执行: for (k, v) in E: D[k] = v
在这两种情况下,接下来执行:for k, v in F.items(): D[k] = v
- class qlib.data.storage.storage.FeatureStorage(instrument: str, field: str, freq: str, **kwargs)
- __init__(instrument: str, field: str, freq: str, **kwargs)
- property data: Series
获取所有数据
备注
如果数据(存储)不存在,返回空的 pd.Series:return pd.Series(dtype=np.float32)
- property start_index: int | None
获取 FeatureStorage 起始索引
备注
如果数据(存储)不存在,返回 None
- property end_index: int | None
获取 FeatureStorage 结束索引
备注
数据范围的右侧索引(两侧均为闭区间)
下一个数据追加点将是 end_index + 1
如果数据(存储)不存在,返回 None
- write(data_array: List | ndarray | Tuple, index: int | None = None)
从索引开始将 data_array 写入 FeatureStorage。
备注
如果索引为 None,将 data_array 附加到特征中。
如果 len(data_array) == 0;返回
如果 (index - self.end_index) >= 1,self[end_index+1: index] 将填充 np.nan
示例
feature: 3 4 4 5 5 6 >>> self.write([6, 7], index=6) feature: 3 4 4 5 5 6 6 6 7 7 >>> self.write([8], index=9) feature: 3 4 4 5 5 6 6 6 7 7 8 np.nan 9 8 >>> self.write([1, np.nan], index=3) feature: 3 1 4 np.nan 5 6 6 6 7 7 8 np.nan 9 8
- rebase(start_index: int | None = None, end_index: int | None = None)
重新基准化 FeatureStorage 的 start_index 和 end_index。
start_index 和 end_index 是闭区间:[start_index, end_index]
示例
feature: 3 4 4 5 5 6 >>> self.rebase(start_index=4) feature: 4 5 5 6 >>> self.rebase(start_index=3) feature: 3 np.nan 4 5 5 6 >>> self.write([3], index=3) feature: 3 3 4 5 5 6 >>> self.rebase(end_index=4) feature: 3 3 4 5 >>> self.write([6, 7, 8], index=4) feature: 3 3 4 6 5 7 6 8 >>> self.rebase(start_index=4, end_index=5) feature: 4 6 5 7
- rewrite(data: List | ndarray | Tuple, index: int)
用数据覆盖 FeatureStorage 中的所有数据
- 参数:
data (Union[List, np.ndarray, Tuple]) -- 数据
index (int) -- 数据起始索引
- class qlib.data.storage.file_storage.FileStorageMixin
FileStorageMixin,适用于 FileXXXStorage 子类,需要具有 provider_uri、freq、storage_name、file_name 属性
- check()
检查 self.uri
- 抛出:
ValueError --
- class qlib.data.storage.file_storage.FileCalendarStorage(freq: str, future: bool, provider_uri: dict | None = None, **kwargs)
- __init__(freq: str, future: bool, provider_uri: dict | None = None, **kwargs)
- property data: List[str]
获取所有数据
- 抛出:
ValueError -- 如果数据(存储)不存在,抛出 ValueError
- index(value: str) int
- 抛出:
ValueError -- 如果数据(存储)不存在,抛出 ValueError
- class qlib.data.storage.file_storage.FileInstrumentStorage(market: str, freq: str, provider_uri: dict | None = None, **kwargs)
- __init__(market: str, freq: str, provider_uri: dict | None = None, **kwargs)
- property data: Dict[str, List[Tuple[str, str]]]
获取所有数据
- 抛出:
ValueError -- 如果数据(存储)不存在,抛出 ValueError
- update([E, ]**F) None. Update D from mapping/iterable E and F.
备注
如果 E 存在并且有 .keys() 方法,执行: for k in E: D[k] = E[k]
如果 E 存在但缺少 .keys() 方法,执行: for (k, v) in E: D[k] = v
在这两种情况下,接下来执行:for k, v in F.items(): D[k] = v
- class qlib.data.storage.file_storage.FileFeatureStorage(instrument: str, field: str, freq: str, provider_uri: dict | None = None, **kwargs)
- __init__(instrument: str, field: str, freq: str, provider_uri: dict | None = None, **kwargs)
- property data: Series
获取所有数据
备注
如果数据(存储)不存在,返回空的 pd.Series:return pd.Series(dtype=np.float32)
- write(data_array: List | ndarray, index: int | None = None) None
从索引开始将 data_array 写入 FeatureStorage。
备注
如果索引为 None,将 data_array 附加到特征中。
如果 len(data_array) == 0;返回
如果 (index - self.end_index) >= 1,self[end_index+1: index] 将填充 np.nan
示例
feature: 3 4 4 5 5 6 >>> self.write([6, 7], index=6) feature: 3 4 4 5 5 6 6 6 7 7 >>> self.write([8], index=9) feature: 3 4 4 5 5 6 6 6 7 7 8 np.nan 9 8 >>> self.write([1, np.nan], index=3) feature: 3 1 4 np.nan 5 6 6 6 7 7 8 np.nan 9 8
- property start_index: int | None
获取 FeatureStorage 起始索引
备注
如果数据(存储)不存在,返回 None
- property end_index: int | None
获取 FeatureStorage 结束索引
备注
数据范围的右侧索引(两侧均为闭区间)
下一个数据追加点将是 end_index + 1
如果数据(存储)不存在,返回 None
数据集
数据集类
- class qlib.data.dataset.__init__.Dataset(**kwargs)
为模型训练和推理准备数据。
- __init__(**kwargs)
init 旨在完成以下步骤:
- 初始化子实例和数据集的状态(准备数据的信息)
用于准备数据的基本状态名称不应以 '_' 开头,以便在序列化时可以在磁盘上进行序列化。
- 设置数据
与数据相关的属性名称应以 '_' 开头,以便在序列化时不会保存到磁盘上。
数据可以指定计算准备所需基本数据的信息
- config(**kwargs)
config 旨在配置和参数,这些参数无法从数据中学习
- setup_data(**kwargs)
设置数据。
我们将 setup_data 函数拆分为以下情况:
用户在磁盘上有一个具有学习状态的 Dataset 对象。
用户从磁盘加载数据集对象。
用户调用 setup_data 来加载新数据。
用户根据之前的状态准备模型所需的数据。
- prepare(**kwargs) object
数据集的类型取决于模型。 (它可以是 pd.DataFrame、pytorch.DataLoader 等) 参数应指定准备数据的范围。该方法应: - 处理数据
返回处理后的数据
- 返回:
返回对象
- 返回类型:
object
- class qlib.data.dataset.__init__.DatasetH(handler: Dict | DataHandler, segments: Dict[str, Tuple], fetch_kwargs: Dict = {}, **kwargs)
带有数据(H)处理器的数据集
用户应尝试将数据预处理函数放入处理器中。仅以下数据处理函数应放置在数据集中:
处理与特定模型相关。
处理与数据拆分相关。
- __init__(handler: Dict | DataHandler, segments: Dict[str, Tuple], fetch_kwargs: Dict = {}, **kwargs)
设置基础数据。
- 参数:
handler (Union[dict, DataHandler]) -- 处理器可以是: - DataHandler 的实例 - DataHandler 的配置。请参考 DataHandler
segments (dict) -- 描述分割数据的选项。以下是一些示例: .. code-block:: 1) 'segments': { 'train': ("2008-01-01", "2014-12-31"), 'valid': ("2017-01-01", "2020-08-01",), 'test': ("2015-01-01", "2016-12-31",), } 2) 'segments': { 'insample': ("2008-01-01", "2014-12-31"), 'outsample': ("2017-01-01", "2020-08-01",), }
- config(handler_kwargs: dict | None = None, **kwargs)
初始化 DatasetH
- 参数:
handler_kwargs (dict) -- DataHandler 的配置,可以包括以下参数: - DataHandler.conf_data 的参数,例如 'instruments'、'start_time' 和 'end_time'。
kwargs (dict) -- DatasetH的配置,例如 - segments : dict segments的配置与self.__init__中的'segments'相同。
- setup_data(handler_kwargs: dict | None = None, **kwargs)
设置数据
- 参数:
handler_kwargs (dict) -- 初始化 DataHandler 的参数,可能包括以下参数: - init_type : 处理程序的初始化类型 - enable_cache : 是否启用缓存
- prepare(segments: List[str] | Tuple[str] | str | slice | Index, col_set='__all', data_key='infer', **kwargs) List[DataFrame] | DataFrame
为学习和推理准备数据。
- 参数:
segments (Union[List[Text], Tuple[Text], Text, slice]) -- 描述要准备的数据的范围,以下是一些示例: - 'train' - ['train', 'valid']
col_set (str) -- col_set将在获取数据时传递给self.handler。TODO:使其自动化: - 选择DK_I作为测试数据 - 选择DK_L作为训练数据。
data_key (str) -- 要获取的数据: DK_* 默认值为 DK_I,表示获取 推理 的数据。
kwargs -- kwargs 可能包含的参数: flt_col : str 仅在 TSDatasetH 中存在,可用于添加一列数据(True 或 False)以过滤数据。 该参数仅在它是 TSDatasetH 的实例时受支持。
- 返回类型:
Union[List[pd.DataFrame], pd.DataFrame]
- 抛出:
NotImplementedError: --
数据加载器
- class qlib.data.dataset.loader.DataLoader
DataLoader 旨在从原始数据源加载原始数据。
- abstract load(instruments, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289- 参数:
instruments (str or dict) -- 它可以是市场名称或由InstrumentProvider生成的工具配置文件。如果工具的值为None,则表示没有进行过滤。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
- 返回:
从底层源加载数据
- 返回类型:
pd.DataFrame
- 抛出:
KeyError: -- 如果不支持工具过滤,则引发KeyError
- class qlib.data.dataset.loader.DLWParser(config: list | tuple | dict)
(D)ata(L)oader (W)ith (P)arser用于特征和名称
提取此类,以便QlibDataLoader和其他数据加载器(如QdbDataLoader)可以共享字段。
- __init__(config: list | tuple | dict)
- 参数:
config (Union[list, tuple, dict]) -- 配置将用于描述字段和列名 .. 代码块:: <config> := { "group_name1": <fields_info1> "group_name2": <fields_info2> } 或 <config> := <fields_info> <fields_info> := ["expr", ...] | (["expr", ...], ["col_name", ...]) # 注意:在解析时,列表或元组将被视为内容
- abstract load_group_df(instruments, exprs: list, names: list, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, gp_name: str | None = None) DataFrame
加载特定组的数据框
- 参数:
instruments -- 工具。
exprs (list) -- 描述数据内容的表达式。
names (list) -- 数据的名称。
- 返回:
查询的数据框。
- 返回类型:
pd.DataFrame
- load(instruments=None, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289- 参数:
instruments (str or dict) -- 它可以是市场名称或由InstrumentProvider生成的工具配置文件。如果工具的值为None,则表示没有进行过滤。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
- 返回:
从底层源加载数据
- 返回类型:
pd.DataFrame
- 抛出:
KeyError: -- 如果不支持工具过滤,则引发KeyError
- class qlib.data.dataset.loader.QlibDataLoader(config: Tuple[list, tuple, dict], filter_pipe: List | None = None, swap_level: bool = True, freq: str | dict = 'day', inst_processors: dict | list | None = None)
与QlibDataLoader相同。字段可以通过配置定义。
- __init__(config: Tuple[list, tuple, dict], filter_pipe: List | None = None, swap_level: bool = True, freq: str | dict = 'day', inst_processors: dict | list | None = None)
- 参数:
config (Tuple[list, tuple, dict]) -- 请参考DLWParser的文档。
filter_pipe -- 用于仪器的过滤管道。
swap_level -- 是否交换MultiIndex的级别。
freq (dict or str) -- 如果type(config) == dict并且type(freq) == str,则使用freq加载配置数据。如果type(config) == dict并且type(freq) == dict,则使用freq[<group_name>]加载config[<group_name>]数据。
inst_processors (dict | list) -- 如果inst_processors不为None并且type(config) == dict;使用inst_processors[<group_name>]加载config[<group_name>]数据。如果inst_processors是一个列表,则将应用于所有组。
- load_group_df(instruments, exprs: list, names: list, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, gp_name: str | None = None) DataFrame
加载特定组的数据框
- 参数:
instruments -- 工具。
exprs (list) -- 描述数据内容的表达式。
names (list) -- 数据的名称。
- 返回:
查询的数据框。
- 返回类型:
pd.DataFrame
- class qlib.data.dataset.loader.StaticDataLoader(config: dict | str | DataFrame, join='outer')
支持从文件或提供的数据加载的DataLoader。
- __init__(config: dict | str | DataFrame, join='outer')
- 参数:
config (dict) -- {fields_group: <路径或对象>}
join (str) -- 如何对齐不同的数据框
- load(instruments=None, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289- 参数:
instruments (str or dict) -- 它可以是市场名称或由InstrumentProvider生成的工具配置文件。如果工具的值为None,则表示没有进行过滤。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
- 返回:
从底层源加载数据
- 返回类型:
pd.DataFrame
- 抛出:
KeyError: -- 如果不支持工具过滤,则引发KeyError
- class qlib.data.dataset.loader.NestedDataLoader(dataloader_l: List[Dict], join='left')
我们有多个数据加载器,可以使用这个类将它们组合在一起。
- __init__(dataloader_l: List[Dict], join='left') None
- 参数:
dataloader_l (list[dict]) -- 一个数据加载器的列表,例如 .. code-block:: python nd = NestedDataLoader( dataloader_l=[ { "class": "qlib.contrib.data.loader.Alpha158DL", }, { "class": "qlib.contrib.data.loader.Alpha360DL", "kwargs": { "config": { "label": ( ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]) } } } ] )
join -- 在合并时将传递给 pd.concat。
- load(instruments=None, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289- 参数:
instruments (str or dict) -- 它可以是市场名称或由InstrumentProvider生成的工具配置文件。如果工具的值为None,则表示没有进行过滤。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
- 返回:
从底层源加载数据
- 返回类型:
pd.DataFrame
- 抛出:
KeyError: -- 如果不支持工具过滤,则引发KeyError
- class qlib.data.dataset.loader.DataLoaderDH(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)
基于 (D)ata (H)andler 的数据加载器,旨在从数据处理器加载多个数据 - 如果您只想从单个数据处理器加载数据,可以将它们写入单个数据处理器。
TODO: 使这个模块不那么容易使用的原因。
对于在线场景
底层数据处理器应该被配置。但数据加载器不提供这样的接口和钩子。
- __init__(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)
- 参数:
handler_config (dict) -- handler_config 将用于描述处理器 .. code-block:: <handler_config> := { "group_name1": <handler> "group_name2": <handler> } 或 <handler_config> := <handler> <handler> := 数据处理器实例 | 数据处理器配置
fetch_kwargs (dict) -- fetch_kwargs 将用于描述 fetch 方法的不同参数,例如 col_set、squeeze、data_key 等。
is_group (bool) -- is_group 将用于描述 handler_config 的键是否为组
- load(instruments=None, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289- 参数:
instruments (str or dict) -- 它可以是市场名称或由InstrumentProvider生成的工具配置文件。如果工具的值为None,则表示没有进行过滤。
start_time (str) -- 时间范围的开始。
end_time (str) -- 时间范围的结束。
- 返回:
从底层源加载数据
- 返回类型:
pd.DataFrame
- 抛出:
KeyError: -- 如果不支持工具过滤,则引发KeyError
数据处理程序
- class qlib.data.dataset.handler.DataHandler(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, init_data=True, fetch_orig=True)
使用处理程序的步骤 1. 初始化数据处理程序(通过 init 调用)。 2. 使用数据。
数据处理程序尝试维护一个具有 2 个级别的处理程序。datetime 和 instruments。
可以支持索引级别的任何顺序(顺序将在数据中隐含)。当数据框索引名称缺失时,将使用 <datetime、instruments> 的顺序。
数据示例:列的多重索引是可选的。
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289提高数据处理程序性能的提示 - 使用 col_set=CS_RAW 获取数据将返回原始数据,并可能避免在调用 loc 时 pandas 复制数据。
- __init__(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, init_data=True, fetch_orig=True)
- 参数:
instruments -- 要检索的股票列表。
start_time -- 原始数据的开始时间。
end_time -- 原始数据的结束时间。
data_loader (Union[dict, str, DataLoader]) -- 用于加载数据的数据加载器。
init_data -- 在构造函数中初始化原始数据。
fetch_orig (bool) -- 如果可能,返回原始数据而不是副本。
- config(**kwargs)
数据的配置。# 从数据源加载哪些数据
当从数据集加载序列化处理程序时,将使用此方法。数据将使用不同的时间范围进行初始化。
- setup_data(enable_cache: bool = False)
在多次时间运行初始化的情况下设置数据
负责维护以下变量 1) self._data
- 参数:
enable_cache (bool) -- 默认值为 false: - 如果 enable_cache == True:处理后的数据将保存在磁盘上,处理程序将在下次调用 init 时直接从磁盘加载缓存的数据
- fetch(selector: Timestamp | slice | str | Index = slice(None, None, None), level: str | int = 'datetime', col_set: str | List[str] = '__all', squeeze: bool = False, proc_func: Callable | None = None) DataFrame
从底层数据源获取数据
设计动机:- 为底层数据提供统一接口。- 使接口更友好的潜力。- 用户可以在这个额外层中提高获取数据的性能
- 参数:
selector (Union[pd.Timestamp, slice, str]) -- 描述如何通过索引选择数据,可以分为以下几类 - 获取单个索引 - 获取一系列索引 - 切片范围 - pd.Index 用于特定索引 可能发生以下冲突 - ["20200101", "20210101"] 是选择这个切片还是这两天? - 切片具有更高的优先级
level (Union[str, int]) -- 选择哪个索引级别来获取数据
col_set (Union[str, List[str]]) --
如果 isinstance(col_set, str):
选择一组有意义的 pd.Index 列。(例如,特征,列)
如果 col_set == CS_RAW:
将返回原始数据集。
如果 isinstance(col_set, List[str]):
选择几组有意义的列,返回的数据具有多个层次
proc_func (Callable) --
提供一个在获取数据之前处理数据的钩子
一个解释钩子必要性的例子:
一个数据集学习了一些处理器来处理与数据分割相关的数据
它将在每次准备数据时应用这些处理器。
学习的处理器要求数据框在拟合和应用时保持相同的格式
然而,数据格式将根据参数而变化。
因此,处理器应该应用于底层数据。
squeeze (bool) -- 是否挤压列和索引
- 返回类型:
pd.DataFrame.
- get_cols(col_set='__all') list
获取列名
- 参数:
col_set (str) -- 选择一组有意义的列(例如,特征,列)
- 返回:
列名列表
- 返回类型:
list
- get_range_selector(cur_date: Timestamp | str, periods: int) slice
通过周期数获取范围选择器
- 参数:
cur_date (pd.Timestamp or str) -- 当前日期
periods (int) -- 周期数
- get_range_iterator(periods: int, min_periods: int | None = None, **kwargs) Iterator[Tuple[Timestamp, DataFrame]]
获取具有给定周期的切片数据迭代器
- 参数:
periods (int) -- 周期数。
min_periods (int) -- 切片数据框的最小周期数。
kwargs (dict) -- 将被传递给 self.fetch。
- class qlib.data.dataset.handler.DataHandlerLP(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, infer_processors: List = [], learn_processors: List = [], shared_processors: List = [], process_type='append', drop_raw=False, **kwargs)
带有 (L)可学习 (P)处理器 的数据处理器
此处理器将生成三份以 pd.DataFrame 格式的数据。
DK_R / self._data: 从加载器加载的原始数据
DK_I / self._infer: 处理用于推理的数据
DK_L / self._learn: 处理用于学习模型的数据。
使用不同处理器工作流进行学习和推理的动机,以下是一些示例。
用于学习和推理的工具宇宙可能不同。
某些样本的处理可能依赖于标签(例如,一些样本达到限制可能需要额外处理或被丢弃)。
这些处理器仅适用于学习阶段。
数据处理器的提示
减少内存成本
drop_raw=True:这将直接修改原始数据中的数据;
请注意,处理过的数据如 self._infer 或 self._learn 与 Qlib 的 Dataset 中的 segments 概念不同,如 "train" 和 "test"。
处理过的数据如 self._infer 或 self._learn 是经过不同处理器处理的基础数据。
Qlib 的 Dataset 中的 segments 如 "train" 和 "test" 只是查询数据时的时间分段("train" 通常在时间序列中位于 "test" 之前)。
例如,您可以查询在 "train" 时间分段中由 infer_processors 处理的 data._infer。
- __init__(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, infer_processors: List = [], learn_processors: List = [], shared_processors: List = [], process_type='append', drop_raw=False, **kwargs)
- 参数:
infer_processors (list) --
生成推断数据的处理器的 <描述信息> 列表
<描述信息> 的示例:
1) classname & kwargs: { "class": "MinMaxNorm", "kwargs": { "fit_start_time": "20080101", "fit_end_time": "20121231" } } 2) Only classname: "DropnaFeature" 3) object instance of Processor
learn_processors (list) -- 类似于 infer_processors,但用于生成学习模型的数据
process_type (str) -- PTYPE_I = '独立' - self._infer 将由 infer_processors 处理 - self._learn 将由 learn_processors 处理 PTYPE_A = '附加' - self._infer 将由 infer_processors 处理 - self._learn 将由 infer_processors + learn_processors 处理 - (例如 self._infer 由 learn_processors 处理 )
drop_raw (bool) -- 是否丢弃原始数据
- fit()
拟合数据而不处理数据
- fit_process_data()
拟合和处理数据
fit 的输入将是前一个处理器的输出
- process_data(with_fit: bool = False)
处理数据。如果必要,调用 processor.fit
符号: (data) [processor]
# 当 self.process_type == DataHandlerLP.PTYPE_I 时的数据处理流程
(self._data)-[shared_processors]-(_shared_df)-[learn_processors]-(_learn_df) \ -[infer_processors]-(_infer_df)# 当 self.process_type == DataHandlerLP.PTYPE_A 时的数据处理流程
(self._data)-[shared_processors]-(_shared_df)-[infer_processors]-(_infer_df)-[learn_processors]-(_learn_df)
- 参数:
with_fit (bool) -- fit 的输入将是前一个处理器的输出
- config(processor_kwargs: dict | None = None, **kwargs)
数据的配置。# 从数据源加载哪些数据
当从数据集加载序列化处理程序时,将使用此方法。数据将使用不同的时间范围进行初始化。
- setup_data(init_type: str = 'fit_seq', **kwargs)
设置数据以便在多次运行初始化时使用
- 参数:
init_type (str) -- 上述列出的类型 IT_*。
enable_cache (bool) -- 默认值为 false: - 如果 enable_cache == True:处理后的数据将保存在磁盘上,处理程序将在下次调用 init 时直接从磁盘加载缓存的数据
- fetch(selector: Timestamp | slice | str = slice(None, None, None), level: str | int = 'datetime', col_set='__all', data_key: Literal['raw', 'infer', 'learn'] = 'infer', squeeze: bool = False, proc_func: Callable | None = None) DataFrame
从底层数据源获取数据
- 参数:
selector (Union[pd.Timestamp, slice, str]) -- 描述如何通过索引选择数据。
level (Union[str, int]) -- 选择数据的索引级别。
col_set (str) -- 选择一组有意义的列。(例如:特征,列)。
data_key (str) -- 要获取的数据: DK_*。
proc_func (Callable) -- 请参考 DataHandler.fetch 的文档
- 返回类型:
pd.DataFrame
- get_cols(col_set='__all', data_key: Literal['raw', 'infer', 'learn'] = 'infer') list
获取列名
- 参数:
col_set (str) -- 选择一组有意义的列。(例如:特征,列)。
data_key (DATA_KEY_TYPE) -- 要获取的数据: DK_*。
- 返回:
列名列表
- 返回类型:
list
- classmethod cast(handler: DataHandlerLP) DataHandlerLP
动机
用户在他自定义的包中创建一个数据处理器。然后他想将处理后的处理器分享给其他用户,而不引入包依赖和复杂的数据处理逻辑。
这个类通过将类转换为 DataHandlerLP 并仅保留处理后的数据,使其成为可能
- 参数:
handler (DataHandlerLP) -- DataHandlerLP 的子类
- 返回:
转换后的处理数据
- 返回类型:
- classmethod from_df(df: DataFrame) DataHandlerLP
动机: - 当用户想要快速获取数据处理器时。
创建的数据处理器将只有一个共享的 Dataframe,没有处理器。在创建处理器后,用户可能经常想要转储处理器以便重用。这是一个典型的用例
from qlib.data.dataset import DataHandlerLP dh = DataHandlerLP.from_df(df) dh.to_pickle(fname, dump_all=True)
TODO: - StaticDataLoader速度较慢。它不需要再次复制数据...
处理器
- qlib.data.dataset.processor.get_group_columns(df: DataFrame, group: str | None)
从多重索引列的DataFrame中获取一组列
- 参数:
df (pd.DataFrame) -- 与多列一起。
group (str) -- 特征组的名称,即组索引的第一级值。
- class qlib.data.dataset.processor.Processor
- fit(df: DataFrame | None = None)
学习数据处理参数
- 参数:
df (pd.DataFrame) -- 当我们逐个使用处理器拟合和处理数据时,拟合函数依赖于前一个处理器的输出,即`df`。
- is_for_infer() bool
这个处理器可以用于推断吗?有些处理器不能用于推断。
- 返回:
如果它可以用于推断。
- 返回类型:
bool
- readonly() bool
处理器在处理时是否将输入数据视为只读(即不写入输入数据)
了解只读信息对处理程序避免不必要的复制是有帮助的
- config(**kwargs)
配置可序列化对象
- 参数:
keys (kwargs may include following) -- dump_all : bool 对象是否会转储所有对象 exclude : list 哪些属性将不会被转储 include : list 哪些属性将被转储
recursive (bool) -- 配置是否会递归
- class qlib.data.dataset.processor.DropnaProcessor(fields_group=None)
- __init__(fields_group=None)
- readonly()
处理器在处理时是否将输入数据视为只读(即不写入输入数据)
了解只读信息对处理程序避免不必要的复制是有帮助的
- class qlib.data.dataset.processor.DropnaLabel(fields_group='label')
- __init__(fields_group='label')
- is_for_infer() bool
样本根据标签被丢弃。因此它不能用于推理
- class qlib.data.dataset.processor.DropCol(col_list=[])
- __init__(col_list=[])
- readonly()
处理器在处理时是否将输入数据视为只读(即不写入输入数据)
了解只读信息对处理程序避免不必要的复制是有帮助的
- class qlib.data.dataset.processor.FilterCol(fields_group='feature', col_list=[])
- __init__(fields_group='feature', col_list=[])
- readonly()
处理器在处理时是否将输入数据视为只读(即不写入输入数据)
了解只读信息对处理程序避免不必要的复制是有帮助的
- class qlib.data.dataset.processor.TanhProcess
使用tanh处理噪声数据
- class qlib.data.dataset.processor.ProcessInf
处理无穷大
- class qlib.data.dataset.processor.Fillna(fields_group=None, fill_value=0)
处理NaN
- __init__(fields_group=None, fill_value=0)
- class qlib.data.dataset.processor.MinMaxNorm(fit_start_time, fit_end_time, fields_group=None)
- __init__(fit_start_time, fit_end_time, fields_group=None)
- fit(df: DataFrame | None = None)
学习数据处理参数
- 参数:
df (pd.DataFrame) -- 当我们逐个使用处理器拟合和处理数据时,拟合函数依赖于前一个处理器的输出,即`df`。
- class qlib.data.dataset.processor.ZScoreNorm(fit_start_time, fit_end_time, fields_group=None)
Z分数标准化
- __init__(fit_start_time, fit_end_time, fields_group=None)
- fit(df: DataFrame | None = None)
学习数据处理参数
- 参数:
df (pd.DataFrame) -- 当我们逐个使用处理器拟合和处理数据时,拟合函数依赖于前一个处理器的输出,即`df`。
- class qlib.data.dataset.processor.RobustZScoreNorm(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)
稳健Z分数标准化
- 使用稳健统计进行Z分数标准化:
mean(x) = median(x) std(x) = MAD(x) * 1.4826
- 参考文献:
https://zh.wikipedia.org/wiki/中位数绝对偏差。
- __init__(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)
- fit(df: DataFrame | None = None)
学习数据处理参数
- 参数:
df (pd.DataFrame) -- 当我们逐个使用处理器拟合和处理数据时,拟合函数依赖于前一个处理器的输出,即`df`。
- class qlib.data.dataset.processor.CSZScoreNorm(fields_group=None, method='zscore')
横截面Z分数归一化
- __init__(fields_group=None, method='zscore')
- class qlib.data.dataset.processor.CSRankNorm(fields_group=None)
横截面排名归一化。"横截面"通常用于描述数据操作。不同股票之间的操作通常称为横截面操作。
例如,CSRankNorm是一种操作,它按每天对数据进行分组,并在每一天对所有股票进行排名。
关于3.46和0.5的解释
import numpy as np import pandas as pd x = np.random.random(10000) # for any variable x_rank = pd.Series(x).rank(pct=True) # if it is converted to rank, it will be a uniform distributed x_rank_norm = (x_rank - x_rank.mean()) / x_rank.std() # Normally, we will normalize it to make it like normal distribution x_rank.mean() # accounts for 0.5 1 / x_rank.std() # accounts for 3.46
- __init__(fields_group=None)
- class qlib.data.dataset.processor.CSZFillna(fields_group=None)
横截面填充缺失值
- __init__(fields_group=None)
- class qlib.data.dataset.processor.HashStockFormat
将df中的存储处理为哈希股票格式
- class qlib.data.dataset.processor.TimeRangeFlt(start_time: Timestamp | str | None = None, end_time: Timestamp | str | None = None, freq: str = 'day')
这是一个过滤器,用于过滤股票。仅保留从start_time到end_time之间存在的数据(中间的存在性不进行检查。)警告:这可能会导致泄漏!!!
- __init__(start_time: Timestamp | str | None = None, end_time: Timestamp | str | None = None, freq: str = 'day')
- 参数:
start_time (Optional[Union[pd.Timestamp, str]]) -- 数据必须在`start_time`之前(或等于)开始,None表示数据将不根据`start_time`进行过滤
end_time (Optional[Union[pd.Timestamp, str]]) -- 类似于开始时间
freq (str) -- 日历的频率
贡献
模型
- class qlib.model.base.Model
可学习模型
- fit(dataset: Dataset, reweighter: Reweighter)
从基础模型学习模型
备注
学习模型的属性名称不应以 '_' 开头。这样模型才能被转储到磁盘。
以下代码示例演示如何从 dataset 中检索 x_train、y_train 和 w_train:
# get features and labels df_train, df_valid = dataset.prepare( ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L ) x_train, y_train = df_train["feature"], df_train["label"] x_valid, y_valid = df_valid["feature"], df_valid["label"] # get weights try: wdf_train, wdf_valid = dataset.prepare(["train", "valid"], col_set=["weight"], data_key=DataHandlerLP.DK_L) w_train, w_valid = wdf_train["weight"], wdf_valid["weight"] except KeyError as e: w_train = pd.DataFrame(np.ones_like(y_train.values), index=y_train.index) w_valid = pd.DataFrame(np.ones_like(y_valid.values), index=y_valid.index)
- 参数:
dataset (Dataset) -- 数据集将生成来自模型训练的处理数据。
- class qlib.model.base.ModelFT
模型 (F)ine(t)unable
- abstract finetune(dataset: Dataset)
基于给定数据集微调模型
使用 qlib.workflow.R 微调模型的典型用例
# start exp to train init model with R.start(experiment_name="init models"): model.fit(dataset) R.save_objects(init_model=model) rid = R.get_recorder().id # Finetune model based on previous trained model with R.start(experiment_name="finetune model"): recorder = R.get_recorder(recorder_id=rid, experiment_name="init models") model = recorder.load_object("init_model") model.finetune(dataset, num_boost_round=10)
- 参数:
dataset (Dataset) -- 数据集将生成来自模型训练的处理数据集。
策略
- class qlib.contrib.strategy.TopkDropoutStrategy(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
- __init__(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
- 参数:
topk (int) -- 投资组合中的股票数量。
n_drop (int) -- 每个交易日期要替换的股票数量。
method_sell (str) -- 剔除方法_卖出,随机/底部。
method_buy (str) -- 剔除方法_买入,随机/顶部。
hold_thresh (int) -- 在卖出股票之前的最小持有天数,将检查当前.get_stock_count(order.stock_id) >= self.hold_thresh。
only_tradable (bool) -- 策略在买入和卖出时是否只考虑可交易股票。如果 only_tradable: 策略将根据股票信息的可交易状态做出决策,并避免买卖它们。否则: 策略将在不检查股票的可交易状态的情况下做出买卖决策。
forbid_all_trade_at_limit (bool) -- 如果在达到涨停或跌停时禁止所有交易。如果 forbid_all_trade_at_limit: 策略在价格达到涨停/跌停时将不进行任何交易,甚至在涨停时不卖出,跌停时不买入,尽管在现实中是允许的。否则: 策略将在涨停时卖出,在跌停时买入。
- generate_trade_decision(execute_result=None)
在每个交易条生成交易决策
- 参数:
execute_result (List[object], optional) -- 交易决策的执行结果,默认值为 None - 当第一次调用 generate_trade_decision 时,execute_result 可能为 None
- class qlib.contrib.strategy.WeightStrategyBase(*, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
- __init__(*, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
- 信号:
描述信号的信息。请参考`qlib.backtest.signal.create_signal_from`的文档,策略的决策将基于给定的信号
- 交易所交易所
提供市场信息的交易所,用于处理订单和生成报告
如果`trade_exchange`为None,self.trade_exchange将被设置为common_infra
它允许在不同的执行中使用不同的trade_exchanges。
例如:
在日常执行中,日常交易所和分钟交易所都可用,但推荐使用日常交易所,因为它运行更快。
在分钟执行中,日常交易所不可用,仅推荐使用分钟交易所。
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
根据该日期的得分和当前持仓生成目标持仓。现金不考虑在持仓中
- 参数:
score (pd.Series) -- 该交易日期的预测得分,索引为stock_id,包含'score'列。
current (Position()) -- 当前持仓。
trade_start_time (pd.Timestamp)
trade_end_time (pd.Timestamp)
- generate_trade_decision(execute_result=None)
在每个交易条生成交易决策
- 参数:
execute_result (List[object], optional) -- 交易决策的执行结果,默认值为 None - 当第一次调用 generate_trade_decision 时,execute_result 可能为 None
- class qlib.contrib.strategy.EnhancedIndexingStrategy(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)
增强型指数策略
增强型指数结合了主动管理和被动管理的艺术,旨在在控制风险暴露(即跟踪误差)的同时,在投资组合回报方面超越基准指数(例如,标准普尔500指数)。
用户需要准备如下的风险模型数据:
├── /path/to/riskmodel ├──── 20210101 ├────── factor_exp.{csv|pkl|h5} ├────── factor_cov.{csv|pkl|h5} ├────── specific_risk.{csv|pkl|h5} ├────── blacklist.{csv|pkl|h5} # optional风险模型数据可以从风险数据提供者处获得。您还可以使用`qlib.model.riskmodel.structured.StructuredCovEstimator`来准备这些数据。
- 参数:
riskmodel_path (str) -- 风险模型路径
name_mapping (dict) -- 替代文件名
- __init__(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)
- 信号:
描述信号的信息。请参考`qlib.backtest.signal.create_signal_from`的文档,策略的决策将基于给定的信号
- 交易所交易所
提供市场信息的交易所,用于处理订单和生成报告
如果`trade_exchange`为None,self.trade_exchange将被设置为common_infra
它允许在不同的执行中使用不同的trade_exchanges。
例如:
在日常执行中,日常交易所和分钟交易所都可用,但推荐使用日常交易所,因为它运行更快。
在分钟执行中,日常交易所不可用,仅推荐使用分钟交易所。
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
根据该日期的得分和当前持仓生成目标持仓。现金不考虑在持仓中
- 参数:
score (pd.Series) -- 该交易日期的预测得分,索引为stock_id,包含'score'列。
current (Position()) -- 当前持仓。
trade_start_time (pd.Timestamp)
trade_end_time (pd.Timestamp)
- class qlib.contrib.strategy.TWAPStrategy(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)
用于交易的TWAP策略
备注
该TWAP策略在交易时将进行四舍五入。这将使TWAP交易策略在总交易单位金额小于交易步长时提前产生订单
- reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs)
- 参数:
outer_trade_decision (BaseTradeDecision, optional)
- generate_trade_decision(execute_result=None)
在每个交易条生成交易决策
- 参数:
execute_result (List[object], optional) -- 交易决策的执行结果,默认值为 None - 当第一次调用 generate_trade_decision 时,execute_result 可能为 None
- class qlib.contrib.strategy.SBBStrategyBase(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)
(S)选择每两个相邻交易(B)条中的(B)更好者进行买入或卖出。
- reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs)
- 参数:
outer_trade_decision (BaseTradeDecision, optional)
- generate_trade_decision(execute_result=None)
在每个交易条生成交易决策
- 参数:
execute_result (List[object], optional) -- 交易决策的执行结果,默认值为 None - 当第一次调用 generate_trade_decision 时,execute_result 可能为 None
- class qlib.contrib.strategy.SBBStrategyEMA(outer_trade_decision: BaseTradeDecision | None = None, instruments: List | str = 'csi300', freq: str = 'day', trade_exchange: Exchange | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)
(S)选择每两个相邻交易(B)条中的(B)更好者进行买入或卖出,使用(EMA)信号。
- __init__(outer_trade_decision: BaseTradeDecision | None = None, instruments: List | str = 'csi300', freq: str = 'day', trade_exchange: Exchange | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)
- 参数:
instruments (Union[List, str], optional) -- EMA信号的工具,默认"csi300"
freq (str, optional) -- EMA信号的频率,默认"day" 注意:`freq`可能与`time_per_step`不同
- reset_level_infra(level_infra)
重置级别共享基础设施 - 重置交易日历后,信号将会改变
- class qlib.contrib.strategy.SoftTopkStrategy(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
- __init__(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
- 参数:
topk (int) -- 要购买的前N只股票
risk_degree (float) -- 总价值的持仓百分比 买入方式: rank_fill: 优先分配排名高的股票权重(1/topk最大) average_fill: 平均分配排名高的股票权重。
- get_risk_degree(trade_step=None)
返回您在投资中将使用的总价值的比例。动态风险程度将影响市场时机。
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
- 参数:
score -- 该交易日期的预测分数,pd.Series,索引为stock_id,包含'score'列
current -- 当前持仓,使用Position()类
trade_date -- 交易日期 从该日期的分数和当前持仓生成目标持仓 缓存不考虑在持仓中
评估
- qlib.contrib.evaluate.risk_analysis(r, N: int | None = None, freq: str = 'day')
风险分析 注意:年化收益的计算与年化收益的定义不同。这是设计使然。Qlib尝试通过求和而不是乘积来累积收益,以避免累积曲线呈指数偏斜。Qlib中所有年化收益的计算都遵循这一原则。
TODO:添加一个参数以启用使用收益的乘积累积计算指标。
- 参数:
r (pandas.Series) -- 每日收益系列。
N (int) -- 用于年化信息比率的缩放器(天:252,周:50,月:12),至少应存在 N 和 freq 中的一个
freq (str) -- 用于计算缩放器的分析频率,至少应存在 N 和 freq 中的一个
- qlib.contrib.evaluate.indicator_analysis(df, method='mean')
分析交易的统计时间序列指标
- 参数:
df (pandas.DataFrame) -- 列:如 ['pa', 'pos', 'ffr', 'deal_amount', 'value']。 必需字段: - 'pa' 是交易指标中的价格优势 - 'pos' 是交易指标中的正率 - 'ffr' 是交易指标中的履约率 可选字段: - 'deal_amount' 是总交易金额,仅在方法为 'amount_weighted' 时必需 - 'value' 是总交易价值,仅在方法为 'value_weighted' 时必需 索引:索引(日期时间)
method (str, optional) -- pa/ffr 的统计方法,默认 "mean" - 如果方法为 'mean',计算每个交易指标的平均统计值 - 如果方法为 'amount_weighted',计算每个交易指标的交易金额加权平均统计值 - 如果方法为 'value_weighted',计算每个交易指标的价值加权平均统计值 注意:pos 的统计方法始终为 "mean"
- 返回:
每个交易指标的统计值
- 返回类型:
pd.DataFrame
- qlib.contrib.evaluate.backtest_daily(start_time: str | Timestamp, end_time: str | Timestamp, strategy: str | dict | BaseStrategy, executor: str | dict | BaseExecutor | None = None, account: float | int | Position = 100000000.0, benchmark: str = 'SH000300', exchange_kwargs: dict | None = None, pos_type: str = 'Position')
初始化策略和执行器,然后执行日频回测
- 参数:
start_time (Union[str, pd.Timestamp]) -- 回测的结束时间 注意:这将应用于最外层执行器的日历。
end_time (Union[str, pd.Timestamp]) -- 回测的结束时间 注意:这将应用于最外层执行器的日历。例如,Executor[day](Executor[1min]),设置 end_time == 20XX0301 将包括 20XX0301 的所有分钟
strategy (Union[str, dict, BaseStrategy]) -- 用于初始化最外层投资组合策略。有关更多信息,请参考 init_instance_by_config 的文档。例如,.. code-block:: python # 字典 strategy = { "class": "TopkDropoutStrategy", "module_path": "qlib.contrib.strategy.signal_strategy", "kwargs": { "signal": (model, dataset), "topk": 50, "n_drop": 5, }, } # 基础策略 pred_score = pd.read_pickle("score.pkl")["score"] STRATEGY_CONFIG = { "topk": 50, "n_drop": 5, "signal": pred_score, } strategy = TopkDropoutStrategy(**STRATEGY_CONFIG) # 字符串示例。 # 1) 指定一个 pickle 对象 # - 路径如 'file:///<path to pickle file>/obj.pkl' # 2) 指定一个类名 # - "ClassName": getattr(module, "ClassName")() 将被使用。 # 3) 指定带类名的模块路径 # - "a.b.c.ClassName" getattr(<a.b.c.module>, "ClassName")() 将被使用。
executor (Union[str, dict, BaseExecutor]) -- 用于初始化最外层执行器。
benchmark (str) -- 报告的基准。
account (Union[float, int, Position]) -- 描述如何创建账户的信息 对于 float 或 int: 使用仅有初始现金的账户 对于 Position: 使用带有头寸的账户
exchange_kwargs (dict) -- 用于初始化交易所的 kwargs 例如 .. code-block:: python exchange_kwargs = { "freq": freq, "limit_threshold": None, # limit_threshold 为 None,使用 C.limit_threshold "deal_price": None, # deal_price 为 None,使用 C.deal_price "open_cost": 0.0005, "close_cost": 0.0015, "min_cost": 5, }
pos_type (str) -- 头寸的类型。
- 返回:
report_normal (pd.DataFrame) -- 回测报告
positions_normal (pd.DataFrame) -- 回测头寸
- qlib.contrib.evaluate.long_short_backtest(pred, topk=50, deal_price=None, shift=1, open_cost=0, close_cost=0, trade_unit=None, limit_threshold=None, min_cost=5, subscribe_fields=[], extract_codes=False)
一个多空策略的回测
- 参数:
pred -- 在第 T 天产生的交易信号。
topk -- 短仓前k只证券和长仓前k只证券。
deal_price -- 进行交易的价格。
shift -- 是否将预测推迟一天。如果 shift==1,交易日将为 T+1。
open_cost -- 开仓交易成本。
close_cost -- 平仓交易成本。
trade_unit -- 中国A股的100。
limit_threshold -- 限制移动0.1(10%),例如,长短仓具有相同限制。
min_cost -- 最小交易成本。
subscribe_fields -- 订阅字段。
extract_codes -- 布尔值。我们是否将从预测中提取的代码传递给交易所。注意:使用离线qlib会更快。
- 返回:
回测结果,以字典表示。{ "long": long_returns(excess), "short": short_returns(excess), "long_short": long_short_returns}
报告
- qlib.contrib.report.analysis_position.report.report_graph(report_df: DataFrame, show_notebook: bool = True) [<class 'list'>, <class 'tuple'>]
显示回测报告
示例:
import qlib import pandas as pd from qlib.utils.time import Freq from qlib.utils import flatten_dict from qlib.backtest import backtest, executor from qlib.contrib.evaluate import risk_analysis from qlib.contrib.strategy import TopkDropoutStrategy # init qlib qlib.init(provider_uri=<qlib data dir>) CSI300_BENCH = "SH000300" FREQ = "day" STRATEGY_CONFIG = { "topk": 50, "n_drop": 5, # pred_score, pd.Series "signal": pred_score, } EXECUTOR_CONFIG = { "time_per_step": "day", "generate_portfolio_metrics": True, } backtest_config = { "start_time": "2017-01-01", "end_time": "2020-08-01", "account": 100000000, "benchmark": CSI300_BENCH, "exchange_kwargs": { "freq": FREQ, "limit_threshold": 0.095, "deal_price": "close", "open_cost": 0.0005, "close_cost": 0.0015, "min_cost": 5, }, } # strategy object strategy_obj = TopkDropoutStrategy(**STRATEGY_CONFIG) # executor object executor_obj = executor.SimulatorExecutor(**EXECUTOR_CONFIG) # backtest portfolio_metric_dict, indicator_dict = backtest(executor=executor_obj, strategy=strategy_obj, **backtest_config) analysis_freq = "{0}{1}".format(*Freq.parse(FREQ)) # backtest info report_normal_df, positions_normal = portfolio_metric_dict.get(analysis_freq) qcr.analysis_position.report_graph(report_normal_df)
- 参数:
report_df -- df.index.name 必须是 date,df.columns 必须包含 return,turnover,cost,bench。 .. code-block:: python return cost bench turnover date 2017-01-04 0.003421 0.000864 0.011693 0.576325 2017-01-05 0.000508 0.000447 0.000721 0.227882 2017-01-06 -0.003321 0.000212 -0.004322 0.102765 2017-01-09 0.006753 0.000212 0.006874 0.105864 2017-01-10 -0.000416 0.000440 -0.003350 0.208396
show_notebook -- 是否在笔记本中显示图形,默认值为 True。
- 返回:
如果 show_notebook 为 True,则在笔记本中显示;否则返回 plotly.graph_objs.Figure 列表。
- qlib.contrib.report.analysis_position.score_ic.score_ic_graph(pred_label: DataFrame, show_notebook: bool = True, **kwargs) [<class 'list'>, <class 'tuple'>]
得分 IC
示例:
from qlib.data import D from qlib.contrib.report import analysis_position pred_df_dates = pred_df.index.get_level_values(level='datetime') features_df = D.features(D.instruments('csi500'), ['Ref($close, -2)/Ref($close, -1)-1'], pred_df_dates.min(), pred_df_dates.max()) features_df.columns = ['label'] pred_label = pd.concat([features_df, pred], axis=1, sort=True).reindex(features_df.index) analysis_position.score_ic_graph(pred_label)
- 参数:
pred_label -- 索引是 pd.MultiIndex,索引名称是 [instrument, datetime];列名称是 [score, label]。 .. code-block:: python instrument datetime score label SH600004 2017-12-11 -0.013502 -0.013502 2017-12-12 -0.072367 -0.072367 2017-12-13 -0.068605 -0.068605 2017-12-14 0.012440 0.012440 2017-12-15 -0.102778 -0.102778
show_notebook -- 是否在笔记本中显示图形,默认值为 True。
- 返回:
如果 show_notebook 为 True,则在笔记本中显示;否则返回 plotly.graph_objs.Figure 列表。
- qlib.contrib.report.analysis_position.cumulative_return.cumulative_return_graph(position: dict, report_normal: DataFrame, label_data: DataFrame, show_notebook=True, start_date=None, end_date=None) Iterable[Figure]
回测买入、卖出和持有的累计收益图
示例:
from qlib.data import D from qlib.contrib.evaluate import risk_analysis, backtest, long_short_backtest from qlib.contrib.strategy import TopkDropoutStrategy # backtest parameters bparas = {} bparas['limit_threshold'] = 0.095 bparas['account'] = 1000000000 sparas = {} sparas['topk'] = 50 sparas['n_drop'] = 5 strategy = TopkDropoutStrategy(**sparas) report_normal_df, positions = backtest(pred_df, strategy, **bparas) pred_df_dates = pred_df.index.get_level_values(level='datetime') features_df = D.features(D.instruments('csi500'), ['Ref($close, -1)/$close - 1'], pred_df_dates.min(), pred_df_dates.max()) features_df.columns = ['label'] qcr.analysis_position.cumulative_return_graph(positions, report_normal_df, features_df)
图表描述:
X轴:交易日。
Y轴:
Y轴上方:(((Ref($close, -1)/$close - 1) * weight).sum() / weight.sum()).cumsum()。
Y轴下方:每日权重总和。
在 卖出 图中,y < 0 表示盈利;在其他情况下,y > 0 表示盈利。
在 买入减去卖出 图中,底部 权重 图的 y 值为 buy_weight + sell_weight。
在每个图中,右侧直方图中的 红线 表示平均值。
- 参数:
position -- 持仓数据
report_normal --
return cost bench turnover date 2017-01-04 0.003421 0.000864 0.011693 0.576325 2017-01-05 0.000508 0.000447 0.000721 0.227882 2017-01-06 -0.003321 0.000212 -0.004322 0.102765 2017-01-09 0.006753 0.000212 0.006874 0.105864 2017-01-10 -0.000416 0.000440 -0.003350 0.208396
label_data -- D.features 结果;索引是 pd.MultiIndex,索引名称是 [instrument, datetime];列名称是 [label]。 标签 T 是从 T 到 T+1 的变化,建议使用
close,示例:D.features(D.instruments('csi500'), ['Ref($close, -1)/$close-1']) .. code-block:: python label instrument datetime SH600004 2017-12-11 -0.013502 2017-12-12 -0.072367 2017-12-13 -0.068605 2017-12-14 0.012440 2017-12-15 -0.102778show_notebook -- 真或假。如果为真,则在笔记本中显示图表,否则返回图形
start_date -- 开始日期
end_date -- 结束日期
- 返回:
- qlib.contrib.report.analysis_position.risk_analysis.risk_analysis_graph(analysis_df: DataFrame | None = None, report_normal_df: DataFrame | None = None, report_long_short_df: DataFrame | None = None, show_notebook: bool = True) Iterable[Figure]
生成分析图和月度分析
示例:
import qlib import pandas as pd from qlib.utils.time import Freq from qlib.utils import flatten_dict from qlib.backtest import backtest, executor from qlib.contrib.evaluate import risk_analysis from qlib.contrib.strategy import TopkDropoutStrategy # init qlib qlib.init(provider_uri=<qlib data dir>) CSI300_BENCH = "SH000300" FREQ = "day" STRATEGY_CONFIG = { "topk": 50, "n_drop": 5, # pred_score, pd.Series "signal": pred_score, } EXECUTOR_CONFIG = { "time_per_step": "day", "generate_portfolio_metrics": True, } backtest_config = { "start_time": "2017-01-01", "end_time": "2020-08-01", "account": 100000000, "benchmark": CSI300_BENCH, "exchange_kwargs": { "freq": FREQ, "limit_threshold": 0.095, "deal_price": "close", "open_cost": 0.0005, "close_cost": 0.0015, "min_cost": 5, }, } # strategy object strategy_obj = TopkDropoutStrategy(**STRATEGY_CONFIG) # executor object executor_obj = executor.SimulatorExecutor(**EXECUTOR_CONFIG) # backtest portfolio_metric_dict, indicator_dict = backtest(executor=executor_obj, strategy=strategy_obj, **backtest_config) analysis_freq = "{0}{1}".format(*Freq.parse(FREQ)) # backtest info report_normal_df, positions_normal = portfolio_metric_dict.get(analysis_freq) analysis = dict() analysis["excess_return_without_cost"] = risk_analysis( report_normal_df["return"] - report_normal_df["bench"], freq=analysis_freq ) analysis["excess_return_with_cost"] = risk_analysis( report_normal_df["return"] - report_normal_df["bench"] - report_normal_df["cost"], freq=analysis_freq ) analysis_df = pd.concat(analysis) # type: pd.DataFrame analysis_position.risk_analysis_graph(analysis_df, report_normal_df)
- 参数:
analysis_df -- 分析数据,索引为 pd.MultiIndex;列名为 [risk]。 .. 代码块:: python risk excess_return_without_cost mean 0.000692 std 0.005374 annualized_return 0.174495 information_ratio 2.045576 max_drawdown -0.079103 excess_return_with_cost mean 0.000499 std 0.005372 annualized_return 0.125625 information_ratio 1.473152 max_drawdown -0.088263
report_normal_df -- df.index.name 必须为 date,df.columns 必须包含 return,turnover,cost,bench。 .. 代码块:: python return cost bench turnover date 2017-01-04 0.003421 0.000864 0.011693 0.576325 2017-01-05 0.000508 0.000447 0.000721 0.227882 2017-01-06 -0.003321 0.000212 -0.004322 0.102765 2017-01-09 0.006753 0.000212 0.006874 0.105864 2017-01-10 -0.000416 0.000440 -0.003350 0.208396
report_long_short_df -- df.index.name 必须为 date,df.columns 包含 long,short,long_short。 .. 代码块:: python long short long_short date 2017-01-04 -0.001360 0.001394 0.000034 2017-01-05 0.002456 0.000058 0.002514 2017-01-06 0.000120 0.002739 0.002859 2017-01-09 0.001436 0.001838 0.003273 2017-01-10 0.000824 -0.001944 -0.001120
show_notebook -- 是否在笔记本中显示图形,默认 True。如果为 True,则在笔记本中显示图形;如果为 False,则返回图形。
- 返回:
- qlib.contrib.report.analysis_position.rank_label.rank_label_graph(position: dict, label_data: DataFrame, start_date=None, end_date=None, show_notebook=True) Iterable[Figure]
交易日买入、卖出和持有股票的排名百分比。每日交易的平均排名比率(类似于 sell_df['label'].rank(ascending=False) / len(sell_df))。
示例:
from qlib.data import D from qlib.contrib.evaluate import backtest from qlib.contrib.strategy import TopkDropoutStrategy # backtest parameters bparas = {} bparas['limit_threshold'] = 0.095 bparas['account'] = 1000000000 sparas = {} sparas['topk'] = 50 sparas['n_drop'] = 230 strategy = TopkDropoutStrategy(**sparas) _, positions = backtest(pred_df, strategy, **bparas) pred_df_dates = pred_df.index.get_level_values(level='datetime') features_df = D.features(D.instruments('csi500'), ['Ref($close, -1)/$close-1'], pred_df_dates.min(), pred_df_dates.max()) features_df.columns = ['label'] qcr.analysis_position.rank_label_graph(positions, features_df, pred_df_dates.min(), pred_df_dates.max())
- 参数:
position -- 头寸数据; qlib.backtest.backtest 结果。
label_data -- D.features 结果;索引为 pd.MultiIndex,索引名为 [instrument, datetime];列名为 [label]。 标签 T 是从 T 到 T+1 的变化,建议使用
close,示例:D.features(D.instruments('csi500'), ['Ref($close, -1)/$close-1'])。 .. 代码块:: python label instrument datetime SH600004 2017-12-11 -0.013502 2017-12-12 -0.072367 2017-12-13 -0.068605 2017-12-14 0.012440 2017-12-15 -0.102778start_date -- 开始日期
end_date -- 结束日期
show_notebook -- True 或 False。如果为 True,则在笔记本中显示图形,否则返回图形。
- 返回:
- qlib.contrib.report.analysis_model.analysis_model_performance.ic_figure(ic_df: DataFrame, show_nature_day=True, **kwargs) Figure
IC 图
- 参数:
ic_df -- ic 数据框
show_nature_day -- 是否显示非交易日的横坐标
**kwargs -- 包含一些参数以控制 plotly 中的绘图样式。目前支持 - rangebreaks: https://plotly.com/python/time-series/#Hiding-Weekends-and-Holidays
- 返回:
plotly.graph_objs.Figure
- qlib.contrib.report.analysis_model.analysis_model_performance.model_performance_graph(pred_label: DataFrame, lag: int = 1, N: int = 5, reverse=False, rank=False, graph_names: list = ['group_return', 'pred_ic', 'pred_autocorr'], show_notebook: bool = True, show_nature_day: bool = False, **kwargs) [<class 'list'>, <class 'tuple'>]
模型性能
- 参数:
pred_label -- 索引是 pd.MultiIndex,索引名称是 [instrument, datetime];列名称是 [score, label]。它通常与模型训练的标签相同(例如:"Ref($close, -2)/Ref($close, -1) - 1")。 .. code-block:: python instrument datetime score label SH600004 2017-12-11 -0.013502 -0.013502 2017-12-12 -0.072367 -0.072367 2017-12-13 -0.068605 -0.068605 2017-12-14 0.012440 0.012440 2017-12-15 -0.102778 -0.102778
lag -- pred.groupby(level='instrument')['score'].shift(lag)。它将仅用于自相关计算。
N -- 组数,默认值为 5。
reverse -- 如果 True,则 pred['score'] *= -1。
rank -- 如果 True,计算排名 ic。
graph_names -- 图形名称;默认值 ['cumulative_return', 'pred_ic', 'pred_autocorr', 'pred_turnover']。
show_notebook -- 是否在笔记本中显示图形,默认值为 True。
show_nature_day -- 是否显示非交易日的横坐标。
**kwargs -- 包含一些参数以控制 plotly 中的绘图样式。目前支持 - rangebreaks: https://plotly.com/python/time-series/#Hiding-Weekends-and-Holidays
- 返回:
如果 show_notebook 为 True,则在笔记本中显示;否则返回 plotly.graph_objs.Figure 列表。
工作流程
实验管理器
- class qlib.workflow.expm.ExpManager(uri: str, default_exp_name: str | None)
这是用于管理实验的 ExpManager 类。API 的设计类似于 mlflow。(链接:https://mlflow.org/docs/latest/python_api/mlflow.html)
预计 ExpManager 是一个单例(顺便说一下,我们可以有多个具有不同 uri 的 Experiment。用户可以从不同的 uri 获取不同的实验,然后比较它们的记录)。全局配置(即 C)也是一个单例。
因此我们尝试将它们对齐。它们共享同一个变量,称为 default uri。有关变量共享的详细信息,请参阅 ExpManager.default_uri。
当用户开始实验时,用户可能希望将 uri 设置为特定的 uri(在此期间将覆盖 default uri),然后取消设置 specific uri 并回退到 default uri。ExpManager._active_exp_uri 是该 specific uri。
- __init__(uri: str, default_exp_name: str | None)
- start_exp(*, experiment_id: str | None = None, experiment_name: str | None = None, recorder_id: str | None = None, recorder_name: str | None = None, uri: str | None = None, resume: bool = False, **kwargs) Experiment
开始一个实验。此方法包括首先获取或创建一个实验,然后将其设置为活动状态。
维护 _active_exp_uri 包含在 start_exp 中,剩余实现应包含在子类的 _end_exp 中。
- 参数:
experiment_id (str) -- 活动实验的 id。
experiment_name (str) -- 活动实验的名称。
recorder_id (str) -- 要启动的记录器的ID。
recorder_name (str) -- 要启动的记录器的名称。
uri (str) -- 当前的跟踪URI。
resume (boolean) -- 是否恢复实验和记录器。
- 返回类型:
An active experiment.
- end_exp(recorder_status: str = 'SCHEDULED', **kwargs)
结束一个活动实验。
维护 _active_exp_uri 包含在 end_exp 中,剩余实现应包含在子类的 _end_exp 中。
- 参数:
experiment_name (str) -- 活动实验的名称。
recorder_status (str) -- 实验的活动记录器的状态。
- create_exp(experiment_name: str | None = None)
创建一个实验。
- 参数:
experiment_name (str) -- 实验名称,必须是唯一的。
- 返回类型:
An experiment object.
- 抛出:
ExpAlreadyExistError --
- search_records(experiment_ids=None, **kwargs)
获取符合实验搜索条件的记录的 pandas DataFrame。输入是用户想要应用的搜索条件。
- 返回:
一个 pandas.DataFrame 的记录,其中每个指标、参数和标签
扩展到各自命名为 metrics.、params.* 和 tags.* 的列。**
分别。对于没有特定指标、参数或标签的记录,它们的
值将分别为 (NumPy) Nan、None 或 None.
- get_exp(*, experiment_id=None, experiment_name=None, create: bool = True, start: bool = False)
检索一个实验。此方法包括获取一个活动实验,以及获取或创建一个特定实验。
当用户指定实验ID和名称时,该方法将尝试返回特定实验。当用户未提供记录器ID或名称时,该方法将尝试返回当前活动实验。create 参数决定该方法是否会根据用户的规范自动创建一个新实验,如果该实验之前尚未创建。
如果 create 为 True:
如果 active experiment 存在:
未指定ID或名称,返回活动实验。
如果指定了ID或名称,返回指定的实验。如果未找到该实验,则使用给定的ID或名称创建一个新实验。如果`start`设置为True,则该实验将被设置为活动状态。
如果`活动实验`不存在:
未指定ID或名称,创建一个默认实验。
如果指定了ID或名称,返回指定的实验。如果未找到该实验,则使用给定的ID或名称创建一个新实验。如果`start`设置为True,则该实验将被设置为活动状态。
否则如果`create`为False:
如果 active experiment 存在:
未指定ID或名称,返回活动实验。
如果指定了ID或名称,返回指定的实验。如果未找到该实验,则引发错误。
如果`活动实验`不存在:
未指定ID或名称。如果默认实验存在,则返回它,否则引发错误。
如果指定了ID或名称,返回指定的实验。如果未找到该实验,则引发错误。
- 参数:
experiment_id (str) -- 要返回的实验ID。
experiment_name (str) -- 要返回的实验名称。
create (boolean) -- 如果实验尚未创建,则创建该实验。
start (boolean) -- 如果创建了一个新实验,则启动该实验。
- 返回类型:
An experiment object.
- delete_exp(experiment_id=None, experiment_name=None)
删除一个实验。
- 参数:
experiment_id (str) -- 实验ID。
experiment_name (str) -- 实验名称。
- property default_uri
从qlib.config.C获取默认跟踪URI。
- property uri
获取默认跟踪URI或当前URI。
- 返回类型:
The tracking URI string.
- list_experiments()
列出所有现有实验。
- 返回类型:
A dictionary (name -> experiment) of experiments information that being stored.
实验
- class qlib.workflow.exp.Experiment(id, name)
这是每个正在运行的实验的`Experiment`类。该API的设计类似于mlflow。(链接:https://mlflow.org/docs/latest/python_api/mlflow.html)
- __init__(id, name)
- start(*, recorder_id=None, recorder_name=None, resume=False)
启动实验并将其设置为活动状态。此方法还将启动一个新的记录器。
- 参数:
recorder_id (str) -- 要创建的记录器的ID。
recorder_name (str) -- 要创建的记录器的名称。
resume (bool) -- 是否恢复第一个记录器。
- 返回类型:
An active recorder.
- end(recorder_status='SCHEDULED')
结束实验。
- 参数:
recorder_status (str) -- 结束时要设置的记录器状态(SCHEDULED, RUNNING, FINISHED, FAILED)。
- create_recorder(recorder_name=None)
为每个实验创建一个记录器。
- 参数:
recorder_name (str) -- 要创建的记录器的名称。
- 返回类型:
A recorder object.
- search_records(**kwargs)
获取符合实验搜索条件的记录的 pandas DataFrame。输入是用户想要应用的搜索条件。
- 返回:
一个 pandas.DataFrame 的记录,其中每个指标、参数和标签
扩展到各自命名为 metrics.、params.* 和 tags.* 的列。**
分别。对于没有特定指标、参数或标签的记录,它们的
值将分别为 (NumPy) Nan、None 或 None.
- delete_recorder(recorder_id)
为每个实验创建一个记录器。
- 参数:
recorder_id (str) -- 要删除的记录器的ID。
- get_recorder(recorder_id=None, recorder_name=None, create: bool = True, start: bool = False) Recorder
为用户检索记录器。当用户指定记录器ID和名称时,该方法将尝试返回特定的记录器。当用户未提供记录器ID或名称时,该方法将尝试返回当前活动的记录器。`create`参数决定如果记录器之前未创建,该方法是否会根据用户的规范自动创建一个新的记录器。
如果 create 为 True:
如果存在`活动记录器`:
未指定ID或名称,返回活动记录器。
如果指定了ID或名称,返回指定的记录器。如果未找到这样的实验,则使用给定的ID或名称创建一个新的记录器。如果`start`设置为True,则记录器被设置为活动状态。
如果不存在`活动记录器`:
未指定ID或名称,创建一个新的记录器。
如果指定了ID或名称,返回指定的实验。如果未找到这样的实验,则使用给定的ID或名称创建一个新的记录器。如果`start`设置为True,则记录器被设置为活动状态。
否则如果`create`为False:
如果存在`活动记录器`:
未指定ID或名称,返回活动记录器。
如果指定了ID或名称,返回指定的记录器。如果未找到这样的实验,则引发错误。
如果不存在`活动记录器`:
未指定ID或名称,引发错误。
如果指定了ID或名称,返回指定的记录器。如果未找到这样的实验,则引发错误。
- 参数:
recorder_id (str) -- 要删除的记录器的ID。
recorder_name (str) -- 要删除的记录器的名称。
create (boolean) -- 如果记录器之前未创建,则创建记录器。
start (boolean) -- 如果创建了新的记录器,则启动该记录器。
- 返回类型:
A recorder object.
- list_recorders(rtype: Literal['dict', 'list'] = 'dict', **flt_kwargs) List[Recorder] | Dict[str, Recorder]
列出该实验的所有现有记录器。请在调用此方法之前先获取实验实例。如果用户想使用方法`R.list_recorders()`,请参考`QlibRecorder`中的相关API文档。
- flt_kwargs字典
通过条件过滤记录器,例如:list_recorders(status=Recorder.STATUS_FI)
- 返回:
如果 rtype == "dict": 一个存储记录器信息的字典(id -> recorder)。否则如果 rtype == "list": 一个记录器的列表。
- 返回类型:
返回类型取决于 rtype
Recorder
- class qlib.workflow.recorder.Recorder(experiment_id, name)
这是用于记录实验的 Recorder 类。API 的设计类似于 mlflow。(链接:https://mlflow.org/docs/latest/python_api/mlflow.html)
记录器的状态可以是 SCHEDULED、RUNNING、FINISHED、FAILED。
- __init__(experiment_id, name)
- save_objects(local_path=None, artifact_path=None, **kwargs)
将对象(如预测文件或模型检查点)保存到工件 URI。用户可以通过关键字参数(name:value)保存对象。
请参考 qlib.workflow:R.save_objects 的文档
- 参数:
local_path (str) -- 如果提供,则将文件或目录保存到工件 URI。
artifact_path=None (str) -- 要存储在 URI 中的工件的相对路径。
- load_object(name)
加载对象,例如预测文件或模型检查点。
- 参数:
name (str) -- 要加载的文件名。
- 返回类型:
The saved object.
- start_run()
开始运行或恢复记录器。返回值可以在 with 块内用作上下文管理器;否则,您必须调用 end_run() 来终止当前运行。(请参见 mlflow 中的 ActiveRun 类)
- 返回类型:
An active running object (e.g. mlflow.ActiveRun object).
- end_run()
结束一个活动的记录器。
- log_params(**kwargs)
记录当前运行的一批参数。
- 参数:
arguments (keyword) -- 要记录为参数的键值对。
- log_metrics(step=None, **kwargs)
记录当前运行的多个指标。
- 参数:
arguments (keyword) -- 要记录为指标的键值对。
- log_artifact(local_path: str, artifact_path: str | None = None)
将本地文件或目录记录为当前活动运行的工件。
- 参数:
local_path (str) -- 要写入的文件路径。
artifact_path (Optional[str]) -- 如果提供,写入``artifact_uri``中的目录。
- set_tags(**kwargs)
为当前运行记录一批标签。
- 参数:
arguments (keyword) -- 要记录为标签的键值对。
- delete_tags(*keys)
从运行中删除一些标签。
- 参数:
keys (series of strs of the keys) -- 要删除的标签的所有名称。
- list_artifacts(artifact_path: str | None = None)
列出记录器的所有工件。
- 参数:
artifact_path (str) -- 要存储在 URI 中的工件的相对路径。
- 返回类型:
A list of artifacts information (name, path, etc.) that being stored.
- download_artifact(path: str, dst_path: str | None = None) str
如果适用,从运行中下载工件文件或目录到本地目录,并返回其本地路径。
- 参数:
path (str) -- 所需工件的相对源路径。
dst_path (Optional[str]) -- 要下载指定工件的本地文件系统目标目录的绝对路径。此目录必须已经存在。如果未指定,工件将下载到本地文件系统上的新唯一命名目录。
- 返回:
所需工件的本地路径。
- 返回类型:
str
- list_metrics()
列出记录器的所有指标。
- 返回类型:
A dictionary of metrics that being stored.
- list_params()
列出记录器的所有参数。
- 返回类型:
A dictionary of params that being stored.
- list_tags()
列出记录器的所有标签。
- 返回类型:
A dictionary of tags that being stored.
记录模板
- class qlib.workflow.record_temp.RecordTemp(recorder)
这是记录模板类,允许用户以特定格式生成实验结果,例如IC和回测。
- save(**kwargs)
它的行为与self.recorder.save_objects相同。但它是一个更简单的接口,因为用户不必关心`get_path`和`artifact_path`
- __init__(recorder)
- generate(**kwargs)
生成某些记录,例如IC、回测等,并保存它们。
- 参数:
kwargs
- load(name: str, parents: bool = True)
它的行为与self.recorder.load_object相同。但它是一个更简单的接口,因为用户不必关心`get_path`和`artifact_path`
- 参数:
name (str) -- 要加载的文件名。
parents (bool) -- 每个记录器都有不同的`artifact_path`。因此,父类递归查找路径时,父类的子类具有更高的优先级
- 返回类型:
The stored records.
- list()
列出支持的工件。用户不必考虑self.get_path
- 返回类型:
A list of all the supported artifacts.
- check(include_self: bool = False, parents: bool = True)
检查记录是否正确生成和保存。这在以下示例中很有用
检查依赖文件在生成新内容之前是否完整。
检查最终文件是否已完成
- 参数:
include_self (bool) -- 是否是由self生成的文件
parents (bool) -- 我们会检查父类吗
- 抛出:
FileNotFoundError -- 记录是否正确存储。
- class qlib.workflow.record_temp.SignalRecord(model=None, dataset=None, recorder=None)
这是生成信号预测的信号记录类。该类继承自``RecordTemp``类。
- __init__(model=None, dataset=None, recorder=None)
- generate(**kwargs)
生成某些记录,例如IC、回测等,并保存它们。
- 参数:
kwargs
- list()
列出支持的工件。用户不必考虑self.get_path
- 返回类型:
A list of all the supported artifacts.
- class qlib.workflow.record_temp.ACRecordTemp(recorder, skip_existing=False)
自动检查记录模板
- __init__(recorder, skip_existing=False)
- generate(*args, **kwargs)
自动检查文件,然后运行具体的生成任务
- class qlib.workflow.record_temp.HFSignalRecord(recorder, **kwargs)
这是生成分析结果(例如IC和IR)的信号分析记录类。该类继承自``RecordTemp``类。
- depend_cls
SignalRecord的别名
- __init__(recorder, **kwargs)
- generate()
生成某些记录,例如IC、回测等,并保存它们。
- 参数:
kwargs
- list()
列出支持的工件。用户不必考虑self.get_path
- 返回类型:
A list of all the supported artifacts.
- class qlib.workflow.record_temp.SigAnaRecord(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)
这是生成分析结果(例如IC和IR)的信号分析记录类。该类继承自``RecordTemp``类。
- depend_cls
SignalRecord的别名
- __init__(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)
- list()
列出支持的工件。用户不必考虑self.get_path
- 返回类型:
A list of all the supported artifacts.
- class qlib.workflow.record_temp.PortAnaRecord(recorder, config=None, risk_analysis_freq: List | str | None = None, indicator_analysis_freq: List | str | None = None, indicator_analysis_method=None, skip_existing=False, **kwargs)
这是生成分析结果(例如回测结果)的投资组合分析记录类。该类继承自``RecordTemp``类。
以下文件将存储在记录器中
report_normal.pkl & positions_normal.pkl:
回测的返回报告和详细头寸,由`qlib/contrib/evaluate.py:backtest`返回
port_analysis.pkl : 您的投资组合的风险分析,由 qlib/contrib/evaluate.py:risk_analysis 返回
- depend_cls
SignalRecord的别名
- __init__(recorder, config=None, risk_analysis_freq: List | str | None = None, indicator_analysis_freq: List | str | None = None, indicator_analysis_method=None, skip_existing=False, **kwargs)
- config["strategy"]字典
定义策略类以及关键字参数。
- config["executor"]字典
定义执行器类以及关键字参数。
- config["backtest"]字典
定义回测的关键字参数。
- risk_analysis_freqstr|List[str]
报告的风险分析频率
- indicator_analysis_freqstr|List[str]
报告的指标分析频率
- indicator_analysis_methodstr,可选,默认值为 None
候选值包括 'mean'、'amount_weighted'、'value_weighted'
- list()
列出支持的工件。用户不必考虑self.get_path
- 返回类型:
A list of all the supported artifacts.
- class qlib.workflow.record_temp.MultiPassPortAnaRecord(recorder, pass_num=10, shuffle_init_score=True, **kwargs)
这是多次回测并生成分析结果(如回测结果)的多重通道投资组合分析记录类。该类继承自
PortAnaRecord类。如果启用 shuffle_init_score,则第一次回测日期的预测分数将被打乱,从而使初始头寸随机。shuffle_init_score 仅在信号作为 <PRED> 占位符使用时有效。占位符将被保存在记录器中的 pred.pkl 替换。
- 参数:
recorder (Recorder) -- 用于保存回测结果的记录器。
pass_num (int) -- 回测的通过次数。
shuffle_init_score (bool) -- 是否打乱第一次回测日期的预测分数。
- depend_cls
SignalRecord的别名
- __init__(recorder, pass_num=10, shuffle_init_score=True, **kwargs)
- 参数:
recorder (Recorder) -- 用于保存回测结果的记录器。
pass_num (int) -- 回测的通过次数。
shuffle_init_score (bool) -- 是否打乱第一次回测日期的预测分数。
- list()
列出支持的工件。用户不必考虑self.get_path
- 返回类型:
A list of all the supported artifacts.
任务管理
任务生成器
任务生成器模块可以基于任务生成器和一些任务模板生成许多任务。
- qlib.workflow.task.gen.task_generator(tasks, generators) list
使用任务生成器列表和任务模板列表生成不同的任务。
例如:
有3个任务模板a、b、c和2个任务生成器A、B。A将从一个模板生成2个任务,而B将从一个模板生成3个任务。task_generator([a, b, c], [A, B])最终将生成3*2*3 = 18个任务。
- class qlib.workflow.task.gen.TaskGen
生成不同任务的基类
示例1:
输入:一个特定的任务模板和滚动步骤
输出:任务的滚动版本
示例2:
输入:一个特定的任务模板和损失列表
输出:一组具有不同损失的任务
- abstract generate(task: dict) List[dict]
基于任务模板生成不同的任务
- 参数:
task (dict) -- 一个任务模板
- 返回:
一组任务
- 返回类型:
List[dict]
- qlib.workflow.task.gen.handler_mod(task: dict, rolling_gen)
帮助在使用RollingGen时修改处理程序结束时间,它尝试处理以下情况
处理器的数据结束时间早于数据集的测试数据段。
为了解决这个问题,处理器的数据结束时间被延长。
如果处理器的结束时间为 None,则不需要更改其结束时间。
- 参数:
task (dict) -- 一个任务模板
rg (RollingGen) -- 一个 RollingGen 的实例
- qlib.workflow.task.gen.trunc_segments(ta: TimeAdjuster, segments: Dict[str, Timestamp], days, test_key='test')
为了避免未来信息泄露,段应该根据测试开始时间进行截断。
备注
此函数将**就地**更改段。
- class qlib.workflow.task.gen.RollingGen(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: None | ~typing.Callable = <function handler_mod>, test_key='test', train_key='train', trunc_days: int | None = None, task_copy_func: ~typing.Callable = <function deepcopy>)
- __init__(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: None | ~typing.Callable = <function handler_mod>, test_key='test', train_key='train', trunc_days: int | None = None, task_copy_func: ~typing.Callable = <function deepcopy>)
为滚动生成任务
- 参数:
step (int) -- 滚动步骤
rtype (str) -- 滚动类型(扩展,滑动)
ds_extra_mod_func (Callable) -- 一种方法:handler_mod(task: dict, rg: RollingGen) 在生成任务后执行一些额外操作。例如,使用
handler_mod修改数据集处理器的结束时间。trunc_days (int) -- 截断一些数据以避免未来信息泄露
task_copy_func (Callable) -- 复制整个任务的函数。当用户想在任务之间共享某些内容时,这非常有用。
- gen_following_tasks(task: dict, test_end: Timestamp) List[dict]
为 task 生成后续滚动任务,直到 test_end
- 参数:
task (dict) -- Qlib 任务格式
test_end (pd.Timestamp) -- 最新的滚动任务包括 test_end
- 返回:
task 的后续任务(task 本身被排除在外)
- 返回类型:
List[dict]
- generate(task: dict) List[dict]
将任务转换为滚动任务。
- 参数:
task (dict) -- 描述任务的字典。例如。 .. code-block:: python DEFAULT_TASK = { "model": { "class": "LGBModel", "module_path": "qlib.contrib.model.gbdt", }, "dataset": { "class": "DatasetH", "module_path": "qlib.data.dataset", "kwargs": { "handler": { "class": "Alpha158", "module_path": "qlib.contrib.data.handler", "kwargs": { "start_time": "2008-01-01", "end_time": "2020-08-01", "fit_start_time": "2008-01-01", "fit_end_time": "2014-12-31", "instruments": "csi100", }, }, "segments": { "train": ("2008-01-01", "2014-12-31"), "valid": ("2015-01-01", "2016-12-20"), # 请避免将未来测试数据泄露到验证中 "test": ("2017-01-01", "2020-08-01"), }, }, }, "record": [ { "class": "SignalRecord", "module_path": "qlib.workflow.record_temp", }, ] }
- 返回:
List[dict]
- 返回类型:
a list of tasks
- class qlib.workflow.task.gen.MultiHorizonGenBase(horizon: List[int] = [5], label_leak_n=2)
- __init__(horizon: List[int] = [5], label_leak_n=2)
该任务生成器尝试根据现有任务为不同的时间范围生成任务
- 参数:
horizon (List[int]) -- 任务的可能时间范围
label_leak_n (int) -- 从预测日开始,完成标签所需的未来天数。例如:- 用户在第 T 天进行预测(在获取第 T 天的收盘价后) - 标签是第 T + 1 天买入股票并在第 T + 2 天卖出的收益 - label_leak_n 将为 2(例如,有两天的信息泄露以利用此样本)
- abstract set_horizon(task: dict, hr: int)
该方法旨在**就地**更改任务
- 参数:
task (dict) -- Qlib 的任务
hr (int) -- 任务的时间范围
- generate(task: dict)
基于任务模板生成不同的任务
- 参数:
task (dict) -- 一个任务模板
- 返回:
一组任务
- 返回类型:
List[dict]
任务管理器
TaskManager can fetch unused tasks automatically and manage the lifecycle of a set of tasks with error handling. These features can run tasks concurrently and ensure every task will be used only once. Task Manager will store all tasks in MongoDB. Users MUST finished the configuration of MongoDB when using this module.
任务管理器中的任务由三部分组成 - 任务描述:描述将定义任务 - 任务状态:任务的状态 - 任务结果:用户可以通过任务描述和任务结果获取任务。
- class qlib.workflow.task.manage.TaskManager(task_pool: str)
以下是任务管理器创建的任务的样子
{ 'def': pickle serialized task definition. using pickle will make it easier 'filter': json-like data. This is for filtering the tasks. 'status': 'waiting' | 'running' | 'done' 'res': pickle serialized task result, }
任务管理器假设您只会更新您获取的任务。Mongo 的获取和更新将使日期更新安全。
此类可以作为命令行工具使用。以下是几个示例。您可以使用以下命令查看管理模块的帮助:python -m qlib.workflow.task.manage -h # 显示管理模块 CLI 的手册 python -m qlib.workflow.task.manage wait -h # 显示管理命令的手册
python -m qlib.workflow.task.manage -t <pool_name> wait python -m qlib.workflow.task.manage -t <pool_name> task_stat
备注
假设:MongoDB 中的数据已编码,MongoDB 中的数据已解码
以下是四种状态:
STATUS_WAITING:等待训练
STATUS_RUNNING:训练中
STATUS_PART_DONE:完成某些步骤并等待下一步
STATUS_DONE:所有工作完成
- __init__(task_pool: str)
初始化任务管理器,记得首先声明 MongoDB 的 URL 和数据库名称。一个 TaskManager 实例服务于特定的任务池。该模块的静态方法服务于整个 MongoDB。
- 参数:
task_pool (str) -- MongoDB 中的集合名称
- static list() list
列出数据库的所有集合(task_pool)。
- 返回:
list
- replace_task(task, new_task)
用一个新任务替换一个旧任务
- 参数:
task -- 旧任务
new_task -- 新任务
- insert_task(task)
插入一个任务。
- 参数:
task -- 等待插入的任务
- 返回:
pymongo.results.InsertOneResult
- insert_task_def(task_def)
将一个任务插入到task_pool
- 参数:
task_def (dict) -- 任务定义
- 返回类型:
pymongo.results.InsertOneResult
- create_task(task_def_l, dry_run=False, print_nt=False) List[str]
如果task_def_l中的任务是新的,则将新任务插入到task_pool,并记录inserted_id。如果任务不是新的,则仅查询其_id。
- 参数:
task_def_l (list) -- 任务列表
dry_run (bool) -- 如果将这些新任务插入到任务池
print_nt (bool) -- 如果打印新任务
- 返回:
task_def_l的_id列表
- 返回类型:
List[str]
- fetch_task(query={}, status='waiting') dict
使用查询获取任务。
- 参数:
query (dict, optional) -- 查询字典。默认为{}。
status (str, optional) -- [描述]。默认为STATUS_WAITING。
- 返回:
解码后的任务(集合中的文档)
- 返回类型:
dict
- safe_fetch_task(query={}, status='waiting')
使用上下文管理器从task_pool中获取任务
- 参数:
query (dict) -- 查询的字典
- 返回:
字典
- 返回类型:
a task(document in collection) after decoding
- query(query={}, decode=True)
在集合中查询任务。如果迭代生成器的时间过长,此函数可能会引发异常 pymongo.errors.CursorNotFound: cursor id not found
python -m qlib.workflow.task.manage -t <你的任务池> query '{"_id": "615498be837d0053acbc5d58"}'
- 参数:
query (dict) -- 查询的字典
decode (bool)
- 返回:
字典
- 返回类型:
a task(document in collection) after decoding
- re_query(_id) dict
使用 _id 查询任务。
- 参数:
_id (str) -- 文档的 _id
- 返回:
解码后的任务(集合中的文档)
- 返回类型:
dict
- commit_task_res(task, res, status='done')
将结果提交到 task['res']。
- 参数:
task ([type]) -- [描述]
res (object) -- 你想要保存的结果
status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。
- return_task(task, status='waiting')
将任务返回到状态。始终在错误处理时使用。
- 参数:
task ([type]) -- [描述]
status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。
- remove(query={})
使用查询删除任务
- 参数:
query (dict) -- 查询的字典
- task_stat(query={}) dict
计算每个状态中的任务数量。
- 参数:
query (dict, optional) -- 查询字典。默认为 {}。
- 返回:
字典
- reset_waiting(query={})
将所有正在运行的任务重置为等待状态。当某些正在运行的任务意外退出时,可以使用。
- 参数:
query (dict, optional) -- 查询字典。默认为 {}。
- prioritize(task, priority: int)
为任务设置优先级
- 参数:
task (dict) -- 从数据库中查询任务
priority (int) -- 目标优先级
- wait(query={})
在多进程时,主进程可能无法从 TaskManager 获取任何内容,因为仍然有一些正在运行的任务。因此,主进程应该等待,直到所有任务都被其他进程或机器训练好。
- 参数:
query (dict, optional) -- 查询字典。默认为 {}。
- qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)
当任务池不为空(有等待任务)时,使用 task_func 从 task_pool 获取并运行任务
运行此方法后,以下是4种情况(before_status -> after_status):
STATUS_WAITING -> STATUS_DONE:使用 task["def"] 作为 task_func 参数,这意味着任务尚未开始
STATUS_WAITING -> STATUS_PART_DONE:使用 task["def"] 作为 task_func 参数
STATUS_PART_DONE -> STATUS_PART_DONE:使用 task["res"] 作为 task_func 参数,这意味着任务已开始但未完成
STATUS_PART_DONE -> STATUS_DONE:使用 task["res"] 作为 task_func 参数
- 参数:
task_func (Callable) -- def (task_def, **kwargs) -> <res which will be committed> 执行任务的函数
task_pool (str) -- 任务池的名称(MongoDB中的集合)
query (dict) -- 在获取任务时将使用此字典查询任务池
force_release (bool) -- 程序是否会强制释放资源
before_status (str:) -- 在 before_status 中的任务将被获取并训练。可以是 STATUS_WAITING 或 STATUS_PART_DONE。
after_status (str:) -- 训练后的任务将变为 after_status。可以是 STATUS_WAITING 或 STATUS_PART_DONE。
kwargs -- 用于 task_func 的参数
训练器
训练器将训练一组任务并返回一组模型记录器。每个训练器包括两个步骤:``train``(生成模型记录器)和 ``end_train``(修改模型记录器)。
这是一个称为 DelayTrainer 的概念,可以用于在线模拟以进行并行训练。在 DelayTrainer 中,第一步仅保存一些必要的信息到模型记录器中,第二步将在最后完成,可以进行一些并发和耗时的操作,例如模型拟合。
Qlib 提供两种类型的训练器,TrainerR 是最简单的方式,TrainerRM 基于 TaskManager 自动管理任务生命周期。
- qlib.model.trainer.begin_task_train(task_config: dict, experiment_name: str, recorder_name: str | None = None) Recorder
开始任务训练以启动记录器并保存任务配置。
- 参数:
task_config (dict) -- 任务的配置
experiment_name (str) -- 实验名称
recorder_name (str) -- 给定的名称将作为记录者名称。使用rid时为无。
- 返回:
模型记录器
- 返回类型:
- qlib.model.trainer.task_train(task_config: dict, experiment_name: str, recorder_name: str | None = None) Recorder
基于任务的训练,将分为两个步骤。
- 参数:
task_config (dict) -- 任务的配置。
experiment_name (str) -- 实验名称
recorder_name (str) -- 记录器名称
- 返回:
记录器
- 返回类型:
The instance of the recorder
- class qlib.model.trainer.Trainer
训练器可以训练一系列模型。有Trainer和DelayTrainer,可以通过完成真实训练的时间来区分。
- __init__()
- train(tasks: list, *args, **kwargs) list
给定一系列任务定义,开始训练,并返回模型。
对于Trainer,它在此方法中完成真实训练。对于DelayTrainer,它仅在此方法中做一些准备。
- 参数:
tasks -- 一组任务
- 返回:
一系列模型
- 返回类型:
list
- end_train(models: list, *args, **kwargs) list
给定一系列模型,如果需要,在训练结束时完成某些操作。模型可以是记录器、txt文件、数据库等。
对于Trainer,它在此方法中做一些收尾工作。对于DelayTrainer,它在此方法中完成真实训练。
- 参数:
models -- 一系列模型
- 返回:
一系列模型
- 返回类型:
list
- is_delay() bool
如果Trainer将延迟完成`end_train`。
- 返回:
如果是DelayTrainer
- 返回类型:
bool
- has_worker() bool
某些训练器有后端工作者以支持并行训练。此方法可以判断工作者是否启用。
- 返回:
如果工作者已启用
- 返回类型:
bool
- worker()
启动工作者
- 抛出:
NotImplementedError: -- 如果不支持该工作者
- class qlib.model.trainer.TrainerR(experiment_name: str | None = None, train_func: ~typing.Callable = <function task_train>, call_in_subproc: bool = False, default_rec_name: str | None = None)
基于(R)ecorder的训练器。它将训练一系列任务,并以线性方式返回一系列模型记录器。
假设:模型由`task`定义,结果将保存到`Recorder`。
- __init__(experiment_name: str | None = None, train_func: ~typing.Callable = <function task_train>, call_in_subproc: bool = False, default_rec_name: str | None = None)
初始化TrainerR。
- 参数:
experiment_name (str, optional) -- 实验的默认名称。
train_func (Callable, optional) -- 默认训练方法。默认为`task_train`。
call_in_subproc (bool) -- 在子进程中调用该过程以强制释放内存
- train(tasks: list, train_func: Callable | None = None, experiment_name: str | None = None, **kwargs) List[Recorder]
给定一系列`tasks`并返回一系列训练好的Recorder。可以保证顺序。
- 参数:
tasks (list) -- 基于`task`字典的定义列表
train_func (Callable) -- 训练方法至少需要`tasks`和`experiment_name`。为默认训练方法时为None。
experiment_name (str) -- 实验名称,使用默认名称时为None。
kwargs -- train_func的参数。
- 返回:
一系列Recorders
- 返回类型:
List[Recorder]
- class qlib.model.trainer.DelayTrainerR(experiment_name: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)
基于TrainerR的延迟实现,这意味着`train`方法可能仅进行一些准备,而`end_train`方法可以进行实际的模型拟合。
- __init__(experiment_name: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)
初始化TrainerRM。
- 参数:
experiment_name (str) -- 实验的默认名称。
train_func (Callable, optional) -- 默认训练方法。默认为`begin_task_train`。
end_train_func (Callable, optional) -- 默认的 end_train 方法。默认为 end_task_train。
- end_train(models, end_train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
给定一个 Recorder 列表并返回一个训练后的 Recorder 列表。该类将完成真实数据加载和模型拟合。
- 参数:
models (list) -- 一个 Recorder 列表,任务已保存到它们中
end_train_func (Callable, optional) -- 需要至少 recorders 和 experiment_name 的 end_train 方法。默认为 None,以使用 self.end_train_func。
experiment_name (str) -- 实验名称,使用默认名称时为None。
kwargs -- end_train_func 的参数。
- 返回:
一系列Recorders
- 返回类型:
List[Recorder]
- class qlib.model.trainer.TrainerRM(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function task_train>, skip_run_task: bool = False, default_rec_name: str | None = None)
基于 (R)ecorder 和 Task(M)anager 的训练器。它可以训练一系列任务并以多进程方式返回一系列模型记录器。
假设:task 将被保存到 TaskManager 中,并且 task 将从 TaskManager 中获取并训练。
- __init__(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function task_train>, skip_run_task: bool = False, default_rec_name: str | None = None)
初始化TrainerR。
- 参数:
experiment_name (str) -- 实验的默认名称。
task_pool (str) -- TaskManager 中的任务池名称。为 None 时使用与 experiment_name 相同的名称。
train_func (Callable, optional) -- 默认训练方法。默认为`task_train`。
skip_run_task (bool) -- 如果 skip_run_task == True:仅在工作线程中运行任务。否则跳过运行任务。
- train(tasks: list, train_func: Callable | None = None, experiment_name: str | None = None, before_status: str = 'waiting', after_status: str = 'done', default_rec_name: str | None = None, **kwargs) List[Recorder]
给定一系列`tasks`并返回一系列训练好的Recorder。可以保证顺序。
该方法默认为单进程,但 TaskManager 提供了一个很好的并行训练方式。用户可以自定义他们的 train_func 来实现多个进程甚至多个机器。
- 参数:
tasks (list) -- 基于`task`字典的定义列表
train_func (Callable) -- 训练方法至少需要`tasks`和`experiment_name`。为默认训练方法时为None。
experiment_name (str) -- 实验名称,使用默认名称时为None。
before_status (str) -- 在 before_status 中的任务将被获取并训练。可以是 STATUS_WAITING 或 STATUS_PART_DONE。
after_status (str) -- 训练后的任务将变为 after_status。可以是 STATUS_WAITING 或 STATUS_PART_DONE。
kwargs -- train_func的参数。
- 返回:
一系列Recorders
- 返回类型:
List[Recorder]
- end_train(recs: list, **kwargs) List[Recorder]
将STATUS_END标签设置到记录器。
- 参数:
recs (list) -- 一系列训练好的记录器。
- 返回:
与参数相同的列表。
- 返回类型:
List[Recorder]
- worker(train_func: Callable | None = None, experiment_name: str | None = None)
用于 train 的多进程方法。它可以与 train 共享相同的任务池,并可以在其他进程或其他机器上运行。
- 参数:
train_func (Callable) -- 训练方法至少需要`tasks`和`experiment_name`。为默认训练方法时为None。
experiment_name (str) -- 实验名称,使用默认名称时为None。
- has_worker() bool
某些训练器有后端工作者以支持并行训练。此方法可以判断工作者是否启用。
- 返回:
如果工作者已启用
- 返回类型:
bool
- class qlib.model.trainer.DelayTrainerRM(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, skip_run_task: bool = False, **kwargs)
基于 TrainerRM 的延迟实现,这意味着 train 方法可能仅执行一些准备工作,而 end_train 方法可以进行真实的模型拟合。
- __init__(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, skip_run_task: bool = False, **kwargs)
初始化 DelayTrainerRM。
- 参数:
experiment_name (str) -- 实验的默认名称。
task_pool (str) -- TaskManager 中的任务池名称。为 None 时使用与 experiment_name 相同的名称。
train_func (Callable, optional) -- 默认训练方法。默认为`begin_task_train`。
end_train_func (Callable, optional) -- 默认的 end_train 方法。默认为 end_task_train。
skip_run_task (bool) -- 如果 skip_run_task == True:仅在工作线程中运行任务。否则跳过运行任务。例如,在 CPU 虚拟机上启动训练器,然后等待任务在 GPU 虚拟机上完成。
- train(tasks: list, train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
与 TrainerRM 的 train 相同,after_status 将为 STATUS_PART_DONE。
- 参数:
tasks (list) -- 基于 task 字典的一系列定义
train_func (Callable) -- 需要至少 tasks 和 experiment_name 的 train 方法。默认为 None,以使用 self.train_func。
experiment_name (str) -- 实验名称,使用默认名称时为None。
- 返回:
一系列Recorders
- 返回类型:
List[Recorder]
- end_train(recs, end_train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
给定一个 Recorder 列表并返回一个训练后的 Recorder 列表。该类将完成真实数据加载和模型拟合。
- 参数:
recs (list) -- 一个 Recorder 列表,任务已保存到它们中。
end_train_func (Callable, optional) -- 需要至少 recorders 和 experiment_name 的 end_train 方法。默认为 None,以使用 self.end_train_func。
experiment_name (str) -- 实验名称,使用默认名称时为None。
kwargs -- end_train_func 的参数。
- 返回:
一系列Recorders
- 返回类型:
List[Recorder]
- worker(end_train_func=None, experiment_name: str | None = None)
用于 end_train 的多进程方法。它可以与 end_train 共享相同的任务池,并可以在其他进程或其他机器上运行。
- 参数:
end_train_func (Callable, optional) -- 需要至少 recorders 和 experiment_name 的 end_train 方法。默认为 None,以使用 self.end_train_func。
experiment_name (str) -- 实验名称,使用默认名称时为None。
- has_worker() bool
某些训练器有后端工作者以支持并行训练。此方法可以判断工作者是否启用。
- 返回:
如果工作者已启用
- 返回类型:
bool
收集器
收集器模块可以从各处收集对象并处理它们,例如合并、分组、平均等。
- class qlib.workflow.task.collect.Collector(process_list=[])
收集器用于收集不同的结果
- __init__(process_list=[])
初始化收集器。
- 参数:
process_list (list or Callable) -- 处理字典的处理器列表或处理器实例。
- collect() dict
收集结果并返回一个字典,如 {key: things}
- 返回:
收集后的字典。例如:{"prediction": pd.Series} {"IC": {"Xgboost": pd.Series, "LSTM": pd.Series}} ...
- 返回类型:
dict
- static process_collect(collected_dict, process_list=[], *args, **kwargs) dict
对收集返回的字典进行一系列处理,并返回一个字典,如 {key: things} 例如,您可以进行分组和集成。
- 参数:
collected_dict (dict) -- 由 collect 返回的字典
process_list (list or Callable) -- 处理字典的处理器列表或处理器实例。处理器顺序与列表顺序相同。例如:[Group1(..., Ensemble1()), Group2(..., Ensemble2())]
- 返回:
处理后的字典。
- 返回类型:
dict
- class qlib.workflow.task.collect.MergeCollector(collector_dict: Dict[str, Collector], process_list: List[Callable] = [], merge_func=None)
一个收集器,用于收集其他收集器的结果
例如:
我们有两个收集器,分别命名为 A 和 B。A 可以收集 {"prediction": pd.Series},而 B 可以收集 {"IC": {"Xgboost": pd.Series, "LSTM": pd.Series}}。然后在这个类的收集之后,我们可以收集到 {"A_prediction": pd.Series, "B_IC": {"Xgboost": pd.Series, "LSTM": pd.Series}}
...
- __init__(collector_dict: Dict[str, Collector], process_list: List[Callable] = [], merge_func=None)
初始化合并收集器。
- 参数:
collector_dict (Dict[str,Collector]) -- 字典如 {collector_key, Collector}
process_list (List[Callable]) -- 处理字典的处理器列表或处理器实例。
merge_func (Callable) -- 生成最外层键的方法。给定的参数是来自collector_dict的``collector_key``和每个收集器在收集后得到的``key``。使用元组连接它们时为None,例如"ABC"+("a","b") -> ("ABC", ("a","b")).
- collect() dict
收集collector_dict的所有结果并将最外层键更改为重组键。
- 返回:
收集后的字典。
- 返回类型:
dict
- class qlib.workflow.task.collect.RecorderCollector(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable = {'FINISHED'})
- __init__(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable = {'FINISHED'})
初始化RecorderCollector。
- 参数:
experiment -- (Experiment或str): Experiment的实例或Experiment的名称 (Callable): 一个可调用函数,返回实验列表。
process_list (list or Callable) -- 处理字典的处理器列表或处理器实例。
rec_key_func (Callable) -- 获取记录器键的函数。如果为None,则使用记录器ID。
rec_filter_func (Callable, optional) -- 通过返回True或False来过滤记录器。默认为None。
artifacts_path (dict, optional) -- 记录器中的工件名称及其路径。默认为{"pred": "pred.pkl", "IC": "sig_analysis/ic.pkl"}。
artifacts_key (str or List, optional) -- 您想要获取的工件键。如果为None,则获取所有工件。
list_kwargs (str) -- list_recorders函数的参数。
status (Iterable) -- 仅收集具有特定状态的记录器。None表示收集所有记录器。
- collect(artifacts_key=None, rec_filter_func=None, only_exist=True) dict
根据过滤后的记录器收集不同的工件。
- 参数:
artifacts_key (str or List, optional) -- 您想要获取的工件键。如果为None,则使用默认值。
rec_filter_func (Callable, optional) -- 通过返回True或False来过滤记录器。如果为None,则使用默认值。
only_exist (bool, optional) -- 仅在记录器确实存在工件时才收集。如果为True,则在加载时出现异常的记录器将不会被收集。但如果为False,则会引发异常。
- 返回:
收集后的字典,如{artifact: {rec_key: object}}。
- 返回类型:
dict
- get_exp_name() str
获取实验名称
- 返回:
实验名称
- 返回类型:
str
组
组可以基于`group_func`对一组对象进行分组并将其更改为字典。分组后,我们提供一种方法来减少它们。
例如:
group: {(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}} reduce: {(A,B): {C1: object, C2: object}} -> {(A,B): object}
- class qlib.model.ens.group.Group(group_func=None, ens: Ensemble | None = None)
根据字典对对象进行分组
- __init__(group_func=None, ens: Ensemble | None = None)
初始化分组。
- 参数:
group_func (Callable, optional) -- 给定一个字典,返回分组键和一个分组元素。例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}
None. (Defaults to)
ens (Ensemble, optional) -- 如果不为None,在分组后对分组值进行集成。
- group(*args, **kwargs) dict
对一组对象进行分组并将其转换为字典。
例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}
- 返回:
分组字典
- 返回类型:
dict
- reduce(*args, **kwargs) dict
减少分组字典。
例如:{(A,B): {C1: object, C2: object}} -> {(A,B): object}
- 返回:
减少后的字典
- 返回类型:
dict
- class qlib.model.ens.group.RollingGroup(ens=<qlib.model.ens.ensemble.RollingEnsemble object>)
对滚动字典进行分组
- group(rolling_dict: dict) dict
给定一个类似于{(A,B,R): things}的滚动字典,返回类似于{(A,B): {R:things}}的分组字典
注意:有一个假设,即滚动键在键元组的末尾,因为滚动结果总是需要首先进行集成。
- 参数:
rolling_dict (dict) -- 一个滚动字典。如果键不是元组,则不执行任何操作。
- 返回:
分组字典
- 返回类型:
dict
集成
集成模块可以合并集成中的对象。例如,如果有多个子模型的预测,我们可能需要将它们合并为一个集成预测。
- class qlib.model.ens.ensemble.Ensemble
将ensemble_dict合并为一个集成对象。
例如:{Rollinga_b: object, Rollingb_c: object} -> object
调用此类时:
- 参数:
ensemble_dict (字典): 等待合并的集合字典,如 {name: things}
- 返回:
对象: 集合对象
- class qlib.model.ens.ensemble.SingleKeyEnsemble
如果字典中只有一个键值对,则提取该对象。使结果更具可读性。 {唯一键: 唯一值} -> 唯一值
如果有多个键或少于一个键,则不执行任何操作。甚至可以递归运行此操作以使字典更具可读性。
注意: 默认情况下递归运行。
调用此类时:
- 参数:
ensemble_dict (字典): 字典。字典的键将被忽略。
- 返回:
字典: 可读的字典。
- class qlib.model.ens.ensemble.RollingEnsemble
将类似 prediction 或 IC 的滚动数据框字典合并为一个集合。
注意: 字典的值必须是 pd.DataFrame,并且具有 "datetime" 索引。
调用此类时:
- 参数:
ensemble_dict (字典): 类似 {"A": pd.DataFrame, "B": pd.DataFrame} 的字典。字典的键将被忽略。
- 返回:
pd.DataFrame: 滚动的完整结果。
- class qlib.model.ens.ensemble.AverageEnsemble
将相同形状的数据框字典,如 prediction 或 IC,进行平均和标准化,合并为一个集合。
注意: 字典的值必须是 pd.DataFrame,并且具有 "datetime" 索引。如果是嵌套字典,则将其扁平化。
调用此类时:
- 参数:
ensemble_dict (字典): 类似 {"A": pd.DataFrame, "B": pd.DataFrame} 的字典。字典的键将被忽略。
- 返回:
pd.DataFrame: 平均和标准化的完整结果。
工具
一些任务管理工具。
- qlib.workflow.task.utils.get_mongodb() Database
获取 MongoDB 中的数据库,这意味着您需要首先声明地址和数据库名称。
例如:
使用 qlib.init():
mongo_conf = { "task_url": task_url, # your MongoDB url "task_db_name": task_db_name, # database name } qlib.init(..., mongo=mongo_conf)
在 qlib.init() 之后:
C["mongo"] = { "task_url" : "mongodb://localhost:27017/", "task_db_name" : "rolling_db" }
- 返回:
数据库实例
- 返回类型:
Database
- qlib.workflow.task.utils.list_recorders(experiment, rec_filter_func=None)
列出在实验中可以通过过滤器的所有记录器。
- 参数:
experiment (str or Experiment) -- 实验的名称或实例
rec_filter_func (Callable, optional) -- 返回 True 以保留给定的记录器。默认为 None。
- 返回:
过滤后的字典 {rid: recorder}。
- 返回类型:
dict
- class qlib.workflow.task.utils.TimeAdjuster(future=True, end_time=None)
找到合适的日期并调整日期。
- __init__(future=True, end_time=None)
- set_end_time(end_time=None)
设置结束时间。为 None 时使用日历的结束时间。
- 参数:
end_time
- get(idx: int)
通过索引获取日期时间。
- 参数:
idx (int) -- 日历的索引
- max() Timestamp
返回最大日历日期时间
- align_idx(time_point, tp_type='start') int
对齐日历中时间点的索引。
- 参数:
time_point
tp_type (str)
- 返回:
索引
- 返回类型:
int
- cal_interval(time_point_A, time_point_B) int
计算交易日间隔 (time_point_A - time_point_B)
- 参数:
time_point_A -- time_point_A
time_point_B -- time_point_B(是 time_point_A 的过去)
- 返回:
A 和 B 之间的间隔
- 返回类型:
int
- align_time(time_point, tp_type='start') Timestamp
将时间点对齐到日历的交易日期
- 参数:
time_point -- 时间点
tp_type -- 字符串时间点类型 ("start", "end")
- 返回:
pd.Timestamp
- align_seg(segment: dict | tuple) dict | tuple
将给定日期对齐到交易日期
例如:
input: {'train': ('2008-01-01', '2014-12-31'), 'valid': ('2015-01-01', '2016-12-31'), 'test': ('2017-01-01', '2020-08-01')} output: {'train': (Timestamp('2008-01-02 00:00:00'), Timestamp('2014-12-31 00:00:00')), 'valid': (Timestamp('2015-01-05 00:00:00'), Timestamp('2016-12-30 00:00:00')), 'test': (Timestamp('2017-01-03 00:00:00'), Timestamp('2020-07-31 00:00:00'))}
- 参数:
segment
- 返回:
Union[dict, tuple]
- 返回类型:
the start and end trade date (pd.Timestamp) between the given start and end date.
- truncate(segment: tuple, test_start, days: int) tuple
根据 test_start 日期截断该段
- 参数:
segment (tuple) -- 时间段
test_start
days (int) -- 在此段中需要截断数据的交易日可能需要 'days' 数据,days 是基于 test_start 的。例如,如果标签包含未来 2 天的信息,预测范围为 1 天。(例如,预测目标是 Ref($close, -2)/Ref($close, -1) - 1)则天数应为 2 + 1 == 3 天。
- 返回:
tuple
- 返回类型:
new segment
- shift(seg: tuple, step: int, rtype='sliding') tuple
移动时间段的日期时间
如果该段中存在 None(表示无界索引),此方法将返回 None。
- 参数:
seg -- 日期时间段
step (int) -- 滚动步长
rtype (str) -- 滚动类型("滑动"或"扩展")
- 返回:
tuple
- 返回类型:
new segment
- 抛出:
KeyError: -- 如果索引(开始和结束)超出 self.cal,shift 将引发错误。
- qlib.workflow.task.utils.replace_task_handler_with_cache(task: dict, cache_dir: str | Path = '.') dict
用缓存处理程序替换任务中的处理程序。它将自动缓存文件并保存在 cache_dir 中。
>>> import qlib >>> qlib.auto_init() >>> import datetime >>> # it is simplified task >>> task = {"dataset": {"kwargs":{'handler': {'class': 'Alpha158', 'module_path': 'qlib.contrib.data.handler', 'kwargs': {'start_time': datetime.date(2008, 1, 1), 'end_time': datetime.date(2020, 8, 1), 'fit_start_time': datetime.date(2008, 1, 1), 'fit_end_time': datetime.date(2014, 12, 31), 'instruments': 'CSI300'}}}}} >>> new_task = replace_task_handler_with_cache(task) >>> print(new_task) {'dataset': {'kwargs': {'handler': 'file...Alpha158.3584f5f8b4.pkl'}}}
在线服务
在线管理器
在线管理器可以管理一组 在线策略 并动态运行它们。
随着时间的变化,决定性模型也会发生变化。在此模块中,我们称这些贡献模型为 在线 模型。在每个例行程序(例如每天或每分钟),在线 模型可能会发生变化,且它们的预测需要更新。因此,此模块提供了一系列方法来控制此过程。
此模块还提供了一种方法来模拟历史中的 在线策略。这意味着您可以验证您的策略或找到更好的策略。
在不同情况下使用不同训练器的总情况有4种:
情况 |
描述 |
|---|---|
在线 + 训练器 |
当你想进行真实的例行训练时,训练器将帮助你训练模型。它将逐个任务和策略地训练模型。 |
在线 + 延迟训练器 |
延迟训练器将在所有任务通过不同策略准备好之前跳过具体训练。它使用户能够在`例行`或`首次训练`结束时并行训练所有任务。否则,当每个策略准备任务时,这些功能将会卡住。 |
模拟 + 训练器 |
它的行为与`在线 + 训练器`相同。唯一的区别是它用于模拟/回测,而不是在线交易。 |
模拟 + 延迟训练器 |
当你的模型没有任何时间依赖性时,你可以使用延迟训练器来实现多任务处理。这意味着所有例行中的所有任务可以在模拟结束时进行真实训练。信号将在不同的时间段内准备好(基于是否有新的模型在线)。 |
以下是一些伪代码,演示每种情况的工作流程
- 为了简化
策略中只使用一个策略
`update_online_pred`仅在在线模式下调用,且被忽略
在线 + 训练器
tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
trainer.end_train(models)
prepare_signals() # prepare trading signals daily
在线 + 延迟训练器:工作流程与`在线 + 训练器`相同。
模拟 + 延迟训练器
# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()
# 我们能简化当前的工作流程吗?
能减少任务的状态数量吗?
对于每个任务,我们有三个阶段(即任务、部分训练任务、最终训练任务)
- class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
OnlineManager 可以管理在线模型,使用 在线策略。它还提供了一个历史记录,记录哪些模型在什么时间在线。
- __init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
初始化 OnlineManager。一个 OnlineManager 至少必须有一个 OnlineStrategy。
- 参数:
strategies (Union[OnlineStrategy, List[OnlineStrategy]]) -- 一个 OnlineStrategy 的实例或一个 OnlineStrategy 的列表
begin_time (Union[str,pd.Timestamp], optional) -- OnlineManager 将在此时间开始。默认为 None,以使用最新日期。
trainer (qlib.model.trainer.Trainer) -- 用于训练任务的训练器。None 表示使用 TrainerR。
freq (str, optional) -- 数据频率。默认为 "day"。
- first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})
从每个策略的 first_tasks 方法获取任务并进行训练。如果使用 DelayTrainer,它可以在每个策略的 first_tasks 之后完成所有训练。
- 参数:
strategies (List[OnlineStrategy]) -- 策略列表(添加策略时需要此参数)。None 表示使用默认策略。
model_kwargs (dict) -- 用于 prepare_online_models 的参数
- routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})
每个策略的典型更新过程并记录在线历史。
常规后的典型更新过程,例如逐日或逐月。该过程为:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。
如果使用 DelayTrainer,它可以在每个策略的 prepare_tasks 之后完成所有训练。
- 参数:
cur_time (Union[str,pd.Timestamp], optional) -- 在此时间运行常规方法。默认为 None。
task_kwargs (dict) -- 用于 prepare_tasks 的参数
model_kwargs (dict) -- 用于 prepare_online_models 的参数
signal_kwargs (dict) -- 用于 prepare_signals 的参数
- get_collector(**kwargs) MergeCollector
获取 Collector 的实例,以从每个策略收集结果。该收集器可以作为信号准备的基础。
- 参数:
**kwargs -- 用于 get_collector 的参数。
- 返回:
合并其他收集器的收集器。
- 返回类型:
- add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])
向 OnlineManager 添加一些新策略。
- 参数:
strategy (Union[OnlineStrategy, List[OnlineStrategy]]) -- 在线策略列表
- prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)
在准备完最后一次例行数据(箱线图中的一个框)后,这意味着例行的结束,我们可以为下一个例行准备交易信号。
注意:给定一组预测,在这些预测结束时间之前的所有信号将被很好地准备。
即使最新的信号已经存在,最新的计算结果也会被覆盖。
备注
给定某个时间的预测,在此时间之前的所有信号将被很好地准备。
- 参数:
prepare_func (Callable, optional) -- 在收集后从字典中获取信号。默认为 AverageEnsemble(),通过 MergeCollector 收集的结果必须是 {xxx:pred}。
over_write (bool, optional) -- 如果为 True,新信号将覆盖。如果为 False,新信号将附加到信号的末尾。默认为 False。
- 返回:
信号。
- 返回类型:
pd.DataFrame
- get_signals() Series | DataFrame
获取准备好的在线信号。
- 返回:
pd.Series 仅用于每个日期时间的单个信号。pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。
- 返回类型:
Union[pd.Series, pd.DataFrame]
- simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame
从当前时间开始,此方法将模拟 OnlineManager 中的每个例行,直到结束时间。
考虑到并行训练,模型和信号可以在所有例行模拟后准备。
延迟训练方式可以是
DelayTrainer,而延迟准备信号的方式可以是delay_prepare。- 参数:
end_time -- 模拟将结束的时间
frequency -- 日历频率
task_kwargs (dict) -- 用于 prepare_tasks 的参数
model_kwargs (dict) -- 用于 prepare_online_models 的参数
signal_kwargs (dict) -- 用于 prepare_signals 的参数
- 返回:
pd.Series 仅用于每个日期时间的单个信号。pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。
- 返回类型:
Union[pd.Series, pd.DataFrame]
- delay_prepare(model_kwargs={}, signal_kwargs={})
如果有东西在等待准备,则准备所有模型和信号。
- 参数:
model_kwargs -- 用于 end_train 的参数
signal_kwargs -- 用于 prepare_signals 的参数
在线策略
OnlineStrategy 模块是在线服务的一个元素。
- class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)
OnlineStrategy 与 在线管理器 一起工作,响应任务的生成、模型的更新和信号的准备。
- __init__(name_id: str)
初始化在线策略。该模块**必须**使用`Trainer <../reference/api.html#qlib.model.trainer.Trainer>`_来完成模型训练。
- 参数:
name_id (str) -- 一个唯一的名称或ID。
trainer (qlib.model.trainer.Trainer, optional) -- Trainer的实例。默认为None。
- prepare_tasks(cur_time, **kwargs) List[dict]
在例程结束后,检查是否需要根据cur_time(最新为None)准备和训练一些新任务。返回等待训练的新任务。
您可以通过OnlineTool.online_models找到最后的在线模型。
- prepare_online_models(trained_models, cur_time=None) List[object]
从训练模型中选择一些模型并将其设置为在线模型。这是将所有训练模型在线的典型实现,您可以重写它以实现更复杂的方法。如果您仍然需要它们,可以通过OnlineTool.online_models找到最后的在线模型。
注意:将所有在线模型重置为训练模型。如果没有训练模型,则不执行任何操作。
- 注意:
当前实现非常简单。这里有一个更复杂的情况,更接近实际场景。1. 在`test_start`前一天训练新模型(在时间戳`T`)2. 在`test_start`时切换模型(通常在时间戳`T + 1`)
- 参数:
models (list) -- 模型列表。
cur_time (pd.Dataframe) -- 来自OnlineManger的当前时间。最新为None。
- 返回:
在线模型列表。
- 返回类型:
List[object]
- first_tasks() List[dict]
首先生成一系列任务并返回它们。
- class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
该示例策略始终使用最新的滚动模型作为在线模型。
- __init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
初始化滚动策略。
假设:name_id的字符串、实验名称和训练者的实验名称是相同的。
- 参数:
name_id (str) -- 一个唯一的名称或ID。也将是实验的名称。
task_template (Union[dict, List[dict]]) -- 一个任务模板的列表或单个模板,将用于通过 rolling_gen 生成多个任务。
rolling_gen (RollingGen) -- 一个 RollingGen 的实例
- get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)
获取 Collector 的实例以收集结果。返回的收集器必须区分不同模型的结果。
假设:模型可以根据模型名称和滚动测试段进行区分。如果您不想使用此假设,请实现您的方法或使用其他 rec_key_func。
- 参数:
rec_key_func (Callable) -- 获取记录器键的函数。如果为None,则使用记录器ID。
rec_filter_func (Callable, optional) -- 通过返回True或False来过滤记录器。默认为None。
artifacts_key (List[str], optional) -- 您想要获取的工件键。如果为None,则获取所有工件。
- first_tasks() List[dict]
使用 rolling_gen 根据任务模板生成不同的任务。
- 返回:
一组任务
- 返回类型:
List[dict]
- prepare_tasks(cur_time) List[dict]
根据 cur_time 准备新任务(None 表示最新)。
您可以通过 OnlineToolR.online_models 找到最后的在线模型。
- 返回:
一个新任务的列表。
- 返回类型:
List[dict]
在线工具
OnlineTool 是一个模块,用于设置和取消一系列 在线 模型。在线 模型是在某些时间点的一些决定性模型,可以随着时间的变化而改变。这使我们能够使用高效的子模型来适应市场风格的变化。
- class qlib.workflow.online.utils.OnlineTool
OnlineTool 将在包含模型记录器的实验中管理 在线 模型。
- __init__()
初始化 OnlineTool。
- set_online_tag(tag, recorder: list | object)
将 tag 设置为模型,以标记是否在线。
- 参数:
tag (str) -- 在 ONLINE_TAG、OFFLINE_TAG 中的标签
recorder (Union[list,object]) -- 模型的记录器
- get_online_tag(recorder: object) str
给定一个模型记录器并返回其在线标签。
- 参数:
recorder (Object) -- 模型的记录器
- 返回:
在线标签
- 返回类型:
str
- reset_online_tag(recorder: list | object)
将所有模型离线并将记录器设置为 '在线'。
- 参数:
recorder (Union[list,object]) -- 您想重置为 '在线' 的记录器。
- online_models() list
获取当前的 在线 模型
- 返回:
一个 在线 模型的列表。
- 返回类型:
list
- update_online_pred(to_date=None)
将 online 模型的预测更新到当前日期。
- 参数:
to_date (pd.Timestamp) -- 在此日期之前的预测将被更新。None 表示更新到最新。
- class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str | None = None)
基于 (R)ecorder 的 OnlineTool 实现。
- __init__(default_exp_name: str | None = None)
初始化 OnlineToolR。
- 参数:
default_exp_name (str) -- 默认实验名称。
- set_online_tag(tag, recorder: Recorder | List)
将 tag 设置为模型的记录器,以标记是否为在线。
- 参数:
tag (str) -- 在 ONLINE_TAG、NEXT_ONLINE_TAG、OFFLINE_TAG 中的标签
recorder (Union[Recorder, List]) -- Recorder 的列表或 Recorder 的实例
- get_online_tag(recorder: Recorder) str
给定一个模型记录器并返回其在线标签。
- 参数:
recorder (Recorder) -- 一个记录器的实例
- 返回:
在线标签
- 返回类型:
str
- reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)
将所有模型离线并将记录器设置为 '在线'。
- 参数:
recorder (Union[Recorder, List]) -- 您想重置为 '在线' 的记录器。
exp_name (str) -- 实验名称。如果为 None,则使用 default_exp_name。
- online_models(exp_name: str | None = None) list
获取当前的 在线 模型
- 参数:
exp_name (str) -- 实验名称。如果为 None,则使用 default_exp_name。
- 返回:
一个 在线 模型的列表。
- 返回类型:
list
- update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)
将在线模型的预测更新到当前日期。
- 参数:
to_date (pd.Timestamp) -- 在此日期之前的预测将被更新。None 表示更新到日历中的最新时间。
exp_name (str) -- 实验名称。如果为 None,则使用 default_exp_name。
记录更新器
Updater 是一个模块,用于在股票数据更新时更新预测等工件。
- class qlib.workflow.online.update.RMDLoader(rec: Recorder)
记录器模型数据集加载器
- get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH
加载、配置和设置数据集。
该数据集用于推断。
- 参数:
start_time -- 基础数据的开始时间
end_time -- 基础数据的结束时间
segments -- 字典,数据集的段配置。由于时间序列数据集 (TSDatasetH),测试段可能与开始时间和结束时间不同
unprepared_dataset -- 可选[DatasetH] 如果用户不想从记录器加载数据集,请指定用户的数据集
- 返回:
DatasetH的实例
- 返回类型:
- class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)
更新特定记录器
- abstract update(*args, **kwargs)
更新特定记录器的信息
- class qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
基于数据集的更新器
提供基于Qlib数据集更新数据的更新功能
假设
基于Qlib数据集
要更新的数据是一个多层索引的pd.DataFrame。例如标签,预测。
LABEL0 datetime instrument 2021-05-10 SH600000 0.006965 SH600004 0.003407 ... ... 2021-05-28 SZ300498 0.015748 SZ300676 -0.001321
- __init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
初始化PredUpdater。
以下情况下的预期行为:
如果`to_date`大于日历中的最大日期,则数据将更新为最新日期
如果在`from_date`之前或`to_date`之后有数据,则仅受`from_date`和`to_date`之间的数据影响。
- 参数:
record -- Recorder
to_date -- 如果to_date为None,则更新预测到`to_date`:数据将更新为最新日期。
from_date -- 更新将从`from_date`开始,如果from_date为None:更新将在历史数据中最新数据后的下一个时间点发生
hist_ref -- int 有时,数据集会有历史依赖。将问题留给用户设置历史依赖的长度。如果用户没有指定此参数,Updater将尝试加载数据集以自动确定hist_ref .. 注意:: start_time不包含在`hist_ref`中;因此在大多数情况下,hist_ref`将是`step_len - 1
loader_cls -- 输入类以加载模型和数据集
- prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH
加载数据集 - 如果指定了unprepared_dataset,则直接准备数据集 - 否则,
将此功能分开将更容易重用数据集
- 返回:
DatasetH的实例
- 返回类型:
- class qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
在记录器中更新预测
- class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)
在记录器中更新标签
假设 - 标签是从record_temp.SignalRecord生成的。
- __init__(record: Recorder, to_date=None, **kwargs)
初始化PredUpdater。
以下情况下的预期行为:
如果`to_date`大于日历中的最大日期,则数据将更新为最新日期
如果在`from_date`之前或`to_date`之后有数据,则仅受`from_date`和`to_date`之间的数据影响。
- 参数:
record -- Recorder
to_date -- 如果to_date为None,则更新预测到`to_date`:数据将更新为最新日期。
from_date -- 更新将从`from_date`开始,如果from_date为None:更新将在历史数据中最新数据后的下一个时间点发生
hist_ref -- int 有时,数据集会有历史依赖。将问题留给用户设置历史依赖的长度。如果用户没有指定此参数,Updater将尝试加载数据集以自动确定hist_ref .. 注意:: start_time不包含在`hist_ref`中;因此在大多数情况下,hist_ref`将是`step_len - 1
loader_cls -- 输入类以加载模型和数据集
工具
可序列化
- class qlib.utils.serial.Serializable
可序列化将改变pickle的行为。
用于判断在转储时属性是否会被保留或丢弃的规则。优先级较高的规则在顶部 - 在配置属性列表中 -> 始终丢弃 - 在包含属性列表中 -> 始终保留 - 在排除属性列表中 -> 始终丢弃 - 名称不以`_`开头 -> 保留 - 名称以`_`开头 -> 如果`dump_all`为真则保留,否则丢弃
它提供了一种语法糖,用于区分用户不想要的属性。 - 例如,一个可学习的数据处理器只想在转储到磁盘时保存参数而不保存数据
- __init__()
- property dump_all
对象是否会转储所有对象
- config(recursive=False, **kwargs)
配置可序列化对象
- 参数:
keys (kwargs may include following) -- dump_all : bool 对象是否会转储所有对象 exclude : list 哪些属性将不会被转储 include : list 哪些属性将被转储
recursive (bool) -- 配置是否会递归
- to_pickle(path: Path | str, **kwargs)
将自身转储到一个pickle文件。
path (Union[Path, str]): 要转储的路径
kwargs可以包括以下键
- dump_allbool
对象是否会转储所有对象
- excludelist
哪些属性将不会被转储
- includelist
将转储哪个属性
- classmethod load(filepath)
从文件路径加载可序列化类。
- 参数:
filepath (str) -- 文件的路径
- 抛出:
TypeError -- 被pickle的文件必须是`type(cls)`
- 返回:
类型为`type(cls)`的实例
- 返回类型:
type(cls)
- classmethod get_backend()
返回可序列化类的真实后端。pickle_backend值可以是"pickle"或"dill"。
- 返回:
基于pickle_backend的pickle或dill模块
- 返回类型:
module
- static general_dump(obj, path: Path | str)
对象的一般转储方法
- 参数:
obj (object) -- 要转储的对象
path (Union[Path, str]) -- 数据将被转储的目标路径
RL
基础组件
- class qlib.rl.Interpreter
解释器是模拟器生成的状态与RL策略所需状态之间的媒介。解释器是双向的:
从模拟器状态到策略状态(即观察),见:class:StateInterpreter。
从策略动作到模拟器接受的动作,见:class:ActionInterpreter。
继承这两个子类之一以定义您自己的解释器。这个超类仅用于isinstance检查。
建议解释器是无状态的,这意味着在解释器中使用``self.xxx``存储临时信息是反模式。未来,我们可能支持通过调用``self.env.register_state()``来注册一些与解释器相关的状态,但这并不计划在第一次迭代中实现。
- class qlib.rl.StateInterpreter
状态解释器,将qlib执行器的执行结果解释为rl环境状态
- validate(obs: ObsType) None
验证观察是否属于预定义的观察空间。
- interpret(simulator_state: StateType) ObsType
解释模拟器的状态。
- 参数:
simulator_state -- 通过``simulator.get_state()``获取。
- 返回类型:
策略所需的状态。应符合``observation_space``中定义的状态空间。
- class qlib.rl.ActionInterpreter
动作解释器,将rl代理动作解释为qlib订单
- validate(action: PolicyActType) None
验证动作是否属于预定义的动作空间。
- interpret(simulator_state: StateType, action: PolicyActType) ActType
将策略动作转换为模拟器动作。
- 参数:
simulator_state -- 通过``simulator.get_state()``获取。
action -- 策略给出的原始动作。
- 返回类型:
The action needed by simulator,
- class qlib.rl.Reward
奖励计算组件,接受一个参数:模拟器的状态。返回一个实数:奖励。
子类应实现``reward(simulator_state)``以实现自己的奖励计算方案。
- reward(simulator_state: SimulatorState) float
为您的奖励实现此方法。
- class qlib.rl.RewardCombination(rewards: Dict[str, Tuple[Reward, float]])
多个奖励的组合。
- reward(simulator_state: Any) float
为您的奖励实现此方法。
- class qlib.rl.Simulator(initial: InitialStateType, **kwargs: Any)
模拟器通过``__init__``重置,并通过``step(action)``进行转换。
为了使数据流清晰,我们对模拟器做以下限制:
修改模拟器内部状态的唯一方法是使用``step(action)``。
外部模块可以通过``simulator.get_state()``*读取*模拟器的状态,并通过调用``simulator.done()``检查模拟器是否处于结束状态。
模拟器被定义为有三种类型的边界:
不同的模拟器可能共享相同的 StateType。例如,当它们处理相同的任务但使用不同的模拟实现时。使用相同的类型,它们可以安全地共享 MDP 中的其他组件。
模拟器是短暂的。模拟器的生命周期从初始状态开始,并以轨迹结束。换句话说,当轨迹结束时,模拟器被回收。如果模拟器想要在之间共享上下文(例如,为了加速),可以通过访问环境包装器的弱引用来实现。
- env
env-wrapper 的引用,这在某些边缘情况下可能会有用。建议模拟器不要使用此引用,因为它容易引发错误。
- 类型:
Optional[EnvWrapper]
- __init__(initial: InitialStateType, **kwargs: Any) None
- step(action: ActType) None
接收 ActType 的一个动作。
模拟器应更新其内部状态,并返回 None。可以通过
simulator.get_state()检索更新后的状态。
- done() bool
检查模拟器是否处于 "done" 状态。当模拟器处于 "done" 状态时,它不应再接收任何
step请求。由于模拟器是短暂的,要重置模拟器,旧的模拟器应被销毁,并可以创建新的模拟器。
策略
- class qlib.rl.strategy.SingleOrderStrategy(order: Order, trade_range: TradeRange | None = None)
用于生成仅有一个订单的交易决策的策略。
- __init__(order: Order, trade_range: TradeRange | None = None) None
- 参数:
outer_trade_decision (BaseTradeDecision, optional) -- 该策略依赖的外部策略的交易决策,并将在 [start_time, end_time] 之间进行交易,默认值为 None - 如果该策略用于拆分交易决策,则将使用它 - 如果该策略用于投资组合管理,则可以忽略。
level_infra (LevelInfrastructure, optional) -- 用于回测的共享基础设施级别,包括交易日历。
common_infra (CommonInfrastructure, optional) -- 用于回测的公共基础设施,包括交易账户、交易所等。
trade_exchange (Exchange) -- 提供市场信息的交易所,用于处理订单和生成报告 - 如果 trade_exchange 为 None,则 self.trade_exchange 将设置为 common_infra - 允许在不同执行中使用不同的 trade_exchanges。 - 例如: - 在日常执行中,日常交易所和分钟交易所均可用,但推荐使用日常交易所,因为其运行速度更快。 - 在分钟执行中,日常交易所不可用,仅推荐使用分钟交易所。
- generate_trade_decision(execute_result: list | None = None) TradeDecisionWO
在每个交易条生成交易决策
- 参数:
execute_result (List[object], optional) -- 交易决策的执行结果,默认值为 None - 当第一次调用 generate_trade_decision 时,execute_result 可能为 None