1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
创建任务:
dc_task_create
sched = daos_ev2sched(ev) 事件转调度器
tse_task_create(func, sched, NULL, &task) -> 初始化 tse_task。 该任务(task)会被添加到调度器(sched)任务列表中,稍后被调度,如果提供了依赖任务,则该任务将被添加到依赖任务的dep列表中,一旦依赖任务完成,则添加该任务到调度程序列表(先完成依赖任务, 然后添加主任务)
task_ptr2args 指针转参数
D_INIT_LIST_HEAD(&dtp->dtp_list) -> 初始化任务链表, 调度时插入到调度器的初始化队列(dsp_init_list) -> d_list_add_tail(&dtp->dtp_list, &dsp->dsp_init_list)
D_INIT_LIST_HEAD(&dtp->dtp_task_list)
D_INIT_LIST_HEAD(&dtp->dtp_dep_list)
D_INIT_LIST_HEAD(&dtp->dtp_comp_cb_list)
D_INIT_LIST_HEAD(&dtp->dtp_prep_cb_list)
...
dtp->dtp_func = task_func
...
tse_task_buf_embedded 获取任务的嵌入式缓冲区,用户可以使用它来携带功能参数。 任务的嵌入式缓冲区有大小限制,如果 buf_size 大于限制,此函数将返回 NULL。 用户应通过 tse_task_set_priv() 使用私有数据来传递大参数。 MSC - 我将其更改为只是一个缓冲区,而不是像以前那样, 不断给一个额外的指针指向大的预涂层缓冲区。 以前的方式不适用于公共用途。我们现在应该使它更简单,更通用,如下面的评论
tse_task_buf_size
return (size + 7) & ~0x7
args->ta_magic = DAOS_TASK_MAGIC
tse_task_register_comp_cb(task, task_comp_event, NULL, 0) -> dtc->dtc_cb = cb task_comp_event 注册完成回调
register_cb(task, true, comp_cb, arg, arg_size)
d_list_add(&dtc->dtc_list, &dtp->dtp_comp_cb_list) 插入到列表开始处
failed -> tse_task_decref(task) -> 如果失败则判断引用计数
以dfuse用户态文件系统为例
dfuse_do_work
dfuse_do_work
dfuse_cb_read dfuse_cb_read
dfs_read
dfs_read_int dfs_read_int
dc_task_create(dc_array_read, NULL, ev, &task) 创建任务
tse_task_register_cbs(task, NULL, NULL, 0, read_cb, NULL, 0)
dc_task_schedule(task, true) -> 立即调度任务
tse_task_schedule
tse_task_schedule_with_delay
dc_array_read DAOS_OPC_ARRAY_READ -> dc_array_io task1
daos_task_create(DAOS_OPC_ARRAY_GET_SIZE, tse_task2sched(task)
dc_task_create(dc_funcs[opc].task_func, sched, NULL, &task)
dc_task_depend(task, num_deps, dep_tasks)
...
tse_sched_progress task2
tse_sched_run
processed += tse_sched_process_init(dsp) -> 处理调度程序的初始化列表和休眠列表。 这首先将所有现在应该醒来的任务从睡眠列表移动到初始化列表的尾部,然后执行调度程序初始化列表中没有依赖项的所有任务的所有主体函数, 调度器主函数, 在队列间移动任务
processed += tse_sched_process_complete(dsp)
d_list_splice_init(&dsp->dsp_complete_list, &comp_list) -> 连接两个列表并重新初始化空列表。 列表的内容添加到头部的开头。 返回时列表为初始化为空(可理解为从一个队列移动到新的队列)
d_list_for_each_entry_safe(dtp, tmp, &comp_list, dtp_list) -> 遍历新的完成队列
d_list_del_init(&dtp->dtp_list)
tse_task_post_process(task) -> 检查完成列表中的任务、依赖任务状态检查、调度状态更新等。此后任务将移至完成列表
while (!d_list_empty(&dtp->dtp_dep_list)) -> 任务依赖队列不为空(还有依赖任务)
...
completed = tse_sched_check_complete(sched) -> 检查调度器是否是完成状态, 即: 初始队列和睡眠队列都为空, 且无飞行的任务
tse_task_prep_callback
check_short_read_cb
tse_task_complete
查找
df_ll_lookup
cb_lookup
lookupx
lookup_rel_int
array_get_size
dc_task_create(dc_array_get_size, NULL, ev, &task)
dc_task_schedule
daos_event_priv_wait 如果关联事件是私有事件,这个函数会等到完成,否则它立即返回
ev_progress_cb
tse_sched_progress
dtp_func -> dc_obj_query_key
任务完成回调参考堆栈:
#0 dc_rw_cb (task=0x7f9198004530, arg=<optimized out>) at src/object/cli_shard.c:952
...
#0 tse_task_complete_callback (task=task@entry=0x7f68ec004d20) at src/common/tse.c:515
...
#0 tse_task_complete (task=0x7f68ec004d20, ret=0) at src/common/tse.c:858
#1 0x00007f693ce7cf48 in daos_rpc_cb (cb_info=<optimized out>) at src/client/api/rpc.c:24
#2 0x00007f693b13bd8a in crt_hg_req_send_cb (hg_cbinfo=<optimized out>) at src/cart/crt_hg.c:1364
#3 0x00007f693a2e34ce in hg_core_forward_cb (callback_info=<optimized out>) at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury.c:990
#4 0x00007f693a2f3034 in hg_core_trigger_entry (hg_core_handle=0x5608ec45c780) at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury_core.c:5406
#5 hg_core_trigger (context=0x5608ec2c9500, timeout_ms=timeout_ms@entry=0, max_count=max_count@entry=256, actual_count_p=actual_count_p@entry=0x7f68f3ffda54) at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury_core.c:5267
#6 0x00007f693a2fbdbb in HG_Core_trigger (context=<optimized out>, timeout=timeout@entry=0, max_count=max_count@entry=256, actual_count_p=actual_count_p@entry=0x7f68f3ffda54)
at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury_core.c:6419
#7 0x00007f693a2e748e in HG_Trigger (context=context@entry=0x5608ec266b70, timeout=timeout@entry=0, max_count=max_count@entry=256, actual_count_p=actual_count_p@entry=0x7f68f3ffda54)
at /home/xb/project/stor/daos/main/daos/build/external/debug/mercury/src/mercury.c:2197 -> ibv_poll_cq
#8 0x00007f693b1490e9 in crt_hg_progress (hg_ctx=hg_ctx@entry=0x5608ec253b18, timeout=timeout@entry=0) at src/cart/crt_hg.c:1540
#9 0x00007f693b0f9eea in crt_progress_cond (crt_ctx=0x5608ec253b00, timeout=timeout@entry=0, cond_cb=cond_cb@entry=0x7f693ce6732b <ev_progress_cb>, arg=arg@entry=0x7f68f3ffdb00) at src/cart/crt_context.c:1668
#10 0x00007f693ce712f3 in daos_event_priv_wait () at src/client/api/event.c:1276 -> 废弃 -> dc_task_new + dc_task_schedule, DAOS-6958 事件:出现进度错误时正确的事件处理 (#4911)crt_progress 有时会返回非超时的错误。 这些在同步 IO 调用中没有得到正确处理,导致私有事件对于后续 IO 处于不可用状态。此 PR 检查购物车中的错误,并在返回错误时重新初始化私有事件,以便可以重新使用它,而不是返回给用户
#11 0x00007f693ce7e93c in dc_task_schedule (task=<optimized out>, instant=instant@entry=true) at src/client/api/task.c:124 -> 执行任务主体函数: dc_obj_fetch_task -> 请求扇出 -> 走网络send接口, 发送完成后, 通过trigger调发送回调: daos_rpc_cb
# dc_obj_fetch_task_create -> dc_task_create(dc_obj_fetch_task, tse, ev, task)
#12 0x00007f693ce77e72 in daos_obj_fetch (oh=..., oh@entry=..., th=..., th@entry=..., flags=flags@entry=8, dkey=dkey@entry=0x7f68f3ffdc30, nr=nr@entry=1, iods=<optimized out>, sgls=0x7f68f3ffdc10, maps=0x0, ev=0x0) at src/client/api/object.c:170
#13 0x00007f693c7377b7 in fetch_entry (ver=<optimized out>, oh=..., th=..., th@entry=..., name=<optimized out>, name@entry=0x7f68ec0045c0 "write_test_file", len=len@entry=15, fetch_sym=fetch_sym@entry=false, exists=0x7f68f3ffde47, entry=0x7f68f3ffde60,
xnr=0, xnames=0x0, xvals=0x0, xsizes=0x0) at src/client/dfs/dfs.c:658
#14 0x00007f693c73d5a2 in entry_stat (dfs=dfs@entry=0x5608ec71f9e0, th=th@entry=..., oh=..., name=name@entry=0x7f68ec0045c0 "write_test_file", len=len@entry=15, obj=obj@entry=0x7f68ec004590, get_size=<optimized out>, stbuf=<optimized out>,
obj_hlc=<optimized out>) at src/client/dfs/dfs.c:1000
#15 0x00007f693c75440e in dfs_osetattr (dfs=0x5608ec71f9e0, obj=0x7f68ec004590, stbuf=stbuf@entry=0x7f68f3ffe2f0, flags=flags@entry=4) at src/client/dfs/dfs.c:5344
#16 0x00005608ea254967 in dfuse_cb_setattr (req=0x7f68ec0042f0, ie=0x7f68ec003830, attr=0x7f68f3ffe2f0, to_set=0) at src/client/dfuse/ops/setattr.c:108
#17 0x00005608ea221ba7 in df_ll_setattr (req=0x7f68ec0042f0, ino=<optimized out>, attr=0x7f68f3ffe2f0, to_set=1056, fi=<optimized out>) at src/client/dfuse/dfuse_fuseops.c:245
#18 0x00007f693c09ef6a in do_setattr () from /lib64/libfuse3.so.3
#19 0x00007f693c0a0dba in fuse_session_process_buf_int () from /lib64/libfuse3.so.3
#20 0x00005608ea22599a in dfuse_do_work (arg=0x5608ec70e470) at src/client/dfuse/dfuse_thread.c:60
#21 0x00007f693d2361ca in start_thread () from /lib64/libpthread.so.0
#22 0x00007f693ba9de73 in clone () from /lib64/libc.so.6
任务调度执行堆栈:
#0 tse_sched_run (sched=sched@entry=0x7f91aedd6ba0 <daos_sched_g>) at src/common/tse.c:755
#1 0x00007f91ae789065 in tse_sched_progress (sched=0x7f91aedd6ba0 <daos_sched_g>) at src/common/tse.c:791
#2 0x00007f91aea1033f in ev_progress_cb (arg=arg@entry=0x7f919fffe180) at src/client/api/event.c:516
#3 0x00007f91acca2eb2 in crt_progress_cond (crt_ctx=0x5602ff224b00, timeout=timeout@entry=0, cond_cb=cond_cb@entry=0x7f91aea1032b <ev_progress_cb>, arg=arg@entry=0x7f919fffe180) at src/cart/crt_context.c:1648
#4 0x00007f91aea1a2f3 in daos_event_priv_wait () at src/client/api/event.c:1276
#5 0x00007f91aea2793c in dc_task_schedule (task=<optimized out>, instant=instant@entry=true) at src/client/api/task.c:124
#6 0x00007f91aea209c1 in daos_obj_open (coh=..., oid=..., mode=mode@entry=2, oh=oh@entry=0x7f919fffe200, ev=ev@entry=0x0) at src/client/api/object.c:49
#7 0x00007f91ae2faeb5 in dfs_ostat (dfs=0x5602ff6f09e0, obj=0x5602ff6efb80, stbuf=stbuf@entry=0x7f919fffe250) at src/client/dfs/dfs.c:4928
#8 0x00005602fd14ed99 in dfuse_cb_getattr (req=0x7f9198000c10, ie=0x5602ff6ef960) at src/client/dfuse/ops/fgetattr.c:22
#9 0x00005602fd1429fc in df_ll_getattr (req=0x7f9198000c10, ino=<optimized out>, fi=<optimized out>) at src/client/dfuse/dfuse_fuseops.c:204
#10 0x00007f91adc48009 in do_getattr () from /lib64/libfuse3.so.3
#11 0x00007f91adc49dba in fuse_session_process_buf_int () from /lib64/libfuse3.so.3
#12 0x00005602fd14599a in dfuse_do_work (arg=0x5602ff6f0090) at src/client/dfuse/dfuse_thread.c:60
#13 0x00007f91aeddf1ca in start_thread () from /lib64/libpthread.so.0
#14 0x00007f91ad646e73 in clone () from /lib64/libc.so.6
|