找回密码
 立即注册
首页 业界区 业界 为workflow-core扩展外抛事件

为workflow-core扩展外抛事件

拓炊羡 2025-8-22 10:14:04
workflow-core自带事件的局限性

众所周知,workflow-core只能从外向内抛事件
比如在api接口中引发事件,在工作流中等待事件完成
  1. //第一步,开启流程
  2. public async Task<IActionResult> Add([FromBody] Dto parm)
  3. {
  4.     //启动一个新的工作流
  5.     //由 Workflow-Core 自动生成(默认是 GUID 格式,唯一标识一个正在运行的工作流实例
  6.     //通过该ID可以控制工作流生命周期
  7.     var workflowId = await _workflowHost.StartWorkflow("xxxFlow", 1, parm);
  8.     return SUCCESS(1);
  9. }
  10. //第二步,审批,引发事件
  11. public async Task<IActionResult> Audit([FromBody] Dto parm)
  12. {
  13.     var modal = parm.Adapt<Dto>().ToUpdate(HttpContext);
  14.     //通知工作流继续执行
  15.     await _workflowHost.PublishEvent(
  16.         "audit",  // 必须与WaitForEvent的事件名称匹配
  17.         parm.WorkflowId,  // 必须与WaitForEvent的事件Key匹配
  18.         parm//提供的数据
  19.     );
  20.     return SUCCESS(1);
  21. }
复制代码
但是通常来说,我们在前端调用一个接口,要获得返回数据。比如Add后要刷新界面,看到表格中新添加的一行记录。
然而由于我们缺乏这种手段来从流程中获得反馈,而且是等待式await的反馈。所以Add就直接俄返回了,界面上刷新时,流程可能还没跑到插入数据库那一步,于是界面上也看不到新数据。
事件扩展

我研究了一天后,利用C#的扩展方法语法,给workflow-core扩展了外抛事件,这样可以在接口中等待流程中某个步骤完成。
设想

设想调用方法应该这样
  1. //第一步,开启流程
  2. public async Task<IActionResult> Add([FromBody] Dto parm)
  3. {
  4.     //启动一个新的工作流
  5.     //由 Workflow-Core 自动生成(默认是 GUID 格式,唯一标识一个正在运行的工作流实例
  6.     //通过该ID可以控制工作流生命周期
  7.     var workflowId = await _workflowHost.StartWorkflow("xxxFlow", 1, parm);
  8.     //等待瞬时事件,SubmitProblemStep步骤执行完成
  9.     var response = await _workflowHost.WaitEvent<Dto>("SubmitStep", workflowId);
  10.     return SUCCESS(response);
  11. }
复制代码
而流程中这样引发事件比较优雅
  1. Action<WaitFor, Dto> outputAction = (edata, data) => {
  2.     (edata.EventData as Dto).Adapt(data);//保证引用对象data不丢失的情况下更新对象data
  3. };
  4. //这样引发事件
  5. builder
  6.     // 提交记录
  7.     .StartWith<SubmitStep>()
  8.         .Input(step=>step.Entity, data=>data)
  9.         .Output((step,data)=> data=step.Entity)//将WorkflowId赋值给流程,便于WaitFor监听事件
  10.     //RaiseEvent扩展的向外抛事件,嵌入原来fluentApi中引发事件
  11.     //执行到这里,表示记录已在SubmitStep中插入到数据库了
  12.     .RaiseEvent("SubmitStep", data=>data.WorkflowId)
  13.     //workflow-core自带的向内抛事件
  14.     .WaitFor("AuditStep", data => data.WorkflowId, date => DateTime.Now)
复制代码
实现

我观察了已有的.Then,.Input这些方法,终于搞清楚实现哪个接口,扩展我们的方法。按照设想,需要暴露两个方法

  • RaiseEvent 给内部流程用
  • WaitEvent 给外部接口用
  1. /// <summary>
  2. /// 工作流从内->外抛出事件扩展
  3. /// </summary>
  4. public static class WorkFlowEventExtensions
  5. {
  6.     /// <summary>
  7.     /// 瞬时事件集合
  8.     /// </summary>
  9.     public static List<EventSource> events = new List<EventSource>();
  10.     /// <summary>
  11.     /// 抛出事件扩展
  12.     /// </summary>
  13.     /// <typeparam name="TData"></typeparam>
  14.     /// <typeparam name="TStepBody"></typeparam>
  15.     /// <param name="builder"></param>
  16.     /// <param name="eventName"></param>
  17.     /// <param name="eventKey"></param>
  18.     /// <returns></returns>
  19.     public static IStepBuilder<TData, ActionStepBody> RaiseEvent<TData, TStepBody>(this IWorkflowModifier<TData, TStepBody> builder,
  20.         string eventName, Func<TData, string> eventKey) where TStepBody : IStepBody
  21.     {
  22.         return builder
  23.             .Delay(d => TimeSpan.FromMilliseconds(100))//延迟流程小会儿,等待事件events先添加成功
  24.             .Then(ctx =>
  25.             {
  26.                 var key = eventKey.Invoke((TData)ctx.Workflow.Data);
  27.                 //释放互斥锁,让WaitEvent继续执行下去
  28.                 var mu = events.FirstOrDefault(x => x.EventName == eventName && x.EventKey == key);
  29.                 if (mu != null)
  30.                 {
  31.                     mu.Data = ctx.Workflow.Data;
  32.                     //完成任务
  33.                     mu.Cts.TrySetResult(true);
  34.                 }
  35.             });
  36.     }
  37.     /// <summary>
  38.     /// 等待内部事件扩展
  39.     /// </summary>
  40.     /// <typeparam name="TData">可以是workflow的实体类,比如IWorkflow<TData>中的TData类</typeparam>
  41.     /// <param name="host"></param>
  42.     /// <param name="eventName">事件名</param>
  43.     /// <param name="eventKey">事件key,一般是WorkflowId,流程实例Id</param>
  44.     /// <returns></returns>
  45.     public static async Task<TData> WaitEvent<TData>(this IWorkflowHost host, string eventName, string eventKey)
  46.     {
  47.         TData data = default(TData);
  48.         //等待任务完成,说明步骤已执行,可以得到需要的数据
  49.         await Task.Run(async () =>
  50.         {
  51.             //向任务集合添加一个任务
  52.             EventSource eventCts = new EventSource
  53.             {
  54.                 EventName = eventName,
  55.                 EventKey = eventKey,
  56.                 Cts = new TaskCompletionSource<bool>(),
  57.             };
  58.             events.Add(eventCts);
  59.             //等待任务完成
  60.             await eventCts.Cts.Task;
  61.             //退出等待
  62.             if (eventCts.Data != null && eventCts.Data is TData)
  63.             {
  64.                 //取得数据
  65.                 data = (TData)eventCts.Data;
  66.             }
  67.             events.Remove(eventCts);
  68.         });
  69.         return data;
  70.     }
  71. }
  72. /// <summary>
  73. /// 事件源
  74. /// </summary>
  75. public class EventSource
  76. {
  77.     public string EventName { get; set; }
  78.     public string EventKey { get; set; }
  79.     /// <summary>
  80.     /// 任务完成源
  81.     /// </summary>
  82.     public TaskCompletionSource<bool> Cts { get; set; }
  83.     public object Data { get; set; }
  84. }
复制代码
结果


curd了一下,发现当然是达到了想要的结果
来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除

相关推荐

您需要登录后才可以回帖 登录 | 立即注册