什么是Saga,Saga能做什么
发布于 11 天前 作者 yan 37 次浏览

前言

Saga单词翻译过来是指尤指古代挪威或冰岛讲述冒险经历和英雄业绩的长篇故事,对,这里强调长篇故事。许多系统都存在长时间运行的业务流程,NServiceBus使用基于事件驱动的体系结构将容错性和可伸缩性融入这些业务处理过程中。           当然一个单一接口调用则算不上一个长时间运行的业务场景,那么如果在给定的用例中有两个或多个调用,则应该考虑数据一致性的问题,这里有可能第一个接口调用成功,第二次调用则可能失败或者超时,Saga的设计以简单而健壮的方式处理这样的业务用例。

认识Saga

先来通过一段代码简单认识一下Saga,在NServiceBus里,使用Saga的话则需要实现抽象类Saga,SqlSaga,这里的T的是Saga业务实体,封装数据,用来在长时间运行过程中封装业务数据。

public class Saga:Saga<State>,
       IAmStartedByMessages<StartOrder>,
       IHandleMessages<CompleteOrder>
   {
       protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
       {
           mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
           mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
       }

       public Task Handle(StartOrder message, IMessageHandlerContext context)
       {
           return Task.CompletedTask;
       }

       public Task Handle(CompleteOrder message, IMessageHandlerContext context)
       {
           MarkAsComplete();
           return Task.CompletedTask;
       }
   } 

临时状态

长时间运行则意味着 有状态 ,任何涉及多个网络调用的进程都需要一个临时状态,这个临时状态可以存储在内存中,序列化在磁盘中,也可以存储在分布式缓存中。在NServiceBus中我们定义实体,继承抽象类ContainSagaData即可,默认情况下,所有公开访问的属性都会被持久化。

public class State:ContainSagaData
{
   public Guid OrderId { get; set; }
} 

添加行为

在NServiceBus里,处理消息的有两种接口:IHandlerMessages、IAmStartedByMessages。

开启一个Saga

在前面的代码片段里,我们看到已经实现了接口IAmStartedByMessages,这个接口告诉NServiceBus,如果收到了StartOrder 消息,则创建一个Saga实例(Saga Instance),当然Saga长流程处理的实体至少有一个需要开启Saga流程。

处理无序消息

如果你的业务用例中确实存在无序消息的情况,则还需要业务流程正常轮转,那么则需要多个messaeg都要事先接口IAmStartedByMessages接口,也就是说多个message都可以创建Saga实例。

依赖可恢复性

在处理无序消息和多个消息类型的时候,就存在消息丢失的可能,必须在你的Saga状态完成以后,这个Saga实例又收到一条消息,但这时Saga状态已经是完结状态,这条消息则仍然需要处理,这里则实现NServiceBus的IHandleSagaNotFound接口。

public class SagaNotFoundHandler:IHandleSagaNotFound
{
   public Task Handle(object message, IMessageProcessingContext context)
   {
       return context.Reply(new SagaNotFoundMessage());
   }
}
 
public class SagaNotFoundMessage
{
       
} 

结束Saga

当你的业务用例不再需要Saga实例时,则调用 MarkComplete() 来结束Saga实例。这个方法在前面的代码片段中也可以看到,其实本质也就是设置Saga.Complete属性,这是个bool值,你在业务用例中也可以用此值来判断Saga流程是否结束。

namespace NServiceBus
{
   using System;
   using System.Threading.Tasks;
   using Extensibility;

   public abstract class Saga
   {
       /// <summary>
       /// The saga's typed data.
       /// </summary>
       public IContainSagaData Entity { get; set; }

       
       public bool Completed { get; private set; }

       internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration);

      
       protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new()
       {
           return RequestTimeout(context, at, new TTimeoutMessageType());
       }

       
       protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage)
       {
           if (at.Kind == DateTimeKind.Unspecified)
           {
               throw new InvalidOperationException("Kind property of DateTime 'at' must be specified.");
           }

           VerifySagaCanHandleTimeout(timeoutMessage);

           var options = new SendOptions();

           options.DoNotDeliverBefore(at);
           options.RouteToThisEndpoint();

           SetTimeoutHeaders(options);

           return context.Send(timeoutMessage, options);
       }

       
       protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new()
       {
           return RequestTimeout(context, within, new TTimeoutMessageType());
       }

       
       protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
       {
           VerifySagaCanHandleTimeout(timeoutMessage);

           var sendOptions = new SendOptions();

           sendOptions.DelayDeliveryWith(within);
           sendOptions.RouteToThisEndpoint();

           SetTimeoutHeaders(sendOptions);

           return context.Send(timeoutMessage, sendOptions);
       }

       
       protected Task ReplyToOriginator(IMessageHandlerContext context, object message)
       {
           if (string.IsNullOrEmpty(Entity.Originator))
           {
               throw new Exception("Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint.");
           }

           var options = new ReplyOptions();

           options.SetDestination(Entity.Originator);
           context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId = Entity.OriginalMessageId });

           
           options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State
           {
               SagaTypeToUse = null,
               SagaIdToUse = null
           });

           return context.Reply(message, options);
       }

       //这个方法结束saga流程,标记Completed属性
       protected void MarkAsComplete()
       {
           Completed = true;
       }

       void VerifySagaCanHandleTimeout<TTimeoutMessageType>(TTimeoutMessageType timeoutMessage)
       {
           var canHandleTimeoutMessage = this is IHandleTimeouts<TTimeoutMessageType>;
           if (!canHandleTimeoutMessage)
           {
               var message = $"The type '{GetType().Name}' cannot request timeouts for '{timeoutMessage}' because it does not implement 'IHandleTimeouts<{typeof(TTimeoutMessageType).FullName}>'";
               throw new Exception(message);
           }
       }

       void SetTimeoutHeaders(ExtendableOptions options)
       {
           options.SetHeader(Headers.SagaId, Entity.Id.ToString());
           options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString);
           options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName);
       }
   }
} 

Saga持久化

本机开发环境我们使用LearningPersistence,但是投产的话则需要使用数据库持久化,这里我们基于MySQL,SQL持久化需要引入NServiceBus.Persistence.Sql。SQL Persistence会生成几种关系型数据库的sql scripts,然后会根据你的断言配置选择所需数据库,比如SQL Server、MySQL、PostgreSQL、Oracle。      持久化Saga自动创建所需表结构,你只需手动配置即可,配置后编译成功后项目执行目录下会生成sql脚本,文件夹名称是NServiceBus.Persistence.Sql,下面会有Saga子目录。


/* TableNameVariable */

set [@tableNameQuoted](/user/tableNameQuoted) = concat('`', [@tablePrefix](/user/tablePrefix), 'Saga`');
set [@tableNameNonQuoted](/user/tableNameNonQuoted) = concat([@tablePrefix](/user/tablePrefix), 'Saga');


/* Initialize */

drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
   'ERROR'
set
   message_text = message,
   mysql_errno = '45000';
end;

/* CreateTable */

set [@createTable](/user/createTable) = concat('
   create table if not exists ', [@tableNameQuoted](/user/tableNameQuoted), '(
       Id varchar(38) not null,
       Metadata json not null,
       Data json not null,
       PersistenceVersion varchar(23) not null,
       SagaTypeVersion varchar(23) not null,
       Concurrency int not null,
       primary key (Id)
   ) default charset=ascii;
');
prepare script from [@createTable](/user/createTable);
execute script;
deallocate prepare script;

/* AddProperty OrderId */

select count(*)
into [@exist](/user/exist)
from information_schema.columns
where table_schema = database() and
     column_name = 'Correlation_OrderId' and
     table_name = [@tableNameNonQuoted](/user/tableNameNonQuoted);

set [@query](/user/query) = IF(
   [@exist](/user/exist) <= 0,
   concat('alter table ', [@tableNameQuoted](/user/tableNameQuoted), ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status');

prepare script from [@query](/user/query);
execute script;
deallocate prepare script;

/* VerifyColumnType Guid */

set [@column_type_OrderId](/user/column_type_OrderId) = (
 select concat(column_type,' character set ', character_set_name)
 from information_schema.columns
 where
   table_schema = database() and
   table_name = [@tableNameNonQuoted](/user/tableNameNonQuoted) and
   column_name = 'Correlation_OrderId'
);

set [@query](/user/query) = IF(
   [@column_type_OrderId](/user/column_type_OrderId) <> 'varchar(38) character set ascii',
   'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', [@column_type_OrderId](/user/column_type_OrderId), \'.\'));',
   'select \'Column Type OK\' status');

prepare script from [@query](/user/query);
execute script;
deallocate prepare script;

/* WriteCreateIndex OrderId */

select count(*)
into [@exist](/user/exist)
from information_schema.statistics
where
   table_schema = database() and
   index_name = 'Index_Correlation_OrderId' and
   table_name = [@tableNameNonQuoted](/user/tableNameNonQuoted);

set [@query](/user/query) = IF(
   [@exist](/user/exist) <= 0,
   concat('create unique index Index_Correlation_OrderId on ', [@tableNameQuoted](/user/tableNameQuoted), '(Correlation_OrderId)'), 'select \'Index Exists\' status');

prepare script from [@query](/user/query);
execute script;
deallocate prepare script;

/* PurgeObsoleteIndex */

select concat('drop index ', index_name, ' on ', [@tableNameQuoted](/user/tableNameQuoted), ';')
from information_schema.statistics
where
   table_schema = database() and
   table_name = [@tableNameNonQuoted](/user/tableNameNonQuoted) and
   index_name like 'Index_Correlation_%' and
   index_name <> 'Index_Correlation_OrderId' and
   table_schema = database()
into [@dropIndexQuery](/user/dropIndexQuery);
select if (
   [@dropIndexQuery](/user/dropIndexQuery) is not null,
   [@dropIndexQuery](/user/dropIndexQuery),
   'select ''no index to delete'';')
   into [@dropIndexQuery](/user/dropIndexQuery);

prepare script from [@dropIndexQuery](/user/dropIndexQuery);
execute script;
deallocate prepare script;

/* PurgeObsoleteProperties */

select concat('alter table ', table_name, ' drop column ', column_name, ';')
from information_schema.columns
where
   table_schema = database() and
   table_name = [@tableNameNonQuoted](/user/tableNameNonQuoted) and
   column_name like 'Correlation_%' and
   column_name <> 'Correlation_OrderId'
into [@dropPropertiesQuery](/user/dropPropertiesQuery);

select if (
   [@dropPropertiesQuery](/user/dropPropertiesQuery) is not null,
   [@dropPropertiesQuery](/user/dropPropertiesQuery),
   'select ''no property to delete'';')
   into [@dropPropertiesQuery](/user/dropPropertiesQuery);

prepare script from [@dropPropertiesQuery](/user/dropPropertiesQuery);
execute script;
deallocate prepare script;

/* CompleteSagaScript */

生成的表结构:

持久化配置

Saga持久化需要依赖NServiceBus.Persistence.Sql。引入后需要实现SqlSaga抽象类,抽象类需要重写ConfigureMapping,配置Saga工作流程业务主键。

public class Saga:SqlSaga<State>,
       IAmStartedByMessages<StartOrder>
{
  protected override void ConfigureMapping(IMessagePropertyMapper mapper)
  {
     mapper.ConfigureMapping<StartOrder>(message=>message.OrderId);
  }

  protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);

  public Task Handle(StartOrder message, IMessageHandlerContext context)
  {
      Console.WriteLine($"Receive message with OrderId:{message.OrderId}");

      MarkAsComplete();
      return Task.CompletedTask;
   }
}
   
static async Task MainAsync()
{
    Console.Title = "Client-UI";

    var configuration = new EndpointConfiguration("Client-UI");
    //这个方法开启自动建表、自动创建RabbitMQ队列
    configuration.EnableInstallers();
    configuration.UseSerialization<NewtonsoftSerializer>();
    configuration.UseTransport<LearningTransport>();

    string connectionString = "server=127.0.0.1;uid=root;pwd=000000;database=nservicebus;port=3306;AllowUserVariables=True;AutoEnlist=false";
    var persistence = configuration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MySql>();
    //配置mysql连接串
    persistence.ConnectionBuilder(()=>new MySqlConnection(connectionString));

    var instance = await Endpoint.Start(configuration).ConfigureAwait(false);

    var command = new StartOrder()
    {
        OrderId = Guid.NewGuid()
    };

    await instance.SendLocal(command).ConfigureAwait(false);

    Console.ReadKey();

    await instance.Stop().ConfigureAwait(false);
} 

Saga Timeouts

在消息驱动类型的环境中,虽然传递的无连接特性可以防止在线等待过程中消耗资源,但是毕竟等待时间需要有一个上线。在NServiceBus里已经提供了Timeout方法,我们只需订阅即可,可以在你的Handle方法中根据需要订阅Timeout,可参考如下代码:

public class Saga:Saga<State>,
       IAmStartedByMessages<StartOrder>,
       IHandleMessages<CompleteOrder>,
       IHandleTimeouts<TimeOutMessage>
   {
       
       public Task Handle(StartOrder message, IMessageHandlerContext context)
       {
           var model=new TimeOutMessage();
           
           //订阅超时消息
           return RequestTimeout(context,TimeSpan.FromMinutes(10));
       }

       public Task Handle(CompleteOrder message, IMessageHandlerContext context)
       {
           MarkAsComplete();
           return Task.CompletedTask;
       }

       protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);


       public Task Timeout(TimeOutMessage state, IMessageHandlerContext context)
       {
           //处理超时消息
       }

       protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
       {
           mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
           mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
       }
   } 
//从Timeout的源码看,这个方法是通过设置SendOptions,然后再把当前这个消息发送给自己来实现
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
{
    VerifySagaCanHandleTimeout(timeoutMessage);
    var sendOptions = new SendOptions();
    sendOptions.DelayDeliveryWith(within);
    sendOptions.RouteToThisEndpoint();
    SetTimeoutHeaders(sendOptions);

    return context.Send(timeoutMessage, sendOptions);
} 

总结

NServiceBus因为是商业产品,对分布式消息系统所涉及到的东西都做了实现,包括分布式事务(Outbox)、DTC都有,还有心跳检测,监控都有,全而大,目前我们用到的也只是NServiceBus里很小的一部分功能。

回到顶部