文章摘要: RabbitMQ支援多種訊息佇列協議產生和消費訊息 使用Spring Integration註解或者Spring Cloud Stream的@StreamListener註解可以進行訊息的傳送和消費
前言: 本文作者張天,節選自筆者與其合著的《Spring Cloud微服務架構進階》,即將在八月出版問世。本文將其中Spring Cloud Stream應用與自定義Rocketmq Binder的內容抽取出來,主要介紹Spring Cloud Stream的相關概念,並概述相關的程式設計模型。
概述
Spring Cloud Stream 簡介
Spring Cloud Stream 是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程式。他通過使用Spring Integration來連線訊息代理中介軟體以實現訊息事件驅動。Spring Cloud Stream 為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用了釋出-訂閱、消費組、分割槽的三個核心概念。Spring Cloud Stream目前僅支援RabbitMQ、Kafka。
訊息佇列簡介
訊息佇列中介軟體是分散式系統中最為重要的元件之一,主要解決應用耦合,非同步訊息,流量削鋒等問題,是大型分散式系統不可缺少的中介軟體。訊息佇列技術是分散式應用間交換資訊的一種技術,訊息可駐留在記憶體或磁碟上,佇列儲存訊息直到它們被應用程式讀走。通過訊息佇列,應用程式可以相對獨立地執行,它們不需要知道彼此的位置,只需要處理從訊息佇列傳送來的訊息和向訊息佇列傳送訊息。
訊息佇列的主要特點是非同步處理和解耦。其主要的使用場景就是將比較耗時而且不需要同步返回結果的操作作為訊息放入訊息佇列。同時由於使用了訊息佇列,只要保證訊息格式不變,訊息的傳送方和接受者並不需要彼此聯絡,也不需要受對方的影響,即解耦。
訊息佇列的使用場景有:
- 跨系統的非同步通訊,需要非同步互動的場景都可以使用訊息佇列。
- 訊息驅動的架構(EDA),系統分解為訊息佇列,訊息佇列製造者和訊息佇列消費者,一個是處理流程可以根據需求拆分成多個階段,每個階段之間通過佇列連線起來。
- 流量削鋒,它是訊息佇列中的常用場景之一,一般在秒殺或團搶活動中使用廣泛。秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉,為解決這個問題,一般需要在應用前端加入訊息佇列,來緩和流量的暴增。
在軟體的正常功能開發過程中,開發人員並不需要去刻意的尋找訊息佇列的使用場景,而是當出現效能瓶頸時,去檢視業務邏輯是否存在可以非同步處理的耗時操作,如果存在的話便可以引入訊息佇列來解決。否則盲目的使用訊息佇列可能會增加維護和開發的成本卻無法得到可觀的效能提升,那就得不償失了。
常見的訊息佇列
目前業界有四款常用的訊息佇列,它們分別是RabbitMQ、RocketMQ、ActiveMQ和Kafka。我們這裏主要介紹前兩種。
RabbitMQ
RabbitMQ在2007年釋出,是一個在AMQP(高階訊息佇列協議)基礎上完成的,可複用的企業訊息系統,是當前最流行的訊息中介軟體之一。
RabbitMQ的主要特性有:
- 可靠性: RabbitMQ提供了多種技術可以讓你在效能和可靠性之間進行權衡。這些技術包括永續性機制、投遞確認、釋出者證實和高可用性機制;
- 靈活的路由:訊息在到達佇列前是通過交換機進行路由的。RabbitMQ為典型的路由邏輯提供了多種內建交換機型別。如果你有更復雜的路由需求,可以將這些交換機組合起來使用,你甚至可以實現自己的交換機型別,並且當做RabbitMQ的外掛來使用;
- 訊息叢集:在相同區域網中的多個RabbitMQ伺服器可以聚合在一起,作為一個獨立的邏輯代理來使用;
- 佇列高可用:佇列可以在叢集中的機器上進行映象,以確保在硬體問題下還保證訊息安全;
- 多種協議的支援:RabbitMQ支援多種訊息佇列協議;
- 多語言支援:RabbitMQ的伺服器端用Erlang語言編寫,其客戶端支援基本所有程式語言;
- 管理介面: RabbitMQ有一個易用的用戶界面,使得使用者可以監控和管理訊息Broker的許多方面;
- 跟蹤機制:如果訊息異常,RabbitMQ提供訊息跟蹤機制,使用者可以跟蹤發現異常;
- 外掛機制:提供了許多外掛,來從多方面進行擴充套件,也可以編寫自己的外掛;
RabbitMQ的優點有:
- 由於erlang語言的特性,mq 效能較好,高併發;
- 健壯、穩定、易用、跨平臺、支援多種語言、文件齊全;
- 有訊息確認機制和持久化機制,可靠性高;
- 高度可定製的路由;
- 管理介面較豐富,在網際網路公司也有較大規模的應用;
- 社羣活躍度高;
RabbitMQ的缺點有:
- 儘管結合erlang語言本身的併發優勢,效能較好,但是不利於做二次開發和維護;
- 實現了代理架構,意味著訊息在傳送到客戶端之前可以在中央節點上排隊。此特性使得RabbitMQ易於使用和部署,但是使得其執行速度較慢,因為中央節點增加了延遲,訊息封裝後也比較大;
- 需要學習比較複雜的介面和協議,學習和維護成本較高;
RocketMQ
RocketMQ出自阿里公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並做出了自己的一些改進,訊息可靠性上比 Kafka 更好。RocketMQ在阿里集團被廣泛應用在訂單,交易,充值,流計算,訊息推送,日誌流式處理,binglog分發等場景。
RocketMQ的主要特性有:
- 是一個佇列模型的訊息中介軟體,具有高效能、高可靠、高實時、分散式特點;
- Producer、Consumer、佇列都可以分散式;
- Producer向一些佇列輪流傳送訊息,佇列集合稱為Topic,Consumer如果做廣播消費,則一個consumer例項消費這個Topic對應的所有佇列,如果做叢集消費,則多個Consumer例項平均消費這個topic對應的佇列集合;
- 能夠保證嚴格的訊息順序;
- 提供豐富的訊息拉取模式;
- 高效的訂閱者水平擴充套件能力;
- 實時的訊息訂閱機制;
- 億級訊息堆積能力;
- 較少的依賴;
RocketMQ的優點有:
- 單機支援 1 萬以上持久化佇列;
- RocketMQ 的所有訊息都是持久化的,先寫入系統 PAGECACHE,然後刷盤,可以保證記憶體與磁碟都有一份資料;
- 模型簡單,介面易用(JMS 的介面很多場合並不太實用);
- 效能非常好,可以大量堆積訊息在broker中;
- 支援多種消費,包括叢集消費、廣播消費等。
- 各個環節分散式擴充套件設計,主從HA;
RocketMQ的缺點有:
- 支援的客戶端語言不多,目前是java及c++,其中c++不成熟;
- RocketMQ社羣關注度及成熟度也不及前兩者;
- 沒有web管理介面,提供了一個CLI(命令列介面)管理工具帶來查詢、管理和診斷各種問題;
- 沒有在訊息佇列的核心部分實現JMS等介面;
原理簡介
如圖是Stream原始碼的流程圖。Stream首先會動態註冊相關BeanDefinition,並且處理@StreamListener註解;然後在Bean例項初始化之後,會呼叫BindingService進行服務繫結;BindingService在繫結服務時會首先獲取特定的Binder繫結器,然後繫結Producer和Consumer;最後Stream的相關例項就會進行傳送和接受訊息的處理。
程式設計模型
Spring Cloud Stream提供了一系列的預先定義的註解來宣告輸入型和輸出型channel,業務系統基於這些channel與訊息中介軟體進行通訊,而不是直接與訊息中介軟體進行通訊。
宣告和繫結Channels
通過給業務應用的配置類新增 @EnableBinding
註解來將一個Spring應用轉變成Spring Cloud Stream應用。 @EnableBinding
註解本身擁有 @Configuration
元註解來進行相關配置並且會觸發Spring Cloud Stream框架的初始化機制。
@Configuration @EnableIntegration public @interface EnableBinding { ... Class>[] value() default {}; }
@EnableBinding
註解可以使用宣告輸入型和輸出行channel的介面類作為其value屬性值。 @EnableBinding
註解只能使用在業務系統的Configuration類上,可以提供儘可能多的介面類作為該註解的value屬性值,比如說 @EnableBinding(value={Order.class, Payment.class})
,Order和Payment都是宣告了channel的介面類。
在Spring Cloud Stream應用中,介面類可以通過被 @Input
和 @Output
註解修飾的函式來宣告的輸入型和輸出型channels。
public interface OnlineStore{ @Input SubscribableChannel orders(); #宣告輸入型channel,表示接收訂單 @Output MessageChannel stock(); #宣告輸出型channel,表示向供應商進貨 }
使用這個介面類當作 @EnableBinding
的value屬性值可以觸發Stream框架的初始化機制,建立兩個channel,名字分別為orders和stock,orders是輸入型channel,而stock是輸出型channel。
@EnableBinding(OnlineStore.class) public class ShopConfiguration{ ... }
自定義通道
使用 @Input
和 @Output
註解,程式設計人員可以給每個通道一個自定義的名稱,使用這個自定義通道,可以與訊息對立中相應的Channel進行互動。
public interface OnlineStore{ @Input("inboundOrders") SubscribableChannelorders(); }
在上邊程式碼示例中,自定義通道的名稱為inboundOrders,Stream框架會建立出名為inboundOrders的通道。
Spring Cloud Stream提供了預先設定的三種介面來定義輸入型channel和輸出型channel,它們是Source、Sink和Processor。Source用來宣告輸出型channel,它的通道名稱為output。Sink用來宣告輸入型channel,它的通道名稱為input。Processor則用來宣告輸出輸入型的channel。
# Source public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output(); } # Sink public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); } # Processor public interface Processor extends Source, Sink { }
產生和消費訊息
使用Spring Integration註解或者Spring Cloud Stream的@StreamListener註解可以進行訊息的傳送和消費。 @StreamListener
註解基於Spring Messaging註解(比如說@MessageMapping,@JmsListener,@RabbitListener),除此之外,該註解新增了內容(content)型別管理和型別強制等特性。
作為Spring Integration的補充,Spring Cloud Stream提供了它自己的@StreamListener註解,該註解構建在Spring Messaging註解的基礎上,比如說@MessageMapping、@JmsListener和 @RabbitListener
。 @StreamListener
註解提供了更加簡便處理輸入訊息的模型。
Spring Cloud Stream提供了可擴充套件的訊息轉換(MessageConverter)機制來處理資料轉換,並將轉換後的資料分配給對應的被 @StreamListener
修飾的方法。下面這個例子展示了一個處理外部訂單訊息的應用。
@EnableBinding(Sink.class) public class OrderHandler{ @Autowired OrderService orderService; @StreamListener(Sink.INPUT) public void handle(Order order){ orderService.handle(order); } }
假設,輸入的Message物件有一個string型別的Payload和一個值為application/json的contentType。在使用 @StreamListener
時, MessageConverter
會使用訊息的contentType來解析String型別的Payload並賦值給Order物件。
就像其他的Spring Messaging方法一樣,被 @StreamListener
註解的方法的引數可以使用 @Payload
和 @Headers
進行註解。對於返回資料的方法,必須使用 @SendTo
註解來指定該返回資料傳送到哪個輸出型channel。
@EnableBinding(Processor.class) public class TransformProcessor{ @Autowired VotingService votingService; @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public VoteResult handle(Vote vote){ return votingService.record(vote); } }
Spring Cloud Stream支援將訊息分配到多個 @StreamListener
修飾的方法。爲了能使用該分配機制,一個方法必須首先滿足下列條件:
- 方法不能有返回值。
- 方法必須是單獨一類訊息的處理函式。
使用註解的condition屬性中的SpEL表示式可以設定 @StreamListener
接收訊息的條件判斷。所有匹配了該condition的方法都會在同一個執行緒中被呼叫,但是方法呼叫相對順序不能保證。
下面就是一個 @StreamListener
分配訊息的例子。在這個例子中,所有頭部屬性type對應的值為food的訊息都會被分配給receiveFoodOrder方法,所有頭部屬性type對應的值為compute的訊息都會被分配給receiveComputeOrder方法。
@EnableBinding(Sink.class) @EnableAutoConfiguration public static class TestPojoWithAnnotatedArguments{ @StreamListener(target = Sink.INPUT, condition = "headers['type']=='food'") public void receiveFoodOrder(@Payload FoodOrder foodOrder){ // handle the message } @StreamListener(target = Sink.INPUT, condition = "headers['type']=='compute'") public void receiveComputeOrder(@Payload ComputeOrder computeOrder){ // handle the message } }
小結
本文主要介紹了Spring Cloud Stream中涉及到的相關概念,重點介紹了Spring Cloud Stream的程式設計模型,為後面文章實戰應用和自定義奠定一些基礎。Spring Cloud Stream封裝了多種訊息中介軟體的操作介面,目前只有kafka和rabbitmq,下一篇將會介紹如何自已實現一個Rocketmq的繫結器。