文章目錄
使用 C# 設定 Single Active Consumer 讀取 RabbitMQ Streams
RabbitMQ 團隊在 RabbitMQ 3.9 導入 Streams
,官網文件大致上說明了有哪些異動與效能改善,以下整理個人理解:
- 不會像過去 RabbitMQ 在 message 得到 ack 後就刪除,處理方式如同 Kafka
- 提供 server-side offset 追蹤,讓 consumer 可以從上次 offset 繼續消費,這也跟 Kafka 一樣
- 可以使用二進位協定來存取,有效提升效能
根據官網說明,RabbitMQ Streams 適合的情境有:
- 多個應用程式需要存取相同的資料
- 大量的資料需要被存取 (RabbitMQ Streams 將訊息儲存在 disk 而非傳統 queue 的 memory)
- 訊息的 replay (可以透過 offset 或是指定 timestamp 來重新讀取過去的訊息)
- 高傳輸量的應用程式 (相較於傳統 queue 高出幾個數量級)
之前筆記 使用 C# 存取 RabbitMQ Streams 紀錄的是 RabbitMQ Stream queue type 的使用方式,並且使用 C# 搭配 RabbitMQ .NET client 來存取,也在 使用 C# 透過二進位協定存取 RabbitMQ Streams 紀錄到如何使用 RabbitMQ Stream .Net Client 透過二進位協定的存取方式,今天要來紀錄如何讓 RabbitMQ Streams 如同 Kafka consumer group 的效果:只有一個 consumer 在消費訊息
基本環境說明
- macOS Ventura 13.3
- OrbStack 1.0.1(16297)
- .NET SDK 6.0.413
- JetBrains Rider 2023.2.3
NuGet libaries
- RabbitMQ.Stream.Client 1.7.4
- Microsoft.Extensions.Logging.Console 7.0.0
docker images
- rabbitmq:3.12.7-management
- yowko/rabbitmq:3.12.7-management
RabbitMQ cluster docker compose
詳細內容可以參考過去筆記 透過 docker compose 啟動 RabbitMQ cluster
建立 RabbitMQ Stream queue
使用 binary protocol 可以不用建立 exchange 與 binding
rabbitmqadmin declare queue name=test-streams durable=true queue_type=stream -u admin -p pass.123
設定方式:設定 Consumer Config
設定 Reference 名稱
Reference = "yowkoconsumer"
啟用單一 active consumer
IsSingleActiveConsumer = true
取得 Reference 對應 offset:加上
ConsumerUpdateListener
這邊我個人有稍做修改:在使用 reference 與 stream 查不到相關紀錄時不拋出
OffsetNotFoundException
,而是回傳OffsetTypeFirst
讓 consumer 從第一筆開始消費ConsumerUpdateListener = async (consumerRef, stream, isActive) => { try { var offset = await streamSystem.QueryOffset(consumerRef, stream).ConfigureAwait(false); return new OffsetTypeOffset(offset); } catch (Exception e) when (e is OffsetNotFoundException) { Console.WriteLine(e); return new OffsetTypeFirst(); } }
儲存 offfset 追蹤:修改
MessageHandler
嚴格來說,這個動作不屬於 single active consumer 的範疇,但是如果不設定的話,server 就沒有辦法知道 consumer 已經消費到哪裡,算是 single active consumer 的必要前提
MessageHandler = async (sourceStream, consumer, messageContext, message) => { Console.WriteLine( $"Received message: {Encoding.ASCII.GetString(message.Data.Contents)} |{messageContext.Offset} | {messageContext.Timestamp.TotalMilliseconds}"); await consumer.StoreOffset(messageContext.Offset).ConfigureAwait(false); await Task.CompletedTask.ConfigureAwait(false); }
完整 consumer 內容
心得
- RabbitMQ Stream 的 consumer group 設定方式與 Kafka 不同,Kafka 是透過 group id 來設定,而 RabbitMQ Stream 則是透過 Reference 來設定
- RabbitMQ Stream single active consumer 的設定明顯比 Kafka consumer group 繁瑣,需要自行設定 offset 追蹤與儲存,相較需 kafka 只要指定 group id 其他的皆交由 kafka 處理
- 個人實測,在沒有新增 message 的情況下,雖然有回傳最後一筆 message 內容,但重啟 consumer 時還是會收到最後一筆 message (這筆 message 會重複收到)
設定 ConsumerUpdateListener 時,如果 Reference 與 stream 不存在時,會拋出
OffsetNotFoundException
,我修改官網的範例以回傳OffsetTypeFirst
讓 consumer 從第一筆開始消費 (我覺得這邊可以改善,可以直接回傳OffsetTypeFirst
,不需要拋出例外,否則以我看就是會一直收到OffsetNotFoundException
,還是我誤會了什麼XD)錯誤訊息
15:11:00 fail: RabbitMQ.Stream.Client.Reliable.Consumer[0] Error reading the socket RabbitMQ.Stream.Client.OffsetNotFoundException: QueryOffset stream: test-streams, reference: yowkoconsumer at RabbitMQ.Stream.Client.ClientExceptions.MaybeThrowException(ResponseCode responseCode, String message) in /_/RabbitMQ.Stream.Client/ClientExceptions.cs:line 25 at RabbitMQ.Stream.Client.StreamSystem.QueryOffset(String reference, String stream) in /_/RabbitMQ.Stream.Client/StreamSystem.cs:line 343 at Program.<>c__DisplayClass0_0.<<<Main>$>b__3>d.MoveNext() in /Users/yowko.tsai/POCs/StreamBinaryDemo/StreamBinaryDemo/Program.cs:line 59 --- End of stack trace from previous location --- at RabbitMQ.Stream.Client.RawConsumer.<>c__DisplayClass17_0.<<Init>b__2>d.MoveNext() in /_/RabbitMQ.Stream.Client/RawConsumer.cs:line 536 --- End of stack trace from previous location --- at RabbitMQ.Stream.Client.Client.HandleIncoming(Memory`1 frameMemory) in /_/RabbitMQ.Stream.Client/Client.cs:line 495 at RabbitMQ.Stream.Client.Connection.ProcessIncomingFrames() in /_/RabbitMQ.Stream.Client/Connection.cs:line 163
錯誤截圖
完整程式碼:yowko/rabbitmq-streams-binary
參考資訊
文章作者 Yowko Tsai
上次更新 2023-11-10
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。