DAOS_TSE(TaskSchedulerEngine)任务调度引擎流程及源码分析

简介

TSE 是一个通用库,用于创建具有函数回调的通用任务,可选地添加这些任务之间的依赖关系,并将它们安排在一个引擎中,该引擎按照插入它们的依赖关系图确定的顺序执行这些任务。任务依赖图是调度程序的组成部分,允许用户创建多个任务并以非阻塞方式进行。

TSE 不是 DAOS 特定的,但曾经是 DAOS 核心的一部分,后来作为独立 API 被提取到 common src 中。API 是通用的,允许在没有任何 DAOS 特定功能的引擎中创建任务。DAOS 库确实提供了一个构建在 TSE 之上的任务 API。有关这方面的更多信息,请参见此处(https://github.com/ssbandjl/daos/blob/master/src/client/api/README.md)。此外,DAOS 在内部使用 TSE 来跟踪和处理与 API 事件关联的所有 API 任务,并且在某些情况下,调度与单个 API 任务相对应的多个飞行中的“子”任务,并添加对该任务的依赖以进行跟踪所有那些飞行中的“子”任务。DAOS 库中的 Array API 和具有多个副本的对象更新就是一个例子。

调度程序 API

调度程序 API 允许用户创建通用调度程序并向其添加任务。在创建调度器时,用户可以注册一个完成回调,以便在调度器完成时调用。

添加到调度程序的任务不会自行进行。必须在调度程序上显式调用进度函数 (daos_sched_progress) 才能在引擎中的任务上取得进展。用户可以在他们的程序中偶尔调用这个进度函数,或者可以派生一个重复调用进度函数的线程。

任务 API

任务 API 允许创建具有通用主体功能的任务并将它们添加到调度程序。一旦在调度程序中创建了一个任务,如果没有用户对任务调度函数的显式调用,它就不会被实际调度运行,除非它是任务依赖关系图的一部分,在这种情况下,显式调度调用只需要图中的第一个任务。创建任务后,用户可以为任务注册任意数量的依赖项,这些依赖项需要在计划运行之前完成。此外,用户将能够在任务上注册准备和完成回调:

  • 准备回调在任务准备好运行但尚未执行时执行,这意味着创建任务的依赖关系已完成并且调度程序已准备好调度任务。当要调度的任务需要在创建任务时不可用但在任务的依赖关系完成后可用的信息时,这很有用;例如为任务主体功能设置一些输入参数。
  • 完成回调在任务完成执行时执行,并且当发生这种情况时用户需要做更多的工作或处理。这很有用的一个示例是设置在 TSE 之上构建的更高级别的事件或请求的完成,或者跟踪依赖项列表中多个任务的错误状态。

任务 API 上的其他几个功能可以支持:

  • 在任务本身上设置一些可以查询的私有数据。
  • 在没有数据复制的情况下向/从任务堆栈空间推送和弹出数据
  • 通用任务列表

有关该功能的更多详细信息,请参见此处DAOS 代码中的 TSE 头文件(https://github.com/ssbandjl/daos/blob/master/src/include/daos/tse.h)

流程图

daos_tse

源码分析

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
创建任务:
dc_task_create
  sched = daos_ev2sched(ev) 事件转调度器
  tse_task_create(func, sched, NULL, &task) -> 初始化 tse_task 该任务(task)会被添加到调度器(sched)任务列表中,稍后被调度,如果提供了依赖任务,则该任务将被添加到依赖任务的dep列表中,一旦依赖任务完成,则添加该任务到调度程序列表(先完成依赖任务, 然后添加主任务)
  task_ptr2args 指针转参数
    D_INIT_LIST_HEAD(&dtp->dtp_list) -> 初始化任务链表, 调度时插入到调度器的初始化队列(dsp_init_list) -> d_list_add_tail(&dtp->dtp_list, &dsp->dsp_init_list)
    D_INIT_LIST_HEAD(&dtp->dtp_task_list)
  	D_INIT_LIST_HEAD(&dtp->dtp_dep_list)
	D_INIT_LIST_HEAD(&dtp->dtp_comp_cb_list)
	D_INIT_LIST_HEAD(&dtp->dtp_prep_cb_list)
    ...
    dtp->dtp_func	  = task_func
    ...
    tse_task_buf_embedded 获取任务的嵌入式缓冲区,用户可以使用它来携带功能参数。 任务的嵌入式缓冲区有大小限制,如果 buf_size 大于限制,此函数将返回 NULL 用户应通过 tse_task_set_priv() 使用私有数据来传递大参数。 MSC - 我将其更改为只是一个缓冲区,而不是像以前那样, 不断给一个额外的指针指向大的预涂层缓冲区。 以前的方式不适用于公共用途。我们现在应该使它更简单,更通用,如下面的评论
      tse_task_buf_size
        return (size + 7) & ~0x7
  args->ta_magic = DAOS_TASK_MAGIC
  tse_task_register_comp_cb(task, task_comp_event, NULL, 0) -> dtc->dtc_cb = cb task_comp_event 注册完成回调
    register_cb(task, true, comp_cb, arg, arg_size)
      d_list_add(&dtc->dtc_list, &dtp->dtp_comp_cb_list) 插入到列表开始处
  failed -> tse_task_decref(task) -> 如果失败则判断引用计数
    

dfuse用户态文件系统为例    
dfuse_do_work
  dfuse_do_work
    dfuse_cb_read dfuse_cb_read
    dfs_read
    dfs_read_int dfs_read_int
      dc_task_create(dc_array_read, NULL, ev, &task) 创建任务
      tse_task_register_cbs(task, NULL, NULL, 0, read_cb, NULL, 0)
      dc_task_schedule(task, true) -> 立即调度任务
        tse_task_schedule
          tse_task_schedule_with_delay
    dc_array_read DAOS_OPC_ARRAY_READ -> dc_array_io   task1
      daos_task_create(DAOS_OPC_ARRAY_GET_SIZE, tse_task2sched(task)
        dc_task_create(dc_funcs[opc].task_func, sched, NULL, &task)
        dc_task_depend(task, num_deps, dep_tasks)
      
      ...
    tse_sched_progress  task2
    tse_sched_run
      processed += tse_sched_process_init(dsp) -> 处理调度程序的初始化列表和休眠列表。 这首先将所有现在应该醒来的任务从睡眠列表移动到初始化列表的尾部,然后执行调度程序初始化列表中没有依赖项的所有任务的所有主体函数, 调度器主函数, 在队列间移动任务
      processed += tse_sched_process_complete(dsp)
        d_list_splice_init(&dsp->dsp_complete_list, &comp_list) -> 连接两个列表并重新初始化空列表。 列表的内容添加到头部的开头。 返回时列表为初始化为空(可理解为从一个队列移动到新的队列)
        d_list_for_each_entry_safe(dtp, tmp, &comp_list, dtp_list) -> 遍历新的完成队列
        d_list_del_init(&dtp->dtp_list)
        tse_task_post_process(task) -> 检查完成列表中的任务、依赖任务状态检查、调度状态更新等。此后任务将移至完成列表
          while (!d_list_empty(&dtp->dtp_dep_list))  -> 任务依赖队列不为空(还有依赖任务)
            ...
      completed = tse_sched_check_complete(sched) -> 检查调度器是否是完成状态, : 初始队列和睡眠队列都为空, 且无飞行的任务
    tse_task_prep_callback
    check_short_read_cb
    tse_task_complete    
    
                       
查找
df_ll_lookup
cb_lookup
lookupx
lookup_rel_int
array_get_size
  dc_task_create(dc_array_get_size, NULL, ev, &task)
  dc_task_schedule
    daos_event_priv_wait 如果关联事件是私有事件,这个函数会等到完成,否则它立即返回
      ev_progress_cb
        tse_sched_progress
          dtp_func -> dc_obj_query_key
                       
                       
任务完成回调参考堆栈:
#0  dc_rw_cb (task=0x7f9198004530, arg=<optimized out>) at src/object/cli_shard.c:952
...
#0  tse_task_complete_callback (task=task@entry=0x7f68ec004d20) at src/common/tse.c:515
...
#0  tse_task_complete (task=0x7f68ec004d20, ret=0) at src/common/tse.c:858
#1  0x00007f693ce7cf48 in daos_rpc_cb (cb_info=<optimized out>) at src/client/api/rpc.c:24
#2  0x00007f693b13bd8a in crt_hg_req_send_cb (hg_cbinfo=<optimized out>) at src/cart/crt_hg.c:1364
#3  0x00007f693a2e34ce in hg_core_forward_cb (callback_info=<optimized out>) at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury.c:990
#4  0x00007f693a2f3034 in hg_core_trigger_entry (hg_core_handle=0x5608ec45c780) at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury_core.c:5406
#5  hg_core_trigger (context=0x5608ec2c9500, timeout_ms=timeout_ms@entry=0, max_count=max_count@entry=256, actual_count_p=actual_count_p@entry=0x7f68f3ffda54) at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury_core.c:5267
#6  0x00007f693a2fbdbb in HG_Core_trigger (context=<optimized out>, timeout=timeout@entry=0, max_count=max_count@entry=256, actual_count_p=actual_count_p@entry=0x7f68f3ffda54)
    at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury_core.c:6419
#7  0x00007f693a2e748e in HG_Trigger (context=context@entry=0x5608ec266b70, timeout=timeout@entry=0, max_count=max_count@entry=256, actual_count_p=actual_count_p@entry=0x7f68f3ffda54)
    at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury.c:2197 -> ibv_poll_cq
#8  0x00007f693b1490e9 in crt_hg_progress (hg_ctx=hg_ctx@entry=0x5608ec253b18, timeout=timeout@entry=0) at src/cart/crt_hg.c:1540
#9  0x00007f693b0f9eea in crt_progress_cond (crt_ctx=0x5608ec253b00, timeout=timeout@entry=0, cond_cb=cond_cb@entry=0x7f693ce6732b <ev_progress_cb>, arg=arg@entry=0x7f68f3ffdb00) at src/cart/crt_context.c:1668
#10 0x00007f693ce712f3 in daos_event_priv_wait () at src/client/api/event.c:1276 ->  废弃 -> dc_task_new + dc_task_schedule, DAOS-6958 事件:出现进度错误时正确的事件处理 (#4911)crt_progress 有时会返回非超时的错误。 这些在同步 IO 调用中没有得到正确处理,导致私有事件对于后续 IO 处于不可用状态。此 PR 检查购物车中的错误,并在返回错误时重新初始化私有事件,以便可以重新使用它,而不是返回给用户
#11 0x00007f693ce7e93c in dc_task_schedule (task=<optimized out>, instant=instant@entry=true) at src/client/api/task.c:124  -> 执行任务主体函数: dc_obj_fetch_task -> 请求扇出 -> 走网络send接口, 发送完成后, 通过trigger调发送回调: daos_rpc_cb
# dc_obj_fetch_task_create -> dc_task_create(dc_obj_fetch_task, tse, ev, task)
#12 0x00007f693ce77e72 in daos_obj_fetch (oh=..., oh@entry=..., th=..., th@entry=..., flags=flags@entry=8, dkey=dkey@entry=0x7f68f3ffdc30, nr=nr@entry=1, iods=<optimized out>, sgls=0x7f68f3ffdc10, maps=0x0, ev=0x0) at src/client/api/object.c:170
#13 0x00007f693c7377b7 in fetch_entry (ver=<optimized out>, oh=..., th=..., th@entry=..., name=<optimized out>, name@entry=0x7f68ec0045c0 "write_test_file", len=len@entry=15, fetch_sym=fetch_sym@entry=false, exists=0x7f68f3ffde47, entry=0x7f68f3ffde60, 
    xnr=0, xnames=0x0, xvals=0x0, xsizes=0x0) at src/client/dfs/dfs.c:658
#14 0x00007f693c73d5a2 in entry_stat (dfs=dfs@entry=0x5608ec71f9e0, th=th@entry=..., oh=..., name=name@entry=0x7f68ec0045c0 "write_test_file", len=len@entry=15, obj=obj@entry=0x7f68ec004590, get_size=<optimized out>, stbuf=<optimized out>, 
    obj_hlc=<optimized out>) at src/client/dfs/dfs.c:1000
#15 0x00007f693c75440e in dfs_osetattr (dfs=0x5608ec71f9e0, obj=0x7f68ec004590, stbuf=stbuf@entry=0x7f68f3ffe2f0, flags=flags@entry=4) at src/client/dfs/dfs.c:5344
#16 0x00005608ea254967 in dfuse_cb_setattr (req=0x7f68ec0042f0, ie=0x7f68ec003830, attr=0x7f68f3ffe2f0, to_set=0) at src/client/dfuse/ops/setattr.c:108
#17 0x00005608ea221ba7 in df_ll_setattr (req=0x7f68ec0042f0, ino=<optimized out>, attr=0x7f68f3ffe2f0, to_set=1056, fi=<optimized out>) at src/client/dfuse/dfuse_fuseops.c:245
#18 0x00007f693c09ef6a in do_setattr () from /lib64/libfuse3.so.3
#19 0x00007f693c0a0dba in fuse_session_process_buf_int () from /lib64/libfuse3.so.3
#20 0x00005608ea22599a in dfuse_do_work (arg=0x5608ec70e470) at src/client/dfuse/dfuse_thread.c:60
#21 0x00007f693d2361ca in start_thread () from /lib64/libpthread.so.0
#22 0x00007f693ba9de73 in clone () from /lib64/libc.so.6
        

任务调度执行堆栈:
#0  tse_sched_run (sched=sched@entry=0x7f91aedd6ba0 <daos_sched_g>) at src/common/tse.c:755
#1  0x00007f91ae789065 in tse_sched_progress (sched=0x7f91aedd6ba0 <daos_sched_g>) at src/common/tse.c:791
#2  0x00007f91aea1033f in ev_progress_cb (arg=arg@entry=0x7f919fffe180) at src/client/api/event.c:516
#3  0x00007f91acca2eb2 in crt_progress_cond (crt_ctx=0x5602ff224b00, timeout=timeout@entry=0, cond_cb=cond_cb@entry=0x7f91aea1032b <ev_progress_cb>, arg=arg@entry=0x7f919fffe180) at src/cart/crt_context.c:1648
#4  0x00007f91aea1a2f3 in daos_event_priv_wait () at src/client/api/event.c:1276
#5  0x00007f91aea2793c in dc_task_schedule (task=<optimized out>, instant=instant@entry=true) at src/client/api/task.c:124
#6  0x00007f91aea209c1 in daos_obj_open (coh=..., oid=..., mode=mode@entry=2, oh=oh@entry=0x7f919fffe200, ev=ev@entry=0x0) at src/client/api/object.c:49
#7  0x00007f91ae2faeb5 in dfs_ostat (dfs=0x5602ff6f09e0, obj=0x5602ff6efb80, stbuf=stbuf@entry=0x7f919fffe250) at src/client/dfs/dfs.c:4928
#8  0x00005602fd14ed99 in dfuse_cb_getattr (req=0x7f9198000c10, ie=0x5602ff6ef960) at src/client/dfuse/ops/fgetattr.c:22
#9  0x00005602fd1429fc in df_ll_getattr (req=0x7f9198000c10, ino=<optimized out>, fi=<optimized out>) at src/client/dfuse/dfuse_fuseops.c:204
#10 0x00007f91adc48009 in do_getattr () from /lib64/libfuse3.so.3
#11 0x00007f91adc49dba in fuse_session_process_buf_int () from /lib64/libfuse3.so.3
#12 0x00005602fd14599a in dfuse_do_work (arg=0x5602ff6f0090) at src/client/dfuse/dfuse_thread.c:60
#13 0x00007f91aeddf1ca in start_thread () from /lib64/libpthread.so.0
#14 0x00007f91ad646e73 in clone () from /lib64/libc.so.6
        

总结

  • 核心对象是任务与调度器, 以及流程图中任务和调度器下的那几个队列
  • 为了实现延迟调用, 服务终止等业务, 在运行队列与休眠队列间移动任务
  • 为了实现主任务依赖多个子任务的业务, 用完成队列对依赖任务进行管理
  • 该TSE任务调度引擎可以独立于DAOS项目作为第三方组件适配和使用

参考

DAOS公共模块: https://github.com/ssbandjl/daos/tree/master/src/common

DAOS内部组件: https://github.com/ssbandjl/daos/blob/xb/docs_xb/DAOS%E5%86%85%E9%83%A8%E7%BB%93%E6%9E%84.md

晓兵

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles

weixin: ssbandjl

公众号: 云原生云

云原生云