Accessing RabbitMQ Streams with C#

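The RabbitMQ team introduced Streams in RabbitMQ 3.9. The official documentation broadly describes what changed and which performance improvements were made; my own understanding is summarized below:

  1. Messages are not deleted once they are acked, as they were in RabbitMQ before; they are handled the way Kafka handles them
  2. Server-side offset tracking lets a consumer resume from its last offset, which is also the same as Kafka
  3. Streams can be accessed over a binary protocol, which improves performance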

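According to the official documentation, RabbitMQ Streams suit the following scenarios:

  1. Multiple applications need to read the same data
  2. Large amounts of data need to be accessed (RabbitMQ Streams store messages on disk, rather than in memory as traditional queues do)
  3. Message replay (past messages can be re-read by offset or from a given timestamp)
  4. High-throughput applications (orders of magnitude higher than traditional queues)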

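This note records how to use the RabbitMQ stream queue type from C#. This is different from access over the binary protocol, which requires a stream client: here I use the regular RabbitMQ .NET client, and RabbitMQ does not need any additional stream-related plugin installed.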


Environment

  1. macOS Ventura 13.3
  2. OrbStack 1.0.1(16297)
  3. .NET SDK 6.0.413
  4. JetBrains Rider 2023.2.3
  5. NuGet libraries

    • RabbitMQ.Client 6.6.0
  6. docker images

    • rabbitmq:3.12.7-management
    • yowko/rabbitmq:3.12.7-management
  7. RabbitMQ cluster docker compose

    For details, see my earlier note "Starting a RabbitMQ cluster with docker compose"

  8. Create the RabbitMQ exchange

    rabbitmqadmin declare exchange name=test type=topic -u admin -p pass.123
  9. Create the RabbitMQ stream queue

    rabbitmqadmin declare queue name=test-streams durable=true queue_type=stream -u admin -p pass.123
  10. Bind the queue to the exchange

    rabbitmqadmin declare binding source=test destination_type=queue destination=test-streams routing_key=streams -u admin -p pass.123
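
The three rabbitmqadmin commands above can also be sketched with the RabbitMQ .NET client. This is a minimal sketch, not taken from the original repository: it assumes the broker is reachable on localhost, and it reuses the admin / pass.123 account and the test / test-streams / streams names from the commands. The durable flag on the exchange is set to true on the assumption that it should match an exchange already created by rabbitmqadmin.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;

// Assumption: broker on localhost, same credentials as the rabbitmqadmin commands.
var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "admin",
    Password = "pass.123"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Step 8: topic exchange named "test".
channel.ExchangeDeclare(exchange: "test", type: ExchangeType.Topic, durable: true);

// Step 9: the queue type is selected with the x-queue-type argument.
// A stream queue must be durable and cannot be exclusive.
channel.QueueDeclare(
    queue: "test-streams",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object> { ["x-queue-type"] = "stream" });

// Step 10: route messages published with routing key "streams" to the stream queue.
channel.QueueBind(queue: "test-streams", exchange: "test", routingKey: "streams");
```

Declaring from code keeps the topology next to the application, while rabbitmqadmin is handier for one-off setup; either should leave the broker in the same state.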

Accessing a RabbitMQ Stream from C#

  1. Produce

  2. Consume
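
The original code listings for these two steps are not included here; the full version is in the GitHub repository linked near the end of this post. As a stand-in, the following is a minimal sketch of both steps with RabbitMQ.Client 6.x. It is my own illustration rather than the repository's code, and it assumes a broker on localhost plus the exchange, queue, binding, and credentials from the environment section. The message text, the prefetch count of 10, and the "first" starting offset are arbitrary choices.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Assumption: broker on localhost with the admin / pass.123 account.
var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "admin",
    Password = "pass.123"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// 1. Produce: publish through the "test" exchange with the bound routing key.
for (var i = 0; i < 5; i++)
{
    var body = Encoding.UTF8.GetBytes($"message {i}");
    channel.BasicPublish(exchange: "test", routingKey: "streams", basicProperties: null, body: body);
}

// 2. Consume: a stream queue requires a per-consumer prefetch count and manual acks.
channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (_, ea) =>
{
    var text = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine($"received: {text}");
    // The ack does not delete the message from the stream; it only lets the broker send more.
    channel.BasicAck(ea.DeliveryTag, multiple: false);
};

// "first" starts reading from the beginning of the stream.
var consumeArguments = new Dictionary<string, object> { ["x-stream-offset"] = "first" };

channel.BasicConsume(
    queue: "test-streams",
    autoAck: false,
    consumerTag: "",
    noLocal: false,
    exclusive: false,
    arguments: consumeArguments,
    consumer: consumer);

Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
```

Because the stream keeps its messages, running this sketch a second time with "first" should deliver the earlier messages again along with the new ones.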


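Notes

  1. RabbitMQ does not need the rabbitmq_stream or rabbitmq_stream_management plugin enabled
  2. RabbitMQ.Stream.Client is not needed for access
  3. QoS must be set (and some settings are not supported)

    • Error message when QoS is not set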

      Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20
         at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
         at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments)
         at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67
         at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47


    • prefetchSize can only be set to 0

      Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=540, text='NOT_IMPLEMENTED - prefetch_size!=0 (1)', classId=60, methodId=10
         at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
         at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
         at RabbitMQ.Client.Framing.Impl.Model.BasicQos(UInt32 prefetchSize, UInt16 prefetchCount, Boolean global)
         at RabbitMQ.Client.Impl.AutorecoveringModel.BasicQos(UInt32 prefetchSize, UInt16 prefetchCount, Boolean global)
         at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 29
      Process finished with exit code 134.


    • prefetchCount must not be set to 0

      Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20
         at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
         at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments)
         at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67
         at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47
      Process finished with exit code 134.


    • global can only be set to false

      Unhandled exception. RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'test-streams' in vhost '/''', classId=60, methodId=20
         at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
         at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
         at RabbitMQ.Client.IModelExensions.BasicConsume(IModel model, IBasicConsumer consumer, String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments)
         at Program.<<Main>$>g__ConsumeStreams|0_1(IConnection connection, IModel channel, EventingBasicConsumer consumer) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 67
         at Program.<Main>$(String[] args) in /Users/yowko.tsai/POCs/Solution1/RabbitMqPublishConsume/Program.cs:line 47
      Process finished with exit code 134.


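  4. With x-stream-offset (stream-offset), the difference between offset and timestamp is unclear in practice

    The documentation says that "first", "last", "next", Offset, Timestamp, and Interval are allowed, but in my tests Timestamp never worked; it always seemed to be treated as an Offset. I am not sure whether I have misunderstood the documentation


  5. The documentation is not sufficient

    This may just be me: the difference between queue type: stream and the binary RabbitMQ Stream protocol is not made clear enough, and the examples and explanations are easy to confuse. Some settings are also unsupported without the documentation saying so explicitly, so you only find out by testing

  6. Although the features correspond to Kafka's, there are still quite a few differences. For example, Kafka's consumer group setup may have to be implemented in RabbitMQ with single active consumer, and RabbitMQ has Super streams, which at first glance look close to the Kafka partition concept. I have not yet taken the time to study either in detail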
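
Notes 3 and 4 above can be sketched in code as follows. ConfigureStreamQos and StreamOffset are hypothetical helper names introduced only for this illustration, and the connection details are the same assumptions as elsewhere in this post. The Timestamp case wraps the value in AmqpTimestamp, a type that RabbitMQ.Client provides for AMQP timestamp fields. My guess is that a plain integer is sent as a number and therefore read as an Offset, but I have not verified that this explains the behaviour described in note 4.

```csharp
using System;
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Assumption: broker on localhost with the admin / pass.123 account.
var factory = new ConnectionFactory { HostName = "localhost", UserName = "admin", Password = "pass.123" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

ConfigureStreamQos(channel, prefetchCount: 100);

// The kinds of x-stream-offset values listed in the documentation.
var oneHourAgo = DateTimeOffset.UtcNow.AddHours(-1).ToUnixTimeSeconds();
var offsets = new Dictionary<string, object>
{
    ["first"] = "first",                            // start of the stream
    ["last"] = "last",                              // last written chunk
    ["next"] = "next",                              // only messages that arrive from now on
    ["offset"] = 5000L,                             // absolute offset (arbitrary example value)
    ["timestamp"] = new AmqpTimestamp(oneHourAgo),  // point in time, seconds since the Unix epoch
    ["interval"] = "1h"                             // relative to now, same units as x-max-age
};

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (_, ea) =>
{
    Console.WriteLine($"delivery tag {ea.DeliveryTag}, {ea.Body.Length} bytes");
    channel.BasicAck(ea.DeliveryTag, multiple: false);
};

// Swap the key to try another kind of starting position.
channel.BasicConsume(
    queue: "test-streams",
    autoAck: false,
    consumerTag: "",
    noLocal: false,
    exclusive: false,
    arguments: StreamOffset(offsets["timestamp"]),
    consumer: consumer);

Console.WriteLine("Press Enter to exit.");
Console.ReadLine();

// Hypothetical helper: applies the only QoS combination that the errors in note 3 allow.
static void ConfigureStreamQos(IModel channel, ushort prefetchCount)
{
    if (prefetchCount == 0)
    {
        throw new ArgumentOutOfRangeException(nameof(prefetchCount), "A stream consumer needs a prefetch count above 0.");
    }

    // prefetchSize must be 0 and global must be false for a stream queue.
    channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
}

// Hypothetical helper: wraps a starting position in consumer arguments.
static IDictionary<string, object> StreamOffset(object value) =>
    new Dictionary<string, object> { ["x-stream-offset"] = value };
```

Failing fast on a zero prefetch count turns the broker's PRECONDITION_FAILED channel close into a local exception, which is easier to trace back to the call site.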

Full code: GitHub: yowko/streams-queue-demo


References

  1. RabbitMQ 3.9
  2. Starting a RabbitMQ cluster with docker compose
  3. RabbitMQ Streams Overview
  4. RabbitMQ Streams and Replay Features, Part 1: When to Use RabbitMQ Streams
  5. RabbitMQ Streams and replay features, Part 2: How to work with RabbitMQ Streams
  6. GitHub: yowko/streams-queue-demo