Accessing RabbitMQ Streams with C#

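The RabbitMQ team introduced Streams in RabbitMQ 3.9. The official documentation broadly describes what changed and how performance improved; my own understanding is summarized below:

  1. Messages are not deleted once they are acked, as they were in traditional RabbitMQ queues; they are retained, the same way Kafka handles them
  2. Server-side offset tracking lets a consumer resume from its last offset, which is also like Kafka
  3. A binary protocol is available for access, which noticeably improves performance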

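According to the official site, RabbitMQ Streams is a good fit for:

  1. Multiple applications that need to read the same data
  2. Large volumes of data that need to be read (RabbitMQ Streams stores messages on disk rather than in memory as traditional queues do)
  3. Message replay (past messages can be re-read by offset or by a given timestamp)
  4. High-throughput applications (several orders of magnitude higher than traditional queues)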

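This note records how to use the RabbitMQ stream queue type and access it from C#. This is different from access over the binary protocol, which requires a stream client: here I use the regular RabbitMQ .NET client, and RabbitMQ does not need any stream-related plugin installed.

Environment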

  1. macOS Ventura 13.3
  2. OrbStack 1.0.1(16297)
  3. .NET SDK 6.0.413
  4. JetBrains Rider 2023.2.3
  5. NuGet libraries

    • RabbitMQ.Client 6.6.0
  6. docker images

    • rabbitmq:3.12.7-management
    • yowko/rabbitmq:3.12.7-management
  7. RabbitMQ cluster docker compose

    For details, see my earlier note "Starting a RabbitMQ cluster with docker compose"

  8. Create the RabbitMQ exchange

    rabbitmqadmin declare exchange name=test type=topic -u admin -p pass.123
    
  9. Create the RabbitMQ Stream queue

    rabbitmqadmin declare queue name=test-streams durable=true queue_type=stream -u admin -p pass.123
    
  10. Bind the queue to the exchange

    rabbitmqadmin declare binding source=test destination_type=queue destination=test-streams routing_key=streams -u admin -p pass.123
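
The same topology could also be declared from C# instead of rabbitmqadmin. The following is only a sketch under assumptions: the host name `localhost` is a guess about the environment, and the declaration arguments are chosen to match what the rabbitmqadmin commands above create (a durable topic exchange and a durable queue with `x-queue-type` set to `stream`). If the arguments differ from an already existing queue, the broker rejects the declaration.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;

// Assumption: the broker is reachable on localhost with the credentials used above.
var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "admin",
    Password = "pass.123"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Step 8: durable topic exchange named "test".
channel.ExchangeDeclare(exchange: "test", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);

// Step 9: a stream queue must be durable, non-exclusive and not auto-delete.
channel.QueueDeclare(
    queue: "test-streams",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object> { ["x-queue-type"] = "stream" });

// Step 10: route messages published with routing key "streams" to the stream.
channel.QueueBind(queue: "test-streams", exchange: "test", routingKey: "streams", arguments: null);
```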
    

Accessing a RabbitMQ Stream from C#

  1. Produce

  2. Consume
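
The two steps above can be sketched with RabbitMQ.Client 6.x as follows. This is a minimal illustration, not the code from the repository linked at the end: the host name, the message text, the prefetch count of 100 and the `"first"` offset are all assumptions made for the example. The QoS values follow the constraints described in the takeaways below (prefetchSize 0, a non-zero prefetchCount, global false), and manual acks are used because, as far as I understand the documentation, stream queues do not support automatic acknowledgement.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Assumption: the broker is reachable on localhost with the credentials used above.
var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "admin",
    Password = "pass.123"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// 1. Produce: publish through the "test" exchange with the bound routing key.
for (var i = 0; i < 10; i++)
{
    var body = Encoding.UTF8.GetBytes($"message {i}");
    channel.BasicPublish(exchange: "test", routingKey: "streams", basicProperties: null, body: body);
}

// 2. Consume: QoS has to be set before BasicConsume on a stream queue.
channel.BasicQos(prefetchSize: 0, prefetchCount: 100, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (_, ea) =>
{
    var text = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine($"received: {text}");

    // The ack does not delete the message from the stream;
    // it only lets the broker deliver more messages to this consumer.
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(
    queue: "test-streams",
    autoAck: false,
    consumerTag: "",
    noLocal: false,
    exclusive: false,
    // "first" replays the stream from its beginning (illustrative choice).
    arguments: new Dictionary<string, object> { ["x-stream-offset"] = "first" },
    consumer: consumer);

Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
```

Because the stream keeps its messages, running this program a second time with `"first"` should print the earlier messages again along with the new ones.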

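Takeaways

  1. RabbitMQ does not need the rabbitmq_stream or rabbitmq_stream_management plugin enabled
  2. There is no need to use RabbitMQ.Stream.Client
  3. QoS must be configured (and some settings are not supported)

    • Error message when QoS is not set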

      Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20
         at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
         at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments)
         at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67
         at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47
      

    • prefetchSize can only be set to 0

      Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=540, text='NOT_IMPLEMENTED - prefetch_size!=0 (1)', classId=60, methodId=10
         at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
         at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
         at RabbitMQ.Client.Framing.Impl.Model.BasicQos(UInt32 prefetchSize, UInt16 prefetchCount, Boolean global)
         at RabbitMQ.Client.Impl.AutorecoveringModel.BasicQos(UInt32 prefetchSize, UInt16 prefetchCount, Boolean global)
         at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 29
      
      Process finished with exit code 134.
      

    • prefetchCount must not be set to 0

      Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20
         at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
         at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments)
         at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67
         at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47
      
      Process finished with exit code 134.
      

    • global can only be set to false

      Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20
         at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
         at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments)
         at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67
         at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47
      
      Process finished with exit code 134.
      

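  4. x-stream-offset (stream-offset): the difference between offset and timestamp is unclear in practice

    The documentation says "first", "last", "next", Offset, Timestamp and Interval are all allowed, but in my tests Timestamp never worked; it always seemed to take effect as an Offset. I am not sure whether I have misunderstood the documentation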

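  5. The documentation is not thorough enough

    This may just be me: the difference between queue type: stream and the binary RabbitMQ Stream protocol is not made clear enough, the examples and explanations are easy to mix up, and some settings are unsupported without the documentation saying so explicitly, so you only find out by testing

  6. Although the features correspond to Kafka's, there are still quite a few differences. For example, Kafka's consumer group setup may have to be implemented with single active consumer in RabbitMQ, and RabbitMQ has Super streams, which at first glance look close to the Kafka partition concept. I have not yet spent the time to study either in detail

Full source code: GitHub: yowko/streams-queue-demo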
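
On the x-stream-offset question in point 4, one possible explanation (an assumption on my part, not something verified against the code in the repository) is the type of the value sent. With RabbitMQ.Client, a plain `long` goes over the wire as an integer, which the broker would read as an absolute offset, so a Unix time passed as a `long` could behave like an offset. Wrapping the Unix time in seconds in `AmqpTimestamp` sends it as an AMQP timestamp instead. The offset `5000`, the one-hour window and the `"1h"` interval below are illustrative values only.

```csharp
using System;
using System.Collections.Generic;
using RabbitMQ.Client;

// Absolute offset: a long is encoded as an integer field.
var byOffset = new Dictionary<string, object>
{
    ["x-stream-offset"] = 5000L
};

// Point in time: Unix seconds wrapped in AmqpTimestamp are encoded as a timestamp field.
var oneHourAgo = DateTimeOffset.UtcNow.AddHours(-1).ToUnixTimeSeconds();
var byTimestamp = new Dictionary<string, object>
{
    ["x-stream-offset"] = new AmqpTimestamp(oneHourAgo)
};

// Relative interval: a string, assumed here to use the same units as x-max-age (e.g. D, h, m, s).
var byInterval = new Dictionary<string, object>
{
    ["x-stream-offset"] = "1h"
};

// Show which CLR type each specification carries.
foreach (var spec in new[] { byOffset, byTimestamp, byInterval })
{
    var value = spec["x-stream-offset"];
    Console.WriteLine($"{value.GetType().Name}: {value}");
}
```

Any one of these dictionaries would be passed as the `arguments` parameter of `BasicConsume`, after `BasicQos` has been called on the channel.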

References

  1. RabbitMQ 3.9
  2. Starting a RabbitMQ cluster with docker compose
  3. RabbitMQ Streams Overview
  4. RabbitMQ Streams and Replay Features, Part 1: When to Use RabbitMQ Streams
  5. RabbitMQ Streams and replay features, Part 2: How to work with RabbitMQ Streams
  6. GitHub: yowko/streams-queue-demo