文章目錄
使用 C# 存取 RabbitMQ Streams
RabbitMQ 團隊在 RabbitMQ 3.9 導入 Streams
,官網文件大致上說明了有哪些異動與效能改善,以下整理個人理解:
- 不會像過去 RabbitMQ 在 message 得到 ack 後就刪除,處理方式如同 Kafka
- 提供 server-side offset 追蹤,讓 consumer 可以從上次 offset 繼續消費,這也跟 Kafka 一樣
- 可以使用二進位協定來存取,有效提升效能
根據官網說明,RabbitMQ Streams 適合的情境有:
- 多個應用程式需要存取相同的資料
- 大量的資料需要被存取 (RabbitMQ Streams 將訊息儲存在 disk 而非傳統 queue 的 memory)
- 訊息的 replay (可以透過 offset 或是指定 timestamp 來重新讀取過去的訊息)
- 高傳輸量的應用程式 (相較於傳統 queue 高出幾個數量級)
今天要來紀錄的是 RabbitMQ Stream queue type 的使用方式,並且使用 C# 來存取,與二進位協定的存取方式不同(需要使用 stream client,這邊使用的是 RabbitMQ .NET client 來存取,除此之外 RabbitMQ 也不需額外安裝 stream 相關 plugin)。
基本環境說明
- macOS Ventura 13.3
- OrbStack 1.0.1(16297)
- .NET SDK 6.0.413
- JetBrains Rider 2023.2.3
NuGet libaries
- RabbitMQ.Client 6.6.0
docker images
- rabbitmq:3.12.7-management
- yowko/rabbitmq:3.12.7-management
RabbitMQ cluster docker compose
詳細內容可以參考過去筆記 透過 docker compose 啟動 RabbitMQ cluster
建立 RabbitMQ exchange
rabbitmqadmin declare exchange name=test type=topic -u admin -p pass.123
建立 RabbitMQ Stream queue
rabbitmqadmin declare queue name=test-streams durable=true queue_type=stream -u admin -p pass.123
設定 exchange queue 的 binding
rabbitmqadmin declare binding source=test destination_type=queue destination=test-streams routing_key=streams -u admin -p pass.123
C# 存取 RabbitMQ Stream
Produce
Consume
心得
- RabbitMQ 無需啟用
rabbitmq_stream
與rabbitmq_stream_management
plugin - 不需使用 RabbitMQ.Stream.Client 來存取
必需設定 QOS (有些設定不支援)
QOS 未設定錯誤訊息
Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20 at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout) at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer) at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer) at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments) at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67 at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47
prefetchSize
只能設為 0
Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=540, text='NOT_IMPLEMENTED - prefetch_size!=0 (1)', classId=60, methodId=10 at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout) at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) at RabbitMQ.Client.Framing.Impl.Model.BasicQos(UInt32 prefetchSize, UInt16 prefetchCount, Boolean global) at RabbitMQ.Client.Impl.AutorecoveringModel.BasicQos(UInt32 prefetchSize, UInt16 prefetchCount, Boolean global) at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 29 Process finished with exit code 134.
prefetchCount
不得設為 0
Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20 at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout) at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer) at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer) at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments) at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67 at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47 Process finished with exit code 134.
global
只能設為 false
Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20 at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout) at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer) at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer) at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments) at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67 at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47 Process finished with exit code 134.
x-stream-offset(stream-offset) offset 與 timestamp 使用上模糊
文件說允許
"first"
、"last"
、"next"
、Offset
、Timestamp
、Interval
,但實際測試下來Timestamp
沒有成功過,感覺都是吃到Offset
的效果,不知道是不是我個人對文件的理解有問題文件不夠充足
這個可能是我個人問題:queue type: stream 與 binary RabbitMQ Stream protocol 區別不夠明顯,範例與說明容易混淆,而且有些設定不支援,但文件沒有明確說明,只能透過測試才知道
雖然是對應 Kafka 相關功能,但還是有不少差異,其他像是 Kafka consumer group 的設定,RabbitMQ 可能需要透過 single active consumer 實作、RabbitMQ 有 Super streams,初步看來跟 Kafka partition 概念接近,但都還沒花時間仔細研究
完整程式碼:GitHub: yowko/streams-queue-demo
參考資料
文章作者 Yowko Tsai
上次更新 2023-10-30
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。