WorkFlowCore
是一个针对 .NetCore
的轻量级的工作流引擎,提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。支持工作流长期运行,提供了各种持久化方式。
本篇开发环境为 .Net7
,此处不演示 Json
和 yaml
配置,详细文档请查看 官方文档 和 项目源码地址
通过以下命令安装
Install-Package WorkflowCore
然后注入 WorkFlowCore
builder.Services.AddWorkflow();
WorkFlowCore
主要分为两部分:步骤和工作流
** 步骤**
** **多个步骤组成一个工作流,每个步骤都可以有输入并产生输出,这些输出可以传递回其所在的工作流。通过创建继承抽象类StepBody或StepBodyAsync的类,并且实现Run或RunAsync方法来定义步骤,很明显它们的区别是是否异步
public classFirstStepBody: StepBody
{
public overrideExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine("Hello world!First");
returnExecutionResult.Next();
}
}
工作流
通过继承 IWorkflow
接口定义一个工作流,接口只有 Id
、 Version
和 Build
方法(内部可以执行多个步骤),工作流主机使用这些信息来标识工作流
public classMyWorkflow :IWorkflow
{
public string Id => "HelloWorld";
public int Version => 1;
public void Build(IWorkflowBuilder<object>builder)
{
builder
.StartWith<FirstStepBody>()
.Then<FirstStepBody>();
}
}
工作流如果想使用必须在工作流主机中通过 RegisterWorkflow()
方法注册,并且通过 Start()
方法启动主机,当然也可以通过 Stop()
方法停止工作流。执行工作流需要使用 StartWorkflow()
方法,参数为工作流类的 Id
,如下
[ApiController]
[Route("[controller]")]
public classWeatherForecastController : ControllerBase
{
private readonlyIWorkflowHost _workflowHost;
publicWeatherForecastController(IWorkflowHost workflowHost)
{
_workflowHost =workflowHost;
}
[HttpGet(Name = "get")]
publicContentResult Get()
{
if (!_workflowHost.Registry.IsRegistered("HelloWorld",1))
{
_workflowHost.RegisterWorkflow<MyWorkflow>();
}
_workflowHost.Start();
_workflowHost.StartWorkflow("HelloWorld");
//host.Stop();
return Content("ok");
}
}
当然也可以在构建 web
服务的时候统一注册,然后就可以直接执行啦
var host = app.Services.GetService<IWorkflowHost>();
host.RegisterWorkflow<MyWorkflow>();
host.Start();
每个步骤都是一个黑盒,因此它们支持输入和输出。这些输入和输出可以映射到一个数据类,该数据类定义与每个工作流实例相关的自定义数据。
以下示例显示了如何定义步骤的输入和输出,然后显示了如何使用内部数据的类型化类定义工作流,以及如何将输入和输出映射到自定义数据类的属性。
//步骤包含属性,并且计算
public classFirstStepBody: StepBody
{
public int Input1 { get; set; }
public int Input2 { get; set; }
public int Output { get; set; }
public overrideExecutionResult Run(IStepExecutionContext context)
{
Output = Input1 +Input2;
Console.WriteLine(Output);
returnExecutionResult.Next();
}
}
//工作流包含输入输出的赋值
public class MyWorkflow :IWorkflow<MyDataClass>{
public string Id => "HelloWorld";
public int Version => 1;
public void Build(IWorkflowBuilder<MyDataClass>builder)
{
builder
.StartWith<FirstStepBody>()
.Input(step => step.Input1,data =>data.Value1)
.Input(step => step.Input2, data => 100)
.Output(data => data.Answer, step =>step.Output)
.Then<FirstStepBody>()
.Input(step => step.Input1, data =>data.Value1)
.Input(step => step.Input2, data =>data.Answer)
.Output(data => data.Answer, step =>step.Output);
}
}
//工作流的属性类
public classMyDataClass
{
public int Value1 { get; set; }
public int Value2 { get; set; }
public int Answer { get; set; }
}
//执行工作流传入参数
MyDataClass myDataClass = newMyDataClass();
myDataClass.Value1 = 100;
myDataClass.Value2 = 200;
//不传入myDataClass则每次执行都是新的数据对象
_workflowHost.StartWorkflow("HelloWorld", myDataClass);
从上述例子可以看到工作流可以定义一个初始的类作为参数传入,每个步骤可以有自己的属性字段去接收参数(可以是工作流类的字段,也可以是固定值),可以用 Input
方法传入, Output
方法输出赋值。如果在工作流执行时不传入参数每次执行都是新的对象的默认值,比如在 StartWorkflow
方法中不传 myDataClass
,运行结果是 100
和 100
,否则是 200
和 300
工作流可以使用 WaitFor
方法进行等待,通过外部触发此事件,将事件产生的数据传递给工作流,并且让工作流继续执行下面的步骤。示例如下:
public class MyWorkflow :IWorkflow<MyDataClass>{
//省略。。。。
public void Build(IWorkflowBuilder<MyDataClass>builder)
{
builder
.StartWith<FirstStepBody>()
.Input(step => step.Input1,data =>data.Value1)
.Input(step => step.Input2, data => 100)
.Output(data => data.Answer, step =>step.Output)
.WaitFor("MyEvent",key => "EventKey")
.Output(data => data.Answer,step =>step.EventData)
.Then<FirstStepBody>()
.Input(step => step.Input1, data =>data.Value1)
.Input(step => step.Input2, data =>data.Answer)
.Output(data => data.Answer, step =>step.Output);
}
}
//。。。
[HttpGet(Name = "get")]
publicContentResult Get()
{
MyDataClass myDataClass = newMyDataClass();
myDataClass.Value1 = 100;
myDataClass.Value2 = 200;
_workflowHost.StartWorkflow("HelloWorld", myDataClass);
return Content("ok");
}
[HttpPost(Name = "event")]
publicContentResult PublishEvent()
{
_workflowHost.PublishEvent("MyEvent", "EventKey", 200);
return Content("ok");
}
使用 WaitFor
方法可以使工作流等待监听指定事件的执行,有两个入参事件名称和事件关键字。通过工作流主机去触发 PublishEvent
执行指定的事件,有三个入参触发事件名称、触发事件关键字和事件参数。
需要执行事件,工作流才会继续下一步,如下动图演示:
可以为等待事件设置有效时间,在有效时间之前执行事件是不会继续下一步流程的,只有当大于有效时间之后执行事件才会继续下一步步骤。如下代码设置,为工作流执行时间一天后执行事件才会继续执行,否则就等待不动。
WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1))
活动被定义为在工作流中可以被等待的外部工作队列中的步骤。
在本例中,工作流将等待活动 activity-1
,直到活动完成才继续工作流。它还将 data.Value1
的值传递给活动,然后将活动的结果映射到 data.Value2
。
然后我们创建一个 worker
来处理活动项的队列。它使用 GetPendingActivity
方法来获取工作流正在等待的活动和数据。
//.....
builder
.StartWith<FirstStepBody>()
.Input(step => step.Input1,data =>data.Value1)
.Input(step => step.Input2, data => 100)
.Output(data => data.Answer, step =>step.Output)
.Activity("activity-1", (data) =>data.Value1)
.Output(data => data.Value2, step =>step.Result)
.Then<FirstStepBody>()
.Input(step => step.Input1, data =>data.Value1)
.Input(step => step.Input2, data =>data.Answer)
.Output(data => data.Answer, step =>step.Output);
//....
[HttpPost(Name = "active")]
publicContentResult PublishEvent()
{
var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine(activity.Parameters);
_workflowHost.SubmitActivitySuccess(activity.Token, 100);
}
return Content("ok");
}
活动可以看作一个等待的步骤可以传入参数和输出参数,和事件的区别是事件不能输入参数而是单纯的等待。
每个步骤都可以配置自己的错误处理行为,可以在以后重试、挂起工作流或终止工作流。
public void Build(IWorkflowBuilder<object>builder)
{
builder
.StartWith<HelloWorld>()
.OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10))
.Then<GoodbyeWorld>();
}
工作流的流程控制包括分支、循环等各种操作
决策分支
在工作流中定义多个独立分支,并根据表达式值选择满足条件的分支执行。
使用 IWorkflowBuilder
的 CreateBranch
方法定义分支。然后我们可以使用 branch
方法选择一个分支。
选择表达式将与通过 branch
方法列出的分支相匹配,匹配的分支将安排执行。匹配多个分支将导致并行分支运行。
如果 data.Value1
的值为 1
,则此工作流将选择 branch1
,如果为 2
,则选择 branch2
。
var branch1 =builder.CreateBranch()
.StartWith<PrintMessage>()
.Input(step => step.Message, data => "hi from 1")
.Then<PrintMessage>()
.Input(step => step.Message, data => "bye from 1");
var branch2 =builder.CreateBranch()
.StartWith<PrintMessage>()
.Input(step => step.Message, data => "hi from 2")
.Then<PrintMessage>()
.Input(step => step.Message, data => "bye from 2");
builder
.StartWith<HelloWorld>()
.Decide(data =>data.Value1)
.Branch((data, outcome) => data.Value1 == "one", branch1)
.Branch((data, outcome) => data.Value1 == "two", branch2);
并行ForEach
使用 ForEach
方法启动并行 for
循环
public classForEachWorkflow : IWorkflow
{
public string Id => "Foreach";
public int Version => 1;
public void Build(IWorkflowBuilder<object>builder)
{
builder
.StartWith<SayHello>()
.ForEach(data => new List<int>() { 1, 2, 3, 4})
.Do(x =>x
.StartWith<DisplayContext>()
.Input(step => step.Message, (data, context) =>context.Item)
.Then<DoSomething>())
.Then<SayGoodbye>();
}
}
While循环
使用 While
方法启动 while
循环
public class WhileWorkflow : IWorkflow<MyData>{
public string Id => "While";
public int Version => 1;
public void Build(IWorkflowBuilder<MyData>builder)
{
builder
.StartWith<SayHello>()
.While(data => data.Counter < 3)
.Do(x =>x
.StartWith<DoSomething>()
.Then<IncrementStep>()
.Input(step => step.Value1, data =>data.Counter)
.Output(data => data.Counter, step =>step.Value2))
.Then<SayGoodbye>();
}
}
If判断
使用 If
方法执行 if
判断
public class IfWorkflow : IWorkflow<MyData>{
public void Build(IWorkflowBuilder<MyData>builder)
{
builder
.StartWith<SayHello>()
.If(data => data.Counter < 3).Do(then =>then
.StartWith<PrintMessage>()
.Input(step => step.Message, data => "Value is less than 3")
)
.If(data => data.Counter < 5).Do(then =>then
.StartWith<PrintMessage>()
.Input(step => step.Message, data => "Value is less than 5")
)
.Then<SayGoodbye>();
}
}
并行
使用 Parallel
方法并行执行任务
public class ParallelWorkflow : IWorkflow<MyData>{
public string Id => "parallel-sample";
public int Version => 1;
public void Build(IWorkflowBuilder<MyData>builder)
{
builder
.StartWith<SayHello>()
.Parallel()
.Do(then =>then.StartWith<Task1dot1>()
.Then<Task1dot2>()
.Do(then =>then.StartWith<Task2dot1>()
.Then<Task2dot2>().Join()
.Then<SayGoodbye>();
}
}
Schedule
使用 Schedule
方法在工作流中注册在指定时间后执行的异步方法
builder
.StartWith(context => Console.WriteLine("Hello"))
.Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule =>schedule
.StartWith(context => Console.WriteLine("Doing scheduled tasks"))
)
.Then(context => Console.WriteLine("Doing normal tasks"));
Recur
使用 Recure
方法在工作流中设置一组重复的后台步骤,直到满足特定条件为止
builder
.StartWith(context => Console.WriteLine("Hello"))
.Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur =>recur
.StartWith(context => Console.WriteLine("Doing recurring task"))
)
.Then(context => Console.WriteLine("Carry on"));
saga
允许在 saga transaction
中封装一系列步骤,并为每一个步骤提供补偿步骤,使用 CompensateWith
方法在对应的步骤后面添加补偿步骤,补偿步骤将会在步骤抛出异常的时候触发。
如下示例,步骤 Task2
如果抛出一个异常,那么补偿步骤 UndoTask2
和 UndoTask1
将被触发。
builder
.StartWith(context => Console.WriteLine("Begin"))
.Saga(saga =>saga
.StartWith<Task1>()
.CompensateWith<UndoTask1>()
.Then<Task2>()
.CompensateWith<UndoTask2>()
.Then<Task3>()
.CompensateWith<UndoTask3>()
)
.CompensateWith<CleanUp>()
.Then(context => Console.WriteLine("End"));
也可以指定重试策略,在指定时间间隔后重试。
builder
.StartWith(context => Console.WriteLine("Begin"))
.Saga(saga =>saga
.StartWith<Task1>()
.CompensateWith<UndoTask1>()
.Then<Task2>()
.CompensateWith<UndoTask2>()
.Then<Task3>()
.CompensateWith<UndoTask3>()
)
.OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
.Then(context => Console.WriteLine("End"));
可以使用 Redis
、 Mongdb
、 Sqlserver
等持久化,具体可以看文档,此处使用 Redis
,先安装 nuget
包
Install-Package WorkflowCore.Providers.Redis
然后注入就可以了
builder.Services.AddWorkflow(cfg =>{
cfg.UseRedisPersistence("localhost:6379", "app-name");
cfg.UseRedisLocking("localhost:6379");
cfg.UseRedisQueues("localhost:6379", "app-name");
cfg.UseRedisEventHub("localhost:6379", "channel-name");
//cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow");
//cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows");
});
运行打开可以看到
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://www.cnblogs.com/xwc1996/p/17306568.html
内容来源于网络,如有侵权,请联系作者删除!