.NET Core 工作流WorkFlowCore

前言

WorkFlowCore 是一个针对.NetCore 的轻量级的工作流引擎,提供了 FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。支持工作流长期运行,提供了各种持久化方式。
本篇开发环境为.NET 7,此处不演示 Json 和 yaml 配置,详细文档请查看官方文档:https://workflow-core.readthedocs.io/en/latest/getting-started/和项目源码地址:https://github.com/danielgerlag/workflow-core

一、安装与基础使用

通过以下命令安装

Install-Package WorkflowCore

然后注入 WorkFlowCore

builder.Services.AddWorkflow();

WorkFlowCore 主要分为两部分:步骤和工作流

步骤

多个步骤组成一个工作流,每个步骤都可以有输入并产生输出,这些输出可以传递回其所在的工作流。通过创建继承抽象类 StepBody 或 StepBodyAsync 的类,并且实现 Run 或 RunAsync 方法来定义步骤,很明显它们的区别是是否异步

public class FirstStepBody: StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Hello world!First");
        return ExecutionResult.Next();
    }
}

工作流
通过继承 IWorkflow 接口定义一个工作流,接口只有 Id、Version 和 Build 方法(内部可以执行多个步骤),工作流主机使用这些信息来标识工作流

public class MyWorkflow :IWorkflow
{
    public string Id => "HelloWorld";
    public int Version => 1;
    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith<FirstStepBody>()
            .Then<FirstStepBody>();
    }
}

工作流如果想使用必须在工作流主机中通过 RegisterWorkflow()方法注册,并且通过 Start()方法启动主机,当然也可以通过 Stop()方法停止工作流。执行工作流需要使用 StartWorkflow()方法,参数为工作流类的 Id,如下

[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
    private readonly IWorkflowHost _workflowHost;
    public WeatherForecastController(IWorkflowHost workflowHost)
    {
        _workflowHost = workflowHost;
    }
    [HttpGet(Name = "get")]
    public ContentResult Get()
    {
        if (!_workflowHost.Registry.IsRegistered("HelloWorld",1))
        {
            _workflowHost.RegisterWorkflow<MyWorkflow>();
        }
        _workflowHost.Start();
        _workflowHost.StartWorkflow("HelloWorld");
        //host.Stop();
        return Content("ok");
    }
}

当然也可以在构建 web 服务的时候统一注册,然后就可以直接执行啦

var host = app.Services.GetService<IWorkflowHost>();
host.RegisterWorkflow<MyWorkflow>();
host.Start();

二、在步骤之间传递参数
每个步骤都是一个黑盒,因此它们支持输入和输出。这些输入和输出可以映射到一个数据类,该数据类定义与每个工作流实例相关的自定义数据。

以下示例显示了如何定义步骤的输入和输出,然后显示了如何使用内部数据的类型化类定义工作流,以及如何将输入和输出映射到自定义数据类的属性。

//步骤包含属性,并且计算
public class FirstStepBody: StepBody
{
     public int Input1 { get; set; }
     public int Input2 { get; set; }
     public int Output { get; set; }
     public override ExecutionResult Run(IStepExecutionContext context)
     {
         Output = Input1 + Input2;
         Console.WriteLine(Output);
         return ExecutionResult.Next();
     }
 }
 //工作流包含输入输出的赋值
 public class MyWorkflow :IWorkflow<MyDataClass>
 {
     public string Id => "HelloWorld";
     public int Version => 1;
     public void Build(IWorkflowBuilder<MyDataClass> builder)
     {
         builder
             .StartWith<FirstStepBody>()
             .Input(step => step.Input1,data => data.Value1)
             .Input(step => step.Input2, data => 100)
             .Output(data => data.Answer, step => step.Output)
             .Then<FirstStepBody>()
             .Input(step => step.Input1, data => data.Value1)
             .Input(step => step.Input2, data => data.Answer)
             .Output(data => data.Answer, step => step.Output);
     }
 }
 //工作流的属性类
 public class MyDataClass
 {
     public int Value1 { get; set; }
     public int Value2 { get; set; }
     public int Answer { get; set; }
 }
 //执行工作流传入参数
 MyDataClass myDataClass = new MyDataClass();
 myDataClass.Value1 = 100;
 myDataClass.Value2 = 200;
 //不传入 myDataClass 则每次执行都是新的数据对象
 _workflowHost.StartWorkflow("HelloWorld", myDataClass);

 

从上述例子可以看到工作流可以定义一个初始的类作为参数传入,每个步骤可以有自己的属性字段去接收参数(可以是工作流类的字段,也可以是固定值),可以用 Input 方法传入,Output 方法输出赋值。如果在工作流执行时不传入参数每次执行都是新的对象的默认值,比如在 StartWorkflow 方法中不传 myDataClass,运行结果是 100 和 100,否则是 200 和 300

三、外部事件

工作流可以使用 WaitFor 方法进行等待,通过外部触发此事件,将事件产生的数据传递给工作流,并且让工作流继续执行下面的步骤。示例如下:

public class MyWorkflow :IWorkflow<MyDataClass>
{
    //省略。。。。
    public void Build(IWorkflowBuilder<MyDataClass> builder)
    {
        builder
            .StartWith<FirstStepBody>()
            .Input(step => step.Input1,data => data.Value1)
            .Input(step => step.Input2, data => 100)
            .Output(data => data.Answer, step => step.Output)
            .WaitFor("MyEvent",key => "EventKey")
            .Output(data => data.Answer,step => step.EventData)
            .Then<FirstStepBody>()
            .Input(step => step.Input1, data => data.Value1)
            .Input(step => step.Input2, data => data.Answer)
            .Output(data => data.Answer, step => step.Output);
    }
}
//。。。
[HttpGet(Name = "get")]
public ContentResult Get()
{
        MyDataClass myDataClass = new MyDataClass();
        myDataClass.Value1 = 100;
        myDataClass.Value2 = 200;
        _workflowHost.StartWorkflow("HelloWorld", myDataClass);
            return Content("ok");
        }
[HttpPost(Name = "event")]
public ContentResult PublishEvent()
{
  _workflowHost.PublishEvent("MyEvent", "EventKey", 200);
  return Content("ok");
}

使用 WaitFor 方法可以使工作流等待监听指定事件的执行,有两个入参事件名称和事件关键字。

通过工作流主机去触发 PublishEvent 执行指定的事件,有三个入参触发事件名称、触发事件关键字和事件参数。

需要执行事件,工作流才会继续下一步,如下动图演示:

.NET Core 工作流 WorkFlowCore

 

可以为等待事件设置有效时间,在有效时间之前执行事件是不会继续下一步流程的,只有当大于有效时间之后执行事件才会继续下一步步骤。如下代码设置,为工作流执行时间一天后执行事件才会继续执行,否则就等待不动。

WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1))

四、活动
活动被定义为在工作流中可以被等待的外部工作队列中的步骤。

在本例中,工作流将等待活动 activity-1,直到活动完成才继续工作流。它还将 data.Value1 的值传递给活动,然后将活动的结果映射到 data.Value2。

然后我们创建一个 worker 来处理活动项的队列。它使用 GetPendingActivity 方法来获取工作流正在等待的活动和数据。

//.....
builder
 .StartWith<FirstStepBody>()
 .Input(step => step.Input1,data => data.Value1)
 .Input(step => step.Input2, data => 100)
 .Output(data => data.Answer, step => step.Output)
 .Activity("activity-1", (data) => data.Value1)
 .Output(data => data.Value2, step => step.Result)
 .Then<FirstStepBody>()
 .Input(step => step.Input1, data => data.Value1)
 .Input(step => step.Input2, data => data.Answer)
 .Output(data => data.Answer, step => step.Output);
 //....
[HttpPost(Name = "active")]
public ContentResult PublishEvent()
{
 var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
 if (activity != null)
 {
   Console.WriteLine(activity.Parameters);
   _workflowHost.SubmitActivitySuccess(activity.Token, 100);
 }
 return Content("ok");
}

活动可以看作一个等待的步骤可以传入参数和输出参数,和事件的区别是事件不能输入参数而是单纯的等待。

五、错误处理

每个步骤都可以配置自己的错误处理行为,可以在以后重试、挂起工作流或终止工作流。

public void Build(IWorkflowBuilder<object> builder)
{
    builder                
        .StartWith<HelloWorld>()
            .OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10))
        .Then<GoodbyeWorld>();
}

六、流程控制
工作流的流程控制包括分支、循环等各种操作

决策分支

在工作流中定义多个独立分支,并根据表达式值选择满足条件的分支执行。

使用 IWorkflowBuilder 的 CreateBranch 方法定义分支。然后我们可以使用 branch 方法选择一个分支。

选择表达式将与通过 branch 方法列出的分支相匹配,匹配的分支将安排执行。匹配多个分支将导致并行分支运行。

如果 data.Value1 的值为 1,则此工作流将选择 branch1,如果为 2,则选择 branch2。

var branch1 = builder.CreateBranch()
  .StartWith<PrintMessage>()
      .Input(step => step.Message, data => "hi from 1")
  .Then<PrintMessage>()
      .Input(step => step.Message, data => "bye from 1");

var branch2 = builder.CreateBranch()
    .StartWith<PrintMessage>()
        .Input(step => step.Message, data => "hi from 2")
    .Then<PrintMessage>()
        .Input(step => step.Message, data => "bye from 2");
  builder
    .StartWith<HelloWorld>()
    .Decide(data => data.Value1)
        .Branch((data, outcome) => data.Value1 == "one", branch1)
        .Branch((data, outcome) => data.Value1 == "two", branch2);

并行 ForEach
使用 ForEach 方法启动并行 for 循环

public class ForEachWorkflow : IWorkflow
{
    public string Id => "Foreach";
    public int Version => 1;
    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith<SayHello>()
            .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                .Do(x => x
                    .StartWith<DisplayContext>()
                        .Input(step => step.Message, (data, context) => context.Item)
                    .Then<DoSomething>())
            .Then<SayGoodbye>();
    }        
}

While 循环
使用 While 方法启动 while 循环

public class WhileWorkflow : IWorkflow<MyData>
{
    public string Id => "While";
    public int Version => 1;
    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<SayHello>()
            .While(data => data.Counter < 3)
                .Do(x => x
                    .StartWith<DoSomething>()
                    .Then<IncrementStep>()
                        .Input(step => step.Value1, data => data.Counter)
                        .Output(data => data.Counter, step => step.Value2))
            .Then<SayGoodbye>();
    }        
}

If 判断
使用 If 方法执行 if 判断

public class IfWorkflow : IWorkflow<MyData>
{ 
    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<SayHello>()
            .If(data => data.Counter < 3).Do(then => then
                .StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Value is less than 3")
            )
            .If(data => data.Counter < 5).Do(then => then
                .StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Value is less than 5")
            )
            .Then<SayGoodbye>();
    }        
}

并行
使用 Parallel 方法并行执行任务

public class ParallelWorkflow : IWorkflow<MyData>
{
      public string Id => "parallel-sample";
      public int Version => 1;
      public void Build(IWorkflowBuilder<MyData> builder)
      {
          builder
              .StartWith<SayHello>()
              .Parallel()
                  .Do(then => 
                      then.StartWith<Task1dot1>()
                          .Then<Task1dot2>()
                  .Do(then =>
                      then.StartWith<Task2dot1>()
                          .Then<Task2dot2>()
              .Join()
              .Then<SayGoodbye>();
    }        
}

Schedule
使用 Schedule 方法在工作流中注册在指定时间后执行的异步方法

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
        .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
    )
    .Then(context => Console.WriteLine("Doing normal tasks"));

Recur
使用 Recure 方法在工作流中设置一组重复的后台步骤,直到满足特定条件为止

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur
        .StartWith(context => Console.WriteLine("Doing recurring task"))
    )
    .Then(context => Console.WriteLine("Carry on"));

七、Saga transaction
saga 允许在 saga transaction 中封装一系列步骤,并为每一个步骤提供补偿步骤,使用 CompensateWith 方法在对应的步骤后面添加补偿步骤,补偿步骤将会在步骤抛出异常的时候触发。

如下示例,步骤 Task2 如果抛出一个异常,那么补偿步骤 UndoTask2 和 UndoTask1 将被触发。

builder
    .StartWith(context => Console.WriteLine("Begin"))
    .Saga(saga => saga
        .StartWith<Task1>()
            .CompensateWith<UndoTask1>()
        .Then<Task2>()
            .CompensateWith<UndoTask2>()
        .Then<Task3>()
            .CompensateWith<UndoTask3>()
    )
        .CompensateWith<CleanUp>()
    .Then(context => Console.WriteLine("End"));

也可以指定重试策略,在指定时间间隔后重试。

builder
    .StartWith(context => Console.WriteLine("Begin"))
    .Saga(saga => saga
        .StartWith<Task1>()
            .CompensateWith<UndoTask1>()
        .Then<Task2>()
            .CompensateWith<UndoTask2>()
        .Then<Task3>()
            .CompensateWith<UndoTask3>()
    )
    .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
    .Then(context => Console.WriteLine("End"));

八、持久化
可以使用 Redis、Mongdb、Sqlserver 等持久化,具体可以看文档,此处使用 Redis,先安装 nuget 包

 

Install-Package WorkflowCore.Providers.Redis

然后注入就可以了

builder.Services.AddWorkflow(cfg =>
{
    cfg.UseRedisPersistence("localhost:6379", "app-name");
    cfg.UseRedisLocking("localhost:6379");
    cfg.UseRedisQueues("localhost:6379", "app-name");
    cfg.UseRedisEventHub("localhost:6379", "channel-name");
    //cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow");
    //cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows");
});

运行打开可以看到

.NET Core 工作流 WorkFlowCore

 

 


来源:cnblogs.com/xwc1996/p/17306568.html

© 版权声明

☆ END ☆
喜欢就点个赞吧
点赞0 分享
图片正在生成中,请稍后...