文章目錄
同時訂閱多個 Kafka topic
隨著產品的發展,團隊所建立的 application 也逐漸變多,不同 application 的溝通也變得複雜,為了避免直接相依,一部份的功能是透 kafka 來交換資料,但如果不加以管理,可以想見 kafka topic 的數量與 message 的大小、發送頻率很快就會失控,所以團隊就想要針對 kafka 的 topic 命名加上一些規範:
- [原生限制] 長度不得超過
255
字元 - [原生限制] 只允許
a-z
,A-Z
,0-9
,. (dot)
,_ (underscore)
與- (dash)
- [團隊建議] 命名格式為:
{發送端 application name}-{用途}
ex:auth-log
前兩項因為是 kafka 原生的限制,沒什麼討論空間,但第三點的團隊建議就有不同聲音了。希望以 發送端 application name
而不是接收端 application name 的原因是希望可以迅速定位到資料源頭也避免 topic 遭到誤用;以上面 auth-log
為例,如果改成接收端 (filebeat) 來命名:filebeat-log
,這時候有個新 application 也將 log 接入 filebeat-log
可是格式不符,造成 log 爬不出來,這時候就需要逐一排除到底是哪個 application 造成的問題,也可能造成到其他沒問題 application 無法輸出 log
想要使用接收端 (filebeat) 來命名主要是因為需要同時接收多個 application 提供的資料,建立多個 consumer 很麻煩,但其實 kafka 是支援同時訂閱多個 topic 的,今天就快速紀錄一下使用方式
基本環境說明
- macOS Ventura 13.1
- .NET SDK 6.0.400
- NuGet packages
- Confluent.Kafka 1.9.3
- docker images
- quay.io/strimzi/kafka:latest-kafka-3.3.1-amd64
kafka 建立方式
使用 kraft 算法而不用相依於 zookeeper 的方式來建立 kafka,詳細內容可以參考 使用 Docker 啟動不依賴 ZooKeeper 的 Kafka
server.properties
process.roles=broker,controller node.id=1 offsets.topic.replication.factor=1 controller.quorum.voters=1@kafka:9093 listeners=PLAINTEXT://:9092,CONTROLLER://:9093 advertised.listeners=PLAINTEXT://kafka:9092 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT log.dirs=kafka_data
docker-compose.yaml
version: '3' services: kafka: container_name: kafka user: root image: quay.io/strimzi/kafka:latest-kafka-3.3.1-amd64 volumes: - "./server.properties:/opt/kafka/config/kraft/server.properties" command: [ "sh", "-c", "bin/kafka-storage.sh format -t $$(bin/kafka-storage.sh random-uuid) -c /opt/kafka/config/kraft/server.properties && bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties" ] ports: - "9092:9092"
topic 建立
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t1 && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic t2
設定方式
C# code:進行 topic subscribe,傳入多個 topic
class Program { static void Main(string[] args) { var conf = new ConsumerConfig { GroupId = "yowkotest", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest }; using var c = new ConsumerBuilder<Ignore, string>(conf).Build(); //進行 topic subscribe,使用多個 topic c.Subscribe(new []{"t1","t2"}); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { c.Close(); } } }
實際效果
心得
看得出來 code 修改的部份很小: c.Subscribe("topic");
–> c.Subscribe(new []{"t1","t2"});
,甚至以後預設都改用傳入 array 的方式來 subscribe
也行
這樣的筆記自己都覺得太水,但剛好趁這個機會驗證一下順便紀錄,大家討論時可以比較有信心
參考資訊
文章作者 Yowko Tsai
上次更新 2023-01-18
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。