文章目錄
讓 RabbitMQ 支援延遲發送訊息
一般情況下,Message Queue 都是將 message 由 producer 送給 broker 後接著就由 consumer dequeue 進行處理,常見的額外需求是 message 有不同的 priority,但這次客戶提出想要在發生動作的當下不立即觸發對應的行為,想透過設定延遲時間來執行對應行為
客戶的需求永遠是刺激工程師進步的原動力,辜且不論客戶延遲執行的理由跟合理性,身為工程師一定要有可以滿足客戶需求的能力,所以就先想想可能的解決方案
如果是一般情況下可能我會將 message 與預計執行動作的時間紀錄到 db,然後再使用 scheduler 定期去檢查時間及實際執行動作,而這次專案因為已經使用 RabbitMQ 於是就試試 RabbitMQ 是否可以支援這樣的需求,來看看可以怎麼設定吧
RabbitMQ 安裝套件 - RabbitMQ Delayed Message Plugin
預設情境下 RabbitMQ 並不支援延遲發送的功能,但 RabbitMQ 不愧是套成熟的 mq,可以透過安裝套件來讓 RabbitMQ 支援延遲發送
- 至 RabbitMQ Community Plugins page 下載 rabbitmq_delayed_message_exchange 套件 (.ez 檔)
將下載的
.ez
檔複製至 RabbitMQ 安裝目錄下的我的檔名是
rabbitmq_delayed_message_exchange-0.0.1.ez
,我的安裝目錄是C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9\plugins
啟用套件 - RabbitMQ Delayed Message Plugin
開啟 RabbitMQ Command Prompt (sbin dir)
執行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
以啟用套件
修改發送程式碼
要延遲發送 message 只需要為 message 加上一個 x-delay
的 header 及延遲毫秒數並在發送時指定 property 給 message 即可
程式碼說明
// 建立 BasicProperties var props = new BasicProperties(); // 建立 diciionary 來放 header 內容 Dictionary<string, object> headers = new Dictionary<string, object>(); // 指定 x-delay 及 延遲毫秒數 headers.Add("x-delay", 5000); // 將定義完成的 dictionary 指定給 BasicProperties 的 header props.Headers = headers; // 以 BasicProperties 發送訊息 channel.BasicPublish(exchange, routingKey, props, body);
完整程式碼
//初始化連線資訊 var factory = new ConnectionFactory(); //設定 RabbitMQ 位置 factory.HostName = "localhost"; //設定 RabbitMQ port factory.Port = 5672; //設定連線 RabbitMQ username factory.UserName = "yowko"; //設定 RabbitMQ password factory.Password = "pass.123"; //開啟連線 using (var connection = factory.CreateConnection()) //開啟 channel using (var channel = connection.CreateModel()) { string exchange = "yowkoDelay"; string queue = "yowkoDelayTest"; string routingKey = "hello"; Dictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-delayed-type", "direct"); //宣告 exchanges,RabbitMQ提供了四種Exchange模式:fanout,direct,topic,header channel.ExchangeDeclare(exchange, "x-delayed-message", true, false, args); //宣告 queues channel.QueueDeclare(queue, true, false, false, null); //將 exchnage、queue 依 route rule 綁定 channel.QueueBind(queue, exchange, routingKey, null); string message = $"delayed payload-{Guid.NewGuid()}"; var body = Encoding.UTF8.GetBytes(message); // 建立 BasicProperties var props = new BasicProperties(); // 建立 diciionary 來放 header 內容 Dictionary<string, object> headers = new Dictionary<string, object>(); // 指定 x-delay 及 延遲毫秒數 headers.Add("x-delay", 5000); // 將定義完成的 dictionary 指定給 BasicProperties 的 header props.Headers = headers; // 以 BasicProperties 發送訊息 channel.BasicPublish(exchange, routingKey, props, body); Console.WriteLine(" set {0}", message); }
實際效果
將同一則訊息發送至兩個不同的 queue,一個是直接發送,另一個則使用延遲發送,確認延遲發送可以運作
心得
透過安裝套件及修改發送端程式就可以達成延遲發送目的,感覺上還滿方便的,只是我在進行長時間測試時發現,超過一個小時的延遲無法被達成,我自己實際測試的上限都約在一個小時以下,超過一個小時的延遲沒有成功過,但我沒查到實際問題發生原因,官方文件上寫的限制數字是 ERL_MAX_T
= 4294967295 ,遠大於我測試出來的數字,我看 issues 列表也沒有相關資訊,不知道是不是 windows 版限制,提供給大家參考
參考資訊
文章作者 Yowko Tsai
上次更新 2021-10-28
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。