消息驱动—— Spring Cloud Stream
发布于 15 天前 作者 yan 58 次浏览

(给ImportNew加星标,提高Java技能)

作者:sprinkle_liz

www.jianshu.com/p/1621becd3c4a

在进入正题之前,我们先来设想一个场景。有两个人在对话,其中一个人问了另一个人一个问题,这个问题比不简单,得出正确答案需要花点时间。那么问题来了,问问题的一方,在对方给出答案之前,会怎么做呢?是一直做在那里等呢,还是去做其它待会必须去做的事呢,比如boss昨天才交代今天解决的bug;等到对方回复后,再根据回复的内容继续谈话或者做其它。如果是我,我会选择第二种(我还是很敬业的);当然若对方是美女,那我不介意在坐在她面前欣赏她思考的样子(哈哈,不小心暴露了)。

设想上面的场景的目的,并不是想说明我有多敬业(饥渴),我想表达的是:我们与世界的互动并不是同步的、线性的、严格按照提问-回应的模式进行的,而是消息驱动,不断的接发信息。当我们接收到消息,会对这些消息做出反应,当然,我们也经常会被打断正在执行的主要工作。

这一教程将会介绍的是,如何设计并实现,能通过异步消息与其它微服务互相交互的微服务。使用异步消息在应用间互相通信并不是什么新概念,新的概念是使用消息来传达事件状态的改变——Event Driven Architecture(EDA),即事件驱动架构,也可以称为Message Driven Architecture(MDA),消息驱动架构。基于事件驱动架构,我们可以构建高度解耦的系统,需要互相通信的服务不用通过特定的库或其它服务紧密耦合在一起。当与微服务结合时,我们只需让服务监听应用程序发出的事件(消息)流,接收到事件(消息)后作出对应的响应,就可以在应用程序中快速添加新功能。

Spring Cloud的子项目Spring Cloud Stream,能让基于消息驱动的应用开发变得更加简单,使用它,我们可以很容易地就能实现“消息发布和消费”,而且会对底层消息传递平台(后文会介绍)屏蔽服务(包括发布者和消费者)的实现细节。

消息传递、事件驱动架构和微服务

在基于微服务的应用中,为什么消息传递的地位如此重要?回答这个问题时,我们会用到整个教程都涉及到的两个服务——license和organization服务。不妨想象一下,当这些服务部署到生产环境中后,发现license服务在访问organization服务获取organization信息的时候,需要花费很长时间。不过幸运的是,发现了organization数据的使用模式是:organization的数据很少变更,而且是使用主键从organization服务获取数据的。所以将organization服务获取的数据缓存起来,这样一来就能大大降低访问license服务时的响应时间(获取license数据时需要获取对应的organizationId的organization数据)。

在使用实现缓存的解决方案时,需要特别注意的有三点:

  • 需要保证license服务所有实例获取到的缓存数据保持一致。这意味着不能将数据缓存到license服务每个实例中,因为我们需要保证不管哪个实例来获取数据,都要保证返回的organization数据是一致的。
  • 不能将organization数据缓存在托管license服务的容器的内存中。服务托管在的容器的资源一般都是有限的,而且本地缓存会引入更大的复杂性,因为必须确保本地缓存与集群中的其它服务实例的缓存是同步的。
  • 当一个organization记录发生改变,比如更新或删除,license服务就需要去确认organization服务的哪些数据状态发生了变更。然后license服务需要让发生变更的organization缓存数据无效并从缓存中移除或更新。

针对上面说的三点,现在有2个解决方案。第一种,使用同步的“请求-响应”模式;当organization数据状态发生变更,license和organization服务通过它们各自的REST端点来回通信,比如organization调用license服务的端点通知license某个organization的状态发生改变,你那边的缓存需要做处理,license处理完后响应organization服务。第二种,organization服务在数据发生变更后,organization服务将这一变更通过异步事件(消息)发送出去;换句话说,organization服务会将某个organization记录发生了变更这一事件发布到一个队列中,而license服务一直在监听消息传递平台,当监听到队列中有新的事件(来自organization数据变更的事件)后,会“消费”这一事件,即对缓存做对应的处理——更新或移除。下面进一步介绍这两种方案的实现细节。

同步的“请求-响应”模式

为缓存organization数据,我们使用Redis数据库来存储。redis是一个分布式的以键值对的形式来存储数据的数据库。下图说明了如何使用“请求-响应”模式来实现缓存的。

上图中,当用户请求license服务,license服务也需要查询获取organization数据。license服务会优先根据organization Id从集群redis中获取数据,如果找不到,license服务则会向organization服务发送请求,得到正确返回结果之后,返回license数据给用户之前,license服务会把organization数据存储到redis中。此时,若有人通过organization的端点更新或删除organization记录,那么organization服务在完成业务逻辑后,还需要访问license服务,告知对应的organization记录已经无效,需要更新或从缓存中删除。这一步中,至少存在三个问题/隐患:

  • organization和license服务高度耦合。
  • 这种耦合会使服务间交互的灵活性变脆弱。如果license服务提供的更新缓存的端点变更,那么organization服务也需要跟着改变。
  • 这种方案极不灵活。因为我们无法在不修改organization服务源码的情况下,给“organization数据的变更”这一事件添加新的消费者,即当organization数据变更后,别的服务也做相应的业务逻辑。

下面对这3点做进一步分析。

服务间高度耦合

在之前的图中,我们可以看到license和organization服务间紧密耦合。license服务依赖organization服务获取数据,另外,当organization记录发生变更后,organization服务还需要访问license服务,通知缓存中的哪个organization已经失效,所以organization服务需要license服务暴露一个端点来完成这一需求,这样一来,organization服务也跟license服务耦合在一起。当然,还有另一种做法,organization服务直接与license服务的redis服务器交互,让redis中的某条记录失效。

但是,organization服务直接与license服务的redis服务器交互这一做法在微服务环境中本身就是一大禁忌。当然,肯定会有人反驳说:缓存中的数据本身就是属于organization服务的,license服务只是在特定上下文使用它们或者围绕这些数据构建业务。但是,让organization服务直接license服务redis中的数据打交道,会使organization服务依赖于license服务的redis,也极易打破license服务已经建立实现的规则。

服务变得脆弱

license和organization服务的高度耦合会使这两个服务变得更脆弱。如果license服务挂掉或处理能力下降,organization服务也会因此受到影响,因为organization服务现在需要直接与license服务交互,即依赖于license服务。

为organization服务添加消费者不灵活

如果有另一个服务也需要“监听”organization数据的改变,那么必须在organization服务为这一服务添加一个远程调用逻辑。这意味着organization必须添加新的代码并重新编译、部署。想一想,如果以后有多个这样的服务(需要消费organization数据变更事件),甚至许多类似organization服务和license服务这样高度耦合在一起的服务群,并且使用同步的“请求-响应”模式,那么应用内服务的“织网”表演将从此开始。最后你会发现,这个应用的失败,这张“网”占了很重的分量。

使用消息传递实现服务间的交互

使用消息传递方案,将会在license服务和organization服务间引入一个消息队列。该队列并不是用来从organization服务获取数据,而是当organization服务数据发生变更后,可以将这一消息发布到队列中。下图说明了具体细节:

上图展示的模式,每一次organization数据发生变更,organization服务会发布消息到队列中。而license一直在监听来自organization的数据变更消息,当发现队列中发布新的消息后会立刻消费,即根据消息内容执行相应的逻辑,如更新缓存或直接失效。在这一模式中,消息队列扮演的是一个处于license服务和organization服务的中间人的角色,即前文提到的“底层消息传递平台”。这一模式能带来许多好处,可以简单概括为以下4点:

  • 低耦合
  • 持久性
  • 高可扩展性
  • 高灵活性

低耦合

一个微服务应用可以由许多个小的、分散的服务组成,这些服务间大都需要与其他服务交互,而且可能对其它服务管理的数据“感兴趣”。之前提到使用同步的方式,同步的HTTP响应会让license和organization两个服务对彼此产生极大的依赖。虽然,我们无法完全消除这种依赖,但可以尽量让这种依赖减弱,服务只暴露直接管理自己数据的端点。消息传递模型可以解耦两个服务间的依赖,因为对organization服务来说,当需要发布数据状态变更的消息,只需将该消息发布到消息队列中;而对license服务而言,只负责对消息进行消费,并不关心是谁发布的消息。

持久性

消息队列的存在,可以确保消息一定会被传递出去,即使消费方服务已经挂掉。也就是说,即使license服务已经不可用,organization服务依然可以将消息发布到消息队列中。这些消息会被存储在消息队列中,直到license服务可用后才开始消费这些积攒了许多的消息。相反的,缓存和消息队列的结合,当organization服务挂掉,license服务可以优雅的降级,因为至少有部分organization数据在license服务的缓存中。有时,过时的数据总比缺少数据来得强。

高可扩展性

由于消息发布出去后会存储在消息队列中,所以小心的发布者并不需要等待消费者消费后返回的响应消息,因此它们可以继续它们的工作。同样的,当消费方的一个消费者(实例)已经无法尽快地从队列中读取消息,那么可以启动更多的消费方服务来处理这些“溢出”的消息。而传统的扩展机制是增加处理消息的线程数量,这样,一个消费者就能足以应对。不幸的是,若采用这种办法,那么消息的消费者的处理性能最后将受限CPU的核心数量,当服务发布消息频率再次增高,处理熟读又无法满足需求,最后只能通过部署到性能更强大的机器上。而通过启动更多消费者实例这种可扩展性极强的方法则非常适用于微服务模式,因为启动一个微服务的更多实例来相对于性能强大的机器来说是微不足道的,毕竟这些微服务随便部署在普通机器上就能很好地运行。

高灵活性

消息的发布者并不知道谁会去消费这些消息,这意味着可以很轻松地加入新的消息消费者,重要的是这并不会对消息的发布者有任何影响。这是一个非常强大的优势,因为完全可以在添加拥有新功能的微服务到应用中的情况下,不会影响到其它已存在的微服务。新添加的服务只需监听事件的发布然后对其做出响应即可。

Spring Cloud Stream

Spring Cloud Stream可以很容易实现将消息传递模式应用到基于Spring的微服务应用中。Spring Cloud Stream子项目官网https://cloud.spring.io/spring-cloud-stream/。Spring Cloud Stream是一个注解驱动框架,所以可以使用简单的几个注解就能在应用中构建消息的发布者和消费者。

Spring Cloud Stream还支持我们将消息传递平台的实现细节抽象出来,Spring Cloud 只提供与平台无关的接口。这意味着可以将消息传递平台详细的实现细节从应用代码中抽离出来,然后使用已经实现消息传递的平台,Spring Cloud Stream支持的消息传递平台包括Apache Kafka和RabbitMQ,这样,应用中就可以直接使用与具体平台无关Spring接口实现消息的发布和消费(本教程会对RabbitMQ消息总线的使用进行介绍,因为Kafka我也不怎么熟悉,囧)。

为了了解Spring Cloud Stream,我们首先对Spring Cloud Stream的架构进行介绍并熟悉Spring Cloud Steam相关术语的含义。如果是第一次接触消息传递平台,先打个预防针,接下来涉及的东西学习起来可能会有点吃力。

Spring  Cloud Stream架构

首先,我们假设两个服务通过消息传递进行交互来介绍Spring Cloud Steam的架构。一个是消息发布者,另一个是消息消费者。如下图,借助Spring Cloud Stream来实现消息的传递:

Spring Cloud Stream架构

Spring Cloud中消息的发布和消费涉及到4个组件:

  • Source
  • Channel
  • Binder
  • Sink

Source

当服务发布消息前的前置业务完成后会通过Source将消息发布出去。Source是一个Spring注解接口,它可以将代表消息主体的POJO对象发布到消息管道(Channel)中,发布之前会把该消息对象序列化(默认使用JSON)。

Channel

Channel(消息管道)是消息队列的进一步抽象,它会保存消息生产者发布的或者消息消费者接收到的消息。消息管道的名称一般与目标队列名称相关联。然而,消息队列的名称不会直接在代码中暴露,相反管道名称则会被用在代码中,所以只能在配置文件中配置,为消息管道选取正确的消息队列进行读和写,而不是在代码中体现。

Binder

Binder则是Spring Cloud Stream框架的一部分。它是由Spring Cloud Stream实现的用来与特殊的消息平台交互。因为Binder是由Spring Cloud Stream实现的,所以我们可以在不需要暴露特殊消息平台的类库和API的情况下就能实现对消息的发布和消费。下文你将会看到它的强大之处。

Sink

在Spring Cloud Stream中,当从消息队列接收到一条消息后,需要通过Sink。Sink能监听进入管道中的消息并将消息反序列化成一个POJO对象。之后,消息就能给业务逻辑使用了。

**安装RabbitMQ **

本教程主要使用RabbitMQ做示例。其实无论使用RabbitMQ或是Kafka,代码是一样的,不一样的只是配置,在配置消息中间件时有一点点不一样。

在学习接下来的内容之前,需要先在本地安装RabbitMQ,至于RabbitMQ的安装,这里就不给出了,网上的入门教程大把。烦请第一次接触RabbitMQ的童鞋自己在网上找找。注意,安装RabbitMQ之前还需安装eralang。

消息发布和消费的实现

上面我们已经简单介绍了Spring Cloud Stream涉及到的几个组件,下面开始编写一个简单的Spring Cloud 例子。在该例子中,我们会使用organization服务发布消息然后license服务消费消息,license服务接收到消息后只做最简单的消费——在控制台打印日志。

在organization服务实现消息发布者

接下来我们会实现,每当服务维护的organization数据发生变更(添加、更新或删除)时,organization服务会向一个RabbitMQ topic发布一条消息,表明organization数据变更事件已经发生。

发布出去的消息包含与该数据变更事件相关的organization ID和数据变更行为(添加、更新或删除)。

pom文件

实现消息的发布,第一件事就是在pom文件引入需要的启动依赖。启动依赖很简单,只有一个,在organization服务的pom文件中添加如下依赖:

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency> 

核心类

引入需要的依赖后,我们就可以大展拳脚了。Show Time!

首先,来看下在消息发布端需要创建的几个类或接口,如下图:

其中包含两个类一个接口,看它们的名字大概就能猜出各自的作用了。下面来看具体源码:

OrgChangeModel类:

public class OrgChangeModel {
   private String type;
   private String action;
   private String organizationId;
   private String correlationId;

   public OrgChangeModel() {}

   public OrgChangeModel(String type, String action, String organizationId, String correlationId) {
       super();
       this.type   = type;
       this.action = action;
       this.organizationId = organizationId;
       this.correlationId = correlationId;
   }

   public String getType() {
       return type;
   }

   public void setType(String type) {
       this.type = type;
   }

   public String getAction() {
       return action;
   }

   public void setAction(String action) {
       this.action = action;
   }


   public String getOrganizationId() {
       return organizationId;
   }

   public void setOrganizationId(String organizationId) {
       this.organizationId = organizationId;
   }


   public String getCorrelationId() {
       return correlationId;
   }

   public void setCorrelationId(String correlationId) {
       this.correlationId = correlationId;
   }
}
若仔细观察OrgChangePublisher的代码,大概可以猜出,该类其实就是一个消息模型,是一个POJO,是用来承载需要传递的消息,换句话说就是消息的载体。另外该模型在被发送出去的时候,会被序列化成json(默认)。 

OrgChangeSource接口:

public interface OrgChangeSource {

   [@Output](/user/Output)("orgChangeOutput")
   MessageChannel output();
} 

OrgChangeSource接口很简单,现在只有一个方法,该方法返回的是一个MessageChannel,但不简单的是该方法上面的注解——@output,加上@output注解,Spring Cloud Stream会自动实现一个返回MessageChannel(消息管道)的方法。另外,注解@output有一个属性——value,用来自定义方法返回的消息管道的名称。

OrgChangePublisher类:

[@EnableBinding](/user/EnableBinding)(OrgChangeSource.class)
public class OrgChangePublisher {

   private static final Logger logger = LoggerFactory.getLogger(OrgChangePublisher.class);

   @Autowired
   private OrgChangeSource source;

   public void publish(String action, String orgId) {
       OrgChangeModel model = new OrgChangeModel(
               OrgChangeModel.class.getTypeName(),
               action,
               orgId,
               UserContextHolder.getContext().getCorrelationId());
       logger.info("sending rabbitmq message {} for Organization Id: {}",
               action, orgId);
       source.output().send(MessageBuilder.withPayload(model).build());
   }
} 

首先可以看到该类上面有一个@EnableBinding注解,下图是官方文档对该注解的介绍:

从上图可以知道,@EnableBinding可以让一个Spring应用变成一个Spring Cloud Stream应用,该注解可以加在应用中的其中一个配置类上。另外,该注解中也只有一个value属性,用来接收一个Class类(在我们看来其实是接口)数组,这些Class类包含一个或多个接口方法,如上面的OrgChangeSource,这些方法都返回可绑定的组件(Channel)。

接着再来看OrgChangePublisher#publish()方法的逻辑。其中最关键的一步就是:

source.output().send(MessageBuilder.withPayload(model).build()); 

我们知道source是Spring注入的由Spring Cloud Stream帮我们实现的Source实例,(注意,只有@EnableBinding的value包含了对应的Source接口,此处为OrgChangeSource,Spring Cloud Stream才会知道需要帮我们实现,若去掉@EnableBinding注解中的OrgChangeSource.class,将无法成功启动服务),output()方法是我们之前在OrgChangeSource中定义的接口方法,返回的是一个实现MessageChannel接口的对象,其中只有两个send方法。方法签名分别为:

boolean send(Message<?> message);

boolean send(Message<?> message, long timeout); 

Channel#send()方法能将一个Message发送到这个Channel中。如果发送成功返回true。这个方法可能会无限期阻塞,取决于使用哪种实现。所以第二个方法的timeout参数就是来控制超时时间的。send方法在发送消息时会阻塞线程,直到消息发送成功或超时发送失败。

而MessageBuilder是默认的消息构建器,它的静态方法withPayload()接收的就是我们需要发送的消息(负载),返回的是一个MessageBuilder对象,MessageBuilder对象调用build()方法最终产生一个Message对象。最后由send()方法发送到Channel中。

发布消息

我们的目的是在organization数据发生变更后,通过消息传递将这一信息通知license服务。接下来介绍如何使用Publisher来发布消息。

修改OrganizationService类的updateOrg()和deleteOrg()方法,如下:

[@Service](/user/Service)
public class OrganizationService {
   @Autowired
   private OrganizationRepository orgRepository;

   @Autowired
   private OrgChangePublisher orgChangePublisher;

   public Organization getOrg(String organizationId) {
       return orgRepository.findById(organizationId);
   }
   ...
   public void updateOrg(Organization org) {
       orgRepository.save(org);
       orgChangePublisher.publish("update", org.getId());
   }

   public void deleteOrg(Organization org){
       orgRepository.delete( org.getId());
       orgChangePublisher.publish("delete", org.getId());
   }
} 

可以看到,在OrganizationService中注入了OrgChangePublisher,然后在update()和delete()方法中使用,即organization记录更新或删除成功后会将消息发布出去。

配置文件

当所有核心类实现后,最后一步是配置使用哪种消息中间件及其环境,还有消息管道的绑定关系。配置如下:

spring:
 ...
 cloud:
   stream:
     binders:
       rabbitmq:
         type: rabbit
         environment:
           spring:
             rabbitmq:
               host: localhost
               port: 5672
               username: guest
               password: guest
     bindings:
       orgChangeOutput:
         destination: orgChangeTopic
         content-type: application/json
         binder: rabbitmq 

可以看到,主要涉及到两个属性的配置:spring.cloud.stream.binders和spring.cloud.stream.bindings,这两个属性都接收一个Map集合,即可以配置多个binder和 管道(Channel)。binders Map的键值是binder的名称,这个名称需要在bindings配置中用到;binding的键值是定义的管道的名称,取注解@Output@Input的value值,如OrgChangeSource#output()方法上注解@Output的value值"orgChangeOutput"。其它属性的含义如下图所示:

这样,消息发布者的工作就全部完成,接下来是实现消息消费者。

在license服务实现消息消费者

pom文件

同样,license服务也需要引入消息驱动的启动依赖。跟消息发布者引入的依赖一样:

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency> 

核心类

下图是消息消费端需要创建的几个类或接口:

OrgChangeModel类:

OrgChangeModel.java必须与organization服务的一样,因为license服务会在接收到消息后将消息内容(Model)反序列化,所以消息发布端和消费端的消息模型(Model)必须保持一致。一般会将消息模型提取到一个公共库,然后发布端和消费端就可以引用同一个了。这里就不给出源码了。

OrgChangeSink接口

public interface OrgChangeSink {

   [@Input](/user/Input)("orgChangeInput")
   SubscribableChannel input();
} 

上面的接口同样只有一个方法,与OrgChangeSource不同的是,该接口方法上的注解变成@Input,但原理与@Output差不多,注解中的"orgChangeInput"同样的Channel(管道)的名称。该接口同样是交由Spring Cloud Stream来实现。

OrgChangeHandler

[@EnableBinding](/user/EnableBinding)(OrgChangeSink.class)
public class OrgChangeHandler {

   private static final Logger logger = LoggerFactory.getLogger(OrgChangeHandler.class);

   [@StreamListener](/user/StreamListener)("orgChangeInput")
   public void handle(OrgChangeModel model) {
       logger.info("Received a message of type " + model.getType());
       switch(model.getAction()){
           case "get":
               logger.info("Received a GET event from the organization service for organization id {}", model.getOrganizationId());
               break;
           case "save":
               logger.info("Received a SAVE event from the organization service for organization id {}", model.getOrganizationId());
               break;
           case "update":
               logger.info("Received a UPDATE event from the organization service for organization id {}", model.getOrganizationId());
               break;
           case "delete":
               logger.info("Received a DELETE event from the organization service for organization id {}", model.getOrganizationId());
               break;
           default:
               logger.error("Received an UNKNOWN event from the organization service of type {}", model.getType());
               break;
       }
   }
} 

将接收到的消息反序列化成对应的Model后,就可以使用Model做相应的业务了。这里只是在控制台中打印。实际开发中就不会这么简单了,比如需要操作数据库等等。

源码中还出现了一个注解@StreamListener,该注解能用来接听某个管道,当监听到管道中有消息到来,机会接收然后将消息反序列化,最后执行相应的业务,这里为handle()方法。

消费端的核心代码已经介绍完,最后还需要配置一些必要的属性。

配置文件

配置如下:

spring:
 ...
 cloud:
   stream:
     binders:
       rabbitmq:
         type: rabbit
         environment:
           spring:
             rabbitmq:
               host: localhost
               port: 5672
               username: guest
               password: guest
     bindings:
       orgChangeInput:
         destination: orgChangeTopic
         content-type: application/json
         group: licenseGroup
         binder: rabbitmq 

消费端的配置与发布端的配置绝大多数是一样的,不一样的是在配置OrgChangInput Channel的时候,多了一个属性:OrgChangInput.group。实际上,一个消息发布者可以有多个消息消费者,也就是说,还可以有另外的服务也去监听organization服务发布的同一个消息事件。分组的作用是将不同的服务隔离开,服务间的监听互不影响,若不分组,那么发布的消息,只有其中的某个服务的能接收到并消费;而分组后,会将消息的副本分别发送到所有监听该消息事件的服务对应的管道中,最后消息会由服务的某个实例消费。

发布并消费消息

最后启动organization服务和license服务。然后使用postman访问http://localhost:11000/v1/organizations/e254f8c-c442-4ebe-a82a-e2fc1d1ff78a,http方法为PUT,请求体为:

{
   "id": "e254f8c-c442-4ebe-a82a-e2fc1d1ff78a",
   "name": "customer-crm-co",
   "contactName": "Mark Balster",
   "contactEmail": "mark.balster@custcrmco.com",
   "contactPhone": "823-555-1213"
} 

然后,观察organization服务和license服务的控制台,可以看到类似如下输出:

organization服务控制台输出:

license服务控制台输出:

证明license服务接收到由organization服务发布的消息,并消费(打印日志)了。

完!

(git上的源码:https://gitee.com/rain7564/spring_microservices_study/tree/master/sixth-spring-cloud-stream)

推荐阅读

(点击标题可跳转阅读)

spring-cloud 中 zuul 的两种隔离机制实验

从 Spring Cloud 看一个微服务框架的「五脏六腑」

Spring Boot 基础教程 ( 二 ) :快速构建 Spring Boot/Cloud 工程

看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

好文章,我在看❤️

回到顶部