文章目錄
使用 .NET client 連線至 Apache Kafka 收發訊息
.NET Core 用法可以參考 讓 Kafka 達成 Broadcast 效果
繼之前紀錄 如何在 Windows OS 安裝 Apache Kafka 到現在默默地過了兩個月XD,直到最近才有時間可以再開始進行 MQ 相關功能比較,經過初步比較後,目前打算以 RabbitMQ 與 Apache Kafka 兩個 MQ 系統為主要選項來做比較,之前文章 使用 .Net client 連線至 RabbitMQ 收發訊息 介紹了使用 .Net Client 連線 RabbitMQ 進行基本發送訊息及接收訊息的程式碼,今天就來看看 .Net 如何連線 Apache Kafka 進行收發訊息
身為 .NET 工程師,雖然想多熟悉 linux,但終究沒有那麼容易,今天的 demo 還是會以 Windows 上的 Kafka 當做範例,如果對 Windows 上安裝 Apache Kafka 有興趣的可以參考 如何在 Windows OS 安裝 Apache Kafka
安裝 .Net Client
Microsoft/CSharpClient-for-KafkaKafka .Net Client 在 NuGet 上有好幾套件,其實並不好選擇,本來想使用 Microsoft 出的 CSharpClient-for-Kafka,但專案說明出現:
只支援到 Kafka 0.8,直到 Kafka 1.0 才會重新包裝 library,如果是 Kafaka 0.9 之後版本請使用 confluent-kafka-dotnet
confluentinc/confluent-kafka-dotnet
使用 Package Manager Console
Install-Package Confluent.Kafka -Version 0.9.5
官方建議的安裝語法有指定版本為 0.9.5 我看最新版本就是 0.9.5 ,不指定還是會安裝 0.9.5
使用 NuGet Package Explorer
專案(project) 上按右鍵 –> Manage NuGet Packages…
Browse –> 搜尋
confluent.kafka
–> 點選Confluent.Kafka
–> Install
接收訊息
使用 confluent-kafka-dotnet 的官方範例來建立 consumer
範例程式
void Main() { //指定 kafka 所在位置及 port string brokerList = "localhost:9092"; //指定要監聽的 topic,可以監聽多個 var topics = new List<string>() { "Yowkotest" }; //這個 group.id 沒什麼作用,可以隨便給,將 kafka 位置設定給 config var config = new Dictionary<string, object> { { "group.id", "yowko" }, { "bootstrap.servers", brokerList } }; //將 config 餵給 consumer using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8))) { consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(topics.First(), 0, 0) }); //持續監聽 while (true) { Message<Null, string> msg; //接受訊息 if (consumer.Consume(out msg)) { Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); } } } }
注意事項
- 只會接受 producer 在 consumer 啟動後所發送的訊息
- 多個 consumer 都會收到同樣訊息,不是分配接受端
發送訊息
範例程式
void Main() { //指定 kafka 所在位置及 port string brokerList = "localhost:9092"; //指定發送的 topic string topicName = "Yowkotest"; //將 kafka 位置設定給 config var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } }; //將 config 餵給 producer using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) { Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit."); string text; //持續等待 user 輸入 while ((text = Console.ReadLine()) != "q") { // 非同步發送訊息至 broker var deliveryReport = producer.ProduceAsync(topicName, null, text); //發送成功後輸出訊息 deliveryReport.ContinueWith(task => { Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}, Message: {text}"); }); } //將 producer request 保留至 disk,確保資料不會遺失 producer.Flush(); } }
注意事項
- flush 動作不一定要執行,建議只針對重要訊息執行即可,會影響效能
- 如果會關閉 producer ,建議執行避免有未完成的 request 遺失
其他選項
- 使用其他 .Net Client - ah-/rdkafka-dotnet
範例程式
void Main() { // 這個 GroupId 沒什麼作用,可以隨便給 var config = new Config() { GroupId = "example-csharp-consumer" }; //將 config 跟 kafaka host 跟 port 指定跟 consumer using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) { //如果收到訊息時的行為 consumer.OnMessage += (obj, msg) => { string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length); //取得訊息後輸出 Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}"); }; // consumer 訂閱 topic consumer.Subscribe((new[] { "Yowkotest" }).ToList()); // 開始監聽 consumer.Start(); Console.WriteLine("Started consumer, press enter to stop consuming"); Console.ReadLine(); } }
注意事項
- 只有沒收過的訊息,啟動監聽後會全部收下來
- 一樣是非分配訊息接受,訊息會被後起的 consumer 收走
心得
Kafka 在 .Net 上的資源相對於 RabbitMQ 是比較少的,設定上也較煩瑣,周邊配套功能或是工具支援都較少,不像 RabbitMQ 成熟,可能因為發展時間造成的。
功能跟預期(可以自動分流 consumer)不同,我想應該是設定面問題,這個再找時間進行研究跟調整,有新的心得再跟大家分享
.NET Core 用法可以參考 讓 Kafka 達成 Broadcast 效果
參考資訊
文章作者 Yowko Tsai
上次更新 2021-11-02
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。