文章目錄
在 ASP.NET Core 中從 Apache Pulsar 接收訊息
Apache Pulsar 常常被拿來與 Kafka 做比較,孰優孰劣常常也是各自擁護者爭相討論的內容,以下條列幾項選擇 Pulsar 的正面意見
- 同時支援即時訊息與訊息佇列
- 支援 partition 但可以選擇不用
- 分散式 log
- 無狀態
- 跨地域性複製
- 較高吞吐量的同時保持較低的延遲
- 分層儲存與多租戶
- 可以簡易與 kafka 、 rabbitmq 整合
其實列了一堆,我個人是不太在意啦,當然有些內容是有打中 kafka 的部份缺點,不過我會想嘗試的主因是有效能測試比較 - Benchmarking Pulsar and Kafka - A More Accurate Perspective on Pulsar’s Performance :Pulsar 在大部份情境的性能都優於 kafka,加上 kafka 在大資料量下可能會出現延遲,雖然還沒實際遇到問題,但我還是想先預先增加一些技術 solution,以備不時之需
之前筆記 在 ASP.NET Core 中發送訊息至 Apache Pulsar 紀錄到如何在 ASP.NET Core 中如何發送訊息,今天要紀錄透過 Consumer 接受訊息的使用方式
如果想要透過 Reader 來獲取訊息可以參考 在 ASP.NET Core 中從 Apache Pulsar 接收訊息 (Reader)
Reader 與 Consumer 的差異在於
- Reader 可以指定開始處理 message 的位置,而 Consumer 都是從可用、未確認且最新的 message 開始處理
- Reader 不需 ack 也不保留相關資料
基本環境說明
- macOS Monterey 12.2.1
- docker desktop 4.2.0(70708)
docker images
- apachepulsar/pulsar:2.9.1
.NET SDK 6.0.200
NuGet packages
- DotPulsar 2.2.0
- Pulsar.Client 2.10.0
使用 docker 啟動 pulsar
docker run -d -p 6650:6650 -p 8080:8080 --name pulsar apachepulsar/pulsar:latest bin/pulsar standalone
6650
是 broker service port8080
是 web service port
使用方式
DotPulsar
建立 consumer BackgroundService
下列兩個方式的差異只在於 consumer 的建立方式
使用 builder
public class ConsumerBuilderService:BackgroundService { private const string Topic = "persistent://public/default/yowkotest"; private const string SubscriptionName = "YowkoSubscription"; private readonly IPulsarClient _pulsarClient; public ConsumerBuilderService(IPulsarClient pulsarClient) { _pulsarClient = pulsarClient; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { await using var pulsarConsumer = _pulsarClient.NewConsumer(Schema.String) .SubscriptionName(SubscriptionName) .Topic(Topic) .Create(); await foreach (var message in pulsarConsumer.Messages(stoppingToken)) { Console.WriteLine($"Received: {message.Value()}"); await pulsarConsumer.Acknowledge(message, stoppingToken); //await pulsarConsumer.AcknowledgeCumulative(message, cancellationToken); } await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken); } } }
不使用 builder
public class ConsumerBuilderService : BackgroundService { private const string Topic = "persistent://public/default/yowkotest"; private const string SubscriptionName = "YowkoSubscription"; private readonly IPulsarClient _pulsarClient; public ConsumerBuilderService(IPulsarClient pulsarClient) { _pulsarClient = pulsarClient; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { var options = new ConsumerOptions<string>(SubscriptionName,Topic,Schema.String); await using var pulsarConsumer = _pulsarClient.CreateConsumer(options); await foreach (var message in pulsarConsumer.Messages(stoppingToken)) { Console.WriteLine($"Received: {message.Value()}"); // 手動 ack await pulsarConsumer.Acknowledge(message, stoppingToken); //await pulsarConsumer.AcknowledgeCumulative(message, cancellationToken); } await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken); } } }
Program.cs : 註冊 Pulsar client 與 HostedService
// 建立 Pulsar 連線 client,可以透過 `ServiceUrl` 設定 Pulsar 位址(預設: `pulsar://localhost:6650`)、`RetryInterval` 設定重試或是重連的間隔秒數 (預設值:3s) var pulsarClient = PulsarClient .Builder() .ServiceUrl(new Uri("pulsar://localhost:6650")) .RetryInterval(TimeSpan.FromSeconds(3)) .Build(); builder.Services.AddSingleton(pulsarClient); builder.Services.AddHostedService<ConsumerBuilderService>(); //builder.Services.AddHostedService<ConsumerWithoutBuilderService>();
Pulsar.Client
建立 consumer BackgroundService
public class ReaderService : BackgroundService { private const string Topic = "yowkotest"; private readonly PulsarClient _pulsarClient; public ReaderService(PulsarClient pulsarClient) { _pulsarClient = pulsarClient; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { await using var pulsarReader = await _pulsarClient.NewReader(Schema.STRING(Encoding.UTF8)) .Topic(Topic) .StartMessageId(MessageId.Earliest) .CreateAsync(); var message = await pulsarReader.ReadNextAsync(stoppingToken); Console.WriteLine($"Received: {message.GetValue()}"); } } }
Program.cs : 註冊 Pulsar client 與 HostedService
const string serviceUrl = "pulsar://localhost:6650"; const string topicName = "yowkotest"; var client = await new PulsarClientBuilder() .ServiceUrl(serviceUrl) .BuildAsync(); builder.Services.AddSingleton(client); builder.Services.AddHostedService<ConsumerService>();
心得
關於 DotPulsar
官網文件與 GitHub 的 wiki 不是即時的
library 的 api 已更新,官網上的教學未更新,照著做一定沒辦法用
沒有指定 message 類型
方法命名不好: async 方法,名稱未以 async 結尾
官網上手動 ack 的程式碼錯誤
這個可能是我自己不懂造成的:沒有說明兩種 ack 的差異
關於 Pulsar.Client
sample code 沒有指定 message 類型
GitHub 上的 sample 只看到一筆筆收資料的方式,不知道大資料效能如何
NuGet 上的下載數與 GitHub 上的 star 數都是 Pulsar.Client 多一些
完整程式碼可以參考 yowko/PulsarTest
參考資訊
- Benchmarking Pulsar and Kafka - A More Accurate Perspective on Pulsar’s Performance
- Set up a standalone Pulsar in Docker
- apachepulsar/pulsar
- Pulsar configuration
- Pulsar C# client
- apache/pulsar-dotpulsar
- fsprojects/pulsar-client-dotnet
- yowko/PulsarTest
- 在 ASP.NET Core 中發送訊息至 Apache Pulsar
- 在 ASP.NET Core 中從 Apache Pulsar 接收訊息 (Reader)
文章作者 Yowko Tsai
上次更新 2022-03-10
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。