任务管理

介绍

工作流 <../component/introduction.html> 部分介绍了如何以松耦合的方式运行研究工作流。但当您使用 qrun 时,它只能执行一个 任务。为了自动生成和执行不同的任务,任务管理 提供了一个完整的过程,包括 任务生成任务存储任务训练任务收集。通过此模块,用户可以在不同的时间段、不同的损失,甚至通过不同的模型自动运行他们的 任务。任务生成、模型训练和数据合并与收集的过程如以下图所示。

../_images/Task-Gen-Recorder-Collector.svg

整个过程可以用于 在线服务

整个过程的示例请见 here

任务生成

一个 taskModelDatasetRecord 或用户添加的任何内容组成。具体的任务模板可以在 Task Section 中查看。尽管任务模板是固定的,用户可以自定义他们的 TaskGen 以根据任务模板生成不同的 task

这是 TaskGen 的基类:

class qlib.workflow.task.gen.TaskGen

生成不同任务的基类

示例1:

输入:一个特定的任务模板和滚动步骤

输出:任务的滚动版本

示例2:

输入:一个特定的任务模板和损失列表

输出:一组具有不同损失的任务

abstract generate(task: dict) List[dict]

基于任务模板生成不同的任务

参数:

task (dict) -- 一个任务模板

返回:

一组任务

返回类型:

List[dict]

Qlib 提供了一个类 RollingGen,用于生成不同日期区间的数据集的 task 列表。这个类允许用户在一个实验中验证来自不同时间段的数据对模型的影响。更多信息请见 这里

任务存储

To achieve higher efficiency and the possibility of cluster operation, Task Manager will store all tasks in MongoDB. TaskManager can fetch undone tasks automatically and manage the lifecycle of a set of tasks with error handling. Users MUST finish the configuration of MongoDB when using this module.

用户需要在 初始化 时提供 MongoDB URL 和数据库名称,或者像这样声明。

from qlib.config import C
C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/", # your MongoDB url
    "task_db_name" : "rolling_db" # database name
}
class qlib.workflow.task.manage.TaskManager(task_pool: str)

以下是任务管理器创建的任务的样子

{
    'def': pickle serialized task definition.  using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}

任务管理器假设您只会更新您获取的任务。Mongo 的获取和更新将使日期更新安全。

此类可以作为命令行工具使用。以下是几个示例。您可以使用以下命令查看管理模块的帮助:python -m qlib.workflow.task.manage -h # 显示管理模块 CLI 的手册 python -m qlib.workflow.task.manage wait -h # 显示管理命令的手册

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

备注

假设:MongoDB 中的数据已编码,MongoDB 中的数据已解码

以下是四种状态:

STATUS_WAITING:等待训练

STATUS_RUNNING:训练中

STATUS_PART_DONE:完成某些步骤并等待下一步

STATUS_DONE:所有工作完成

__init__(task_pool: str)

初始化任务管理器,记得首先声明 MongoDB 的 URL 和数据库名称。一个 TaskManager 实例服务于特定的任务池。该模块的静态方法服务于整个 MongoDB。

参数:

task_pool (str) -- MongoDB 中的集合名称

static list() list

列出数据库的所有集合(task_pool)。

返回:

list

replace_task(task, new_task)

用一个新任务替换一个旧任务

参数:
  • task -- 旧任务

  • new_task -- 新任务

insert_task(task)

插入一个任务。

参数:

task -- 等待插入的任务

返回:

pymongo.results.InsertOneResult

insert_task_def(task_def)

将一个任务插入到task_pool

参数:

task_def (dict) -- 任务定义

返回类型:

pymongo.results.InsertOneResult

create_task(task_def_l, dry_run=False, print_nt=False) List[str]

如果task_def_l中的任务是新的,则将新任务插入到task_pool,并记录inserted_id。如果任务不是新的,则仅查询其_id。

参数:
  • task_def_l (list) -- 任务列表

  • dry_run (bool) -- 如果将这些新任务插入到任务池

  • print_nt (bool) -- 如果打印新任务

返回:

task_def_l的_id列表

返回类型:

List[str]

fetch_task(query={}, status='waiting') dict

使用查询获取任务。

参数:
  • query (dict, optional) -- 查询字典。默认为{}。

  • status (str, optional) -- [描述]。默认为STATUS_WAITING。

返回:

解码后的任务(集合中的文档)

返回类型:

dict

safe_fetch_task(query={}, status='waiting')

使用上下文管理器从task_pool中获取任务

参数:

query (dict) -- 查询的字典

返回:

字典

返回类型:

a task(document in collection) after decoding

query(query={}, decode=True)

在集合中查询任务。如果迭代生成器的时间过长,此函数可能会引发异常 pymongo.errors.CursorNotFound: cursor id not found

python -m qlib.workflow.task.manage -t <你的任务池> query '{"_id": "615498be837d0053acbc5d58"}'

参数:
  • query (dict) -- 查询的字典

  • decode (bool)

返回:

字典

返回类型:

a task(document in collection) after decoding

re_query(_id) dict

使用 _id 查询任务。

参数:

_id (str) -- 文档的 _id

返回:

解码后的任务(集合中的文档)

返回类型:

dict

commit_task_res(task, res, status='done')

将结果提交到 task['res']。

参数:
  • task ([type]) -- [描述]

  • res (object) -- 你想要保存的结果

  • status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。

return_task(task, status='waiting')

将任务返回到状态。始终在错误处理时使用。

参数:
  • task ([type]) -- [描述]

  • status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。

remove(query={})

使用查询删除任务

参数:

query (dict) -- 查询的字典

task_stat(query={}) dict

计算每个状态中的任务数量。

参数:

query (dict, optional) -- 查询字典。默认为 {}。

返回:

字典

reset_waiting(query={})

将所有正在运行的任务重置为等待状态。当某些正在运行的任务意外退出时,可以使用。

参数:

query (dict, optional) -- 查询字典。默认为 {}。

prioritize(task, priority: int)

为任务设置优先级

参数:
  • task (dict) -- 从数据库中查询任务

  • priority (int) -- 目标优先级

wait(query={})

在多进程时,主进程可能无法从 TaskManager 获取任何内容,因为仍然有一些正在运行的任务。因此,主进程应该等待,直到所有任务都被其他进程或机器训练好。

参数:

query (dict, optional) -- 查询字典。默认为 {}。

有关 Task Manager 的更多信息可以在 这里 找到。

任务训练

在生成和存储这些 task 后,是时候运行处于 WAITING 状态的 taskQlib 提供了一种名为 run_task 的方法来运行任务池中的这些 task,然而,用户也可以自定义任务的执行方式。获取 task_func 的简单方法是直接使用 qlib.model.trainer.task_train。它将运行由 task 定义的整个工作流程,包括 ModelDatasetRecord

qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)

当任务池不为空(有等待任务)时,使用 task_func 从 task_pool 获取并运行任务

运行此方法后,以下是4种情况(before_status -> after_status):

STATUS_WAITING -> STATUS_DONE:使用 task["def"] 作为 task_func 参数,这意味着任务尚未开始

STATUS_WAITING -> STATUS_PART_DONE:使用 task["def"] 作为 task_func 参数

STATUS_PART_DONE -> STATUS_PART_DONE:使用 task["res"] 作为 task_func 参数,这意味着任务已开始但未完成

STATUS_PART_DONE -> STATUS_DONE:使用 task["res"] 作为 task_func 参数

参数:
  • task_func (Callable) -- def (task_def, **kwargs) -> <res which will be committed> 执行任务的函数

  • task_pool (str) -- 任务池的名称(MongoDB中的集合)

  • query (dict) -- 在获取任务时将使用此字典查询任务池

  • force_release (bool) -- 程序是否会强制释放资源

  • before_status (str:) -- 在 before_status 中的任务将被获取并训练。可以是 STATUS_WAITING 或 STATUS_PART_DONE。

  • after_status (str:) -- 训练后的任务将变为 after_status。可以是 STATUS_WAITING 或 STATUS_PART_DONE。

  • kwargs -- 用于 task_func 的参数

同时,Qlib 提供了一个名为 Trainer 的模块。

class qlib.model.trainer.Trainer

训练器可以训练一系列模型。有Trainer和DelayTrainer,可以通过完成真实训练的时间来区分。

__init__()
train(tasks: list, *args, **kwargs) list

给定一系列任务定义,开始训练,并返回模型。

对于Trainer,它在此方法中完成真实训练。对于DelayTrainer,它仅在此方法中做一些准备。

参数:

tasks -- 一组任务

返回:

一系列模型

返回类型:

list

end_train(models: list, *args, **kwargs) list

给定一系列模型,如果需要,在训练结束时完成某些操作。模型可以是记录器、txt文件、数据库等。

对于Trainer,它在此方法中做一些收尾工作。对于DelayTrainer,它在此方法中完成真实训练。

参数:

models -- 一系列模型

返回:

一系列模型

返回类型:

list

is_delay() bool

如果Trainer将延迟完成`end_train`。

返回:

如果是DelayTrainer

返回类型:

bool

has_worker() bool

某些训练器有后端工作者以支持并行训练。此方法可以判断工作者是否启用。

返回:

如果工作者已启用

返回类型:

bool

worker()

启动工作者

抛出:

NotImplementedError: -- 如果不支持该工作者

Trainer 将训练一系列任务并返回一系列模型记录器。Qlib 提供两种类型的 Trainer,TrainerR 是最简单的方式,而 TrainerRM 基于 TaskManager 自动帮助管理任务的生命周期。如果您不想使用 Task Manager 来管理任务,那么使用 TrainerR 来训练由 TaskGen 生成的任务列表就足够了。有关不同 Trainer 的详细信息请见 这里

任务收集

在收集模型训练结果之前,您需要使用 qlib.init 指定 mlruns 的路径。

要在训练后收集 task 的结果,Qlib 提供了 CollectorGroupEnsemble,以可读、可扩展和松耦合的方式收集结果。

Collector 可以从任何地方收集对象并处理它们,例如合并、分组、平均等。它有两个步骤,包括 ``collect``(在字典中收集任何内容)和 ``process_collect``(处理收集的字典)。

Group 也有两个步骤,包括 group``(可以根据 `group_func` 对一组对象进行分组并将其转换为字典)和 ``reduce``(可以根据某些规则将字典变为一个集合)。例如:{(A,B,C1): object, (A,B,C2): object} ---``group---> {(A,B): {C1: object, C2: object}} ---reduce---> {(A,B): object}

Ensemble 可以合并集合中的对象。例如:{C1: object, C2: object} ---Ensemble---> object。您可以在 Collector 的 process_list 中设置所需的集合。常见的集合包括 AverageEnsembleRollingEnsemble。平均集合用于在同一时间段内集合不同模型的结果。Rollingensemble 用于在同一时间段内集合不同模型的结果。

因此,Collector 的第二步对应于 Group。而 Group 的第二步对应于 Ensemble

有关更多信息,请参见 CollectorGroupEnsemble,或查看 示例