一个超轻量级工作流引擎:Workflow-Core
发布于 1 个月前 作者 yan 68 次浏览

近期工作上有一个工作流的开发需求,自己基于面向对象和职责链模式捣鼓了一套小框架,后来在github上发现一个轻量级的工作流引擎轮子:Workflow-Core,看完其wiki之后决定放弃之前自己造的轮子,使用这个开源项目来改造,也就有了这一篇博文。

01

关于Workflow-Core

Workflow-Core是一个基于.NET Standard的轻量级工作流引擎,其GitHub地址为:https://github.com/danielgerlag/workflow-core,目前有超过1200+个star。它提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。

由于Workflow-Core支持工作流长期运行,因此Workflow-Core支持以下多种数据源格式的持久化,可以通过安装不同的Provider包来实现对应的持久化:

  • 内存(默认提供,用于测试和开发)
  • MongoDB
  • MS SQL Server
  • MySql
  • Sqlite
  • Redis
  • PostgreSQL

立刻上手把,Nuget上安装一把,目前最新版本2.0.0:

PM> Install-Package WorkflowCore 

02

Workflow-Core 基础使用

1、Hello World

这里创建了一个.NET Core控制台应用程序,快速演示第一个Workflow-Core的Hello World,展示如何开始一个Workflow:

(1)定义一个实现IWorkflow接口的Workflow:

   public class HelloWorldWorkflow : IWorkflow 
 
   { 
 
       public string Id => "HelloWorld"; 
 


 
       public int Version => 1; 
 


 
       public void Build(IWorkflowBuilder<object> builder) 
 
       { 
 
           builder 
 
               .StartWith<HelloWorld>() 
 
               .Then<ActiveWorld>() 
 
               .Then<GoodbyeWorld>(); 
 
       } 
 
   } 

这里定义了一个HelloWorldWorkflow,其版本号为1,它有3个步骤:HelloWorld、ActiveWorld和GoodbyeWorld,会依次执行。

(2)定义三个继承自StepBody类的步骤类:

   public class HelloWorld : StepBody 
 
   { 
 
       public override ExecutionResult Run(IStepExecutionContext context) 
 
       { 
 
           Console.WriteLine("Hello World!"); 
 
           return ExecutionResult.Next(); 
 
       } 
 
   } 
 


 
   public class ActiveWorld : StepBody 
 
   { 
 
       public override ExecutionResult Run(IStepExecutionContext context) 
 
       { 
 
           Console.WriteLine("I am activing in the World!"); 
 
           return ExecutionResult.Next(); 
 
       } 
 
   } 
 


 
   public class GoodbyeWorld : StepBody 
 
   { 
 
       public override ExecutionResult Run(IStepExecutionContext context) 
 
       { 
 
           Console.WriteLine("Goodbye World!"); 
 
           return ExecutionResult.Next(); 
 
       } 
 
   } 
 


 
  (3)ServiceCollection中注入Workflow-Core相关组件 
 
   private static IServiceProvider ConfigureServices() 
 
   { 
 
       IServiceCollection services = new ServiceCollection(); 
 
       services.AddLogging(); // WorkflowCore需要用到logging service 
 
       services.AddWorkflow(); 
 


 
       var serviceProvider = services.BuildServiceProvider(); 
 


 
       return serviceProvider; 
 
   } 

(4)在Program.cs的Main方法中获取到注入的host并执行工作流

       public static void Main(string[] args) 
 
       { 
 
           var serviceProvider = ConfigureServices(); 
 
           var host = serviceProvider.GetService<IWorkflowHost>(); 
 
           host.RegisterWorkflow<HelloWorldWorkflow>(); 
 
           host.Start(); 
 


 
           // Demo1:Hello World 
 
           host.StartWorkflow("HelloWorld"); 
 


 
           Console.ReadKey(); 
 
           host.Stop(); 
 
       }     

这里传入的是Workflow的Id,Workflow-Core会根据Id去自动匹配最新版本的对应Workflow,运行结果如下:

2、无处不在的If

在工作流处理中,往往会有很多的条件判断,那么在Workflow-Core中也提供了直接的If功能,如下面这个IfStatementWorkflow所示:

public class IfStatementWorkflow : IWorkflow<MyData> 
 
   { 
 
       public string Id => "if-sample"; 
 


 
       public int Version => 1; 
 


 
       public void Build(IWorkflowBuilder<MyData> builder) 
 
       { 
 
           builder 
 
               .StartWith<SayHello>() 
 
               .If(data => data.Counter < 3).Do(then => then 
 
                       .StartWith<PrintMessage>() 
 
                           .Input(step => step.Message, data => "Outcome is less than 3") 
 
               ) 
 
              .If(data => data.Counter < 5).Do(then => then 
 
                       .StartWith<PrintMessage>() 
 
                           .Input(step => step.Message, data => "Outcome is less than 5") 
 
               ) 
 
               .Then<SayGoodbye>(); 
 
       } 
 
   } 

这个传递进来的MyData的定义如下:

public class MyData 
 
   { 
 
       public int Counter { get; set; } 
 
   } 

当传递进来的MyData的Counter属性<3 或 <5时会有不同的分支进行逻辑的处理。

3、MySQL持久化支持

想要将工作流配置持久化到MySQL,只需以下两步:

(1)通过Nuget安装MySQL Provider包:

PM> Install-Package WorkflowCore.Persistence.MySQL 

(2)注入到ServiceCollection

services.AddWorkflow(x => x.UseMySQL(@"Server=127.0.0.1;Database=workflow;User=root;Password=password;", true, true)); 

一旦启动,你就会发现Workflow-Core自动帮你创建了很多表用于持久化工作流配置和实例。

4、计划任务和循环任务

Workflow-Core还继承了计划任务和循环任务的功能:

(1)计划任务:比如在工作流步骤中设置一个延迟5分钟执行的计划任务

builder 
 
   .StartWith(context => Console.WriteLine("Hello")) 
 
   .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule 
 
       .StartWith(context => Console.WriteLine("Doing scheduled tasks")) 
 
   ) 
 
   .Then(context => Console.WriteLine("Doing normal tasks")); 

(2)循环任务:比如在工作流步骤中设置一个延迟5分钟进行的循环任务,知道Counter > 5才结束

builder 
 
   .StartWith(context => Console.WriteLine("Hello")) 
 
   .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur 
 
       .StartWith(context => Console.WriteLine("Doing recurring task")) 
 
   ) 
 
   .Then(context => Console.WriteLine("Carry on")); 

5、Saga支持

了解分布式事务方案的童鞋应该都知道Saga,在Workflow-Core中也有支持,这是一个十分有用的功能:

(1)比如:在创建一个客户信息之后,将其推送到Salesforce和ERP,如果推送过程中发生了错误,那么就通过重试进行补偿,并且重试有时间间隔。

builder 
 
           .StartWith<CreateCustomer>() 
 
           .Then<PushToSalesforce>() 
 
               .OnError(WorkflowErrorHandling.Retry, TimeSpan.FromMinutes(10)) 
 
           .Then<PushToERP>() 
 
               .OnError(WorkflowErrorHandling.Retry, TimeSpan.FromMinutes(10)); 

(2)又比如:当Task2发生异常时,Workflow-Core会帮助执行UndoTask2 和 UndoTask1 帮你回滚数据以恢复状态。

builder 
 
   .StartWith<LogStart>() 
 
   .Saga(saga => saga 
 
       .StartWith<Task1>() 
 
           .CompensateWith<UndoTask1>() 
 
       .Then<Task2>() 
 
           .CompensateWith<UndoTask2>() 
 
       .Then<Task3>() 
 
           .CompensateWith<UndoTask3>() 
 
   ) 
 
   .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromMinutes(10)) 
 
   .Then<LogEnd>(); 

更多Saga示例,请参考:https://github.com/danielgerlag/workflow-core/tree/master/src/samples/WorkflowCore.Sample17

03

ASP.NET Core中使用Workflow-Core

1、注入与初始化

(1)注入:使用AddWorkflow()扩展方法

     public void ConfigureServices(IServiceCollection services) 
 
       { 
 
           services.AddWorkflow(); 
 
           services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); 
 
       } 

(2)初始化:

   public void Configure(IApplicationBuilder app, IHostingEnvironment env) 
 
   { 
 
           ....... 
 
           app.UseWorkflow(); 
 
   } 

扩展方法如下:

   public static class ConfigureExtensions 
 
   { 
 
       public static IApplicationBuilder UseWorkflow(this IApplicationBuilder app) 
 
       { 
 
           var host = app.ApplicationServices.GetService<IWorkflowHost>(); 
 
           host.RegisterWorkflow<EdcWorkflow>(); 
 
           host.RegisterWorkflow<EdcDataWorkflow, EdcData>(); 
 
           host.Start(); 
 


 
           var appLifetime = app.ApplicationServices.GetService<IApplicationLifetime>(); 
 
           appLifetime.ApplicationStopping.Register(() => 
 
           { 
 
               host.Stop(); 
 
           }); 
 


 
           return app; 
 
       } 
 
   } 

这里需要注意的就是:将你要用到的所有Workflow都事先进行Register注册。

在你想要用到的地方,无论是Controller还是Service,通过依赖注入获取到Host,并使用它:

   [Route("api/[controller]")] 
 
   [ApiController] 
 
   public class ValuesController : ControllerBase 
 
   { 
 
       private IWorkflowController _workflowService; 
 


 
       public ValuesController(IWorkflowController workflowService) 
 
       { 
 
           _workflowService = workflowService; 
 
       } 
 


 
       // GET api/values 
 
       [HttpGet] 
 
       public async Task<IEnumerable<string>> Get() 
 
       { 
 
           await _workflowService.StartWorkflow("EdcWorkflow"); 
 
           return new string[] { "EdcWorkflow v1" }; 
 
       } 
 


 
       // GET api/values/5 
 
       [HttpGet("{id}")] 
 
       public async Task<string> Get(int id) 
 
       { 
 
           await _workflowService.StartWorkflow("EdcDataWorkflow", new EdcData() { Id = id }); 
 
           return "EdcDataWorkflow v1"; 
 
       } 
 
   } 

这两个Workflow的定义如下:

   public class EdcWorkflow : IWorkflow 
 
   { 
 
       public string Id => "EdcWorkflow"; 
 


 
       public int Version => 1; 
 


 
       public void Build(IWorkflowBuilder<object> builder) 
 
       { 
 
           builder 
 
               .StartWith<HelloWorld>() 
 
               .Then<GoodbyeWorld>(); 
 
       } 
 
   } 
 


 
   public class EdcDataWorkflow : IWorkflow<EdcData> 
 
   { 
 
       public string Id => "EdcDataWorkflow"; 
 


 
       public int Version => 1; 
 


 
       public void Build(IWorkflowBuilder<EdcData> builder) 
 
       { 
 
           builder 
 
               .StartWith<HelloWorld>() 
 
               .If(data => data.Id < 3).Do(then => then 
 
                       .StartWith<PrintMessage>() 
 
                           .Input(step => step.Message, data => "Passed Id is less than 3") 
 
               ) 
 
              .If(data => data.Id < 5).Do(then => then 
 
                       .StartWith<PrintMessage>() 
 
                           .Input(step => step.Message, data => "Passed Id is less than 5") 
 
               ) 
 
               .Then<GoodbyeWorld>(); 
 
       } 
 
   } 

示例结果很简单:

(1)api/values

(2)api/values/1

04

小结

** **

**

**

Workflow-Core是一个适合.NET Core的优秀的轻量级工作流引擎,对于小型工作流和责任链类型的需求开发很适合,可以节约大量时间避免重复造轮子,将时间主要花在业务逻辑上面。当然,这里演示的示例只是众多功能特性中的一小部分,我只是选取了我用到的部分而已,大家有兴趣的话可以去GitHub上先给个star再仔细研究其wiki文档,应用到自己的项目中去。


**恰童鞋骚年,风华不再正茂,仍想挥斥方遒 **

点个在看少个bug  👇

回到顶部