文章目錄
放大 kafka message size
今天 prod 監控噴出大量 Message size too large 的錯誤訊息,訊息內容很明確:就是傳到 kafka 的 message 太大 (預設為 1048588 約 1MB),趁著這個機會紀錄一下 調整方式與使用方式
基本環境說明
- CentOS 8 kernel 5.7.2
- zookeeper 3.5.9
- Kafka 2.6.2
- .NET SDK 5.0.401
NuGet packages
- Confluent.Kafka 0.9.4
- Confluent.Kafka 1.8.2
code
producer
var conf = new ProducerConfig { BootstrapServers = "localhost:9092" }; Action<DeliveryReport<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); using var p = new ProducerBuilder<Null, string>(conf).Build(); p.Produce("yowkotest", new Message<Null, string> { Value = GetStringFromFile("GetMatchById_18814541.json") }, handler); p.Flush(TimeSpan.FromSeconds(10));consumer
1.8.2
var conf = new ConsumerConfig { GroupId = "yowkotest", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest }; using var c = new ConsumerBuilder<Ignore, string>(conf).Build(); c.Subscribe("yowkotest"); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { c.Close(); }0.9.4
var brokerList = "localhost:9092"; var config = new Dictionary<string, object> { { "group.id", "yowkotest" }, { "bootstrap.servers", brokerList } }; using var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)); consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset("yowkotest", 0, 0) }); while (true) { Message<Null, string> msg; if (consumer.Consume(out msg)) { Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); } }
情況描述與設定方式
調整 producer config
var conf = new ProducerConfig { MessageMaxBytes = 104857600 //放大為 10MB };未設定錯誤
錯誤訊息
Unhandled exception. Confluent.Kafka.ProduceException`2[Confluent.Kafka.Null,System.String]: Broker: Message size too large at Confluent.Kafka.Producer`2.Produce(TopicPartition topicPartition, Message`2 message, Action`1 deliveryHandler) at Confluent.Kafka.Producer`2.Produce(String topic, Message`2 message, Action`1 deliveryHandler) at KafkaProducer.Program.Main(String[] args) in /Users/yowko.tsai/POCs/ForTest/KafkaProducer/Program.cs:line 32錯誤截圖

調整 kafka 允許的 message size
topic level
建立新 topic
語法
kafka-topics.sh --bootstrap-server {kafka server}:{kafka port} --create --topic {topic name} --partitions {partition} --replication-factor {replication} --config max.message.bytes={size}範例
kafka-topics.sh --bootstrap-server 192.168.80.3:9092 --create --topic yowkotest --partitions 1 --replication-factor 1 --config max.message.bytes=10485760
修改既有 topic
語法
kafka-configs.sh --bootstrap-server {kafka server}:{kafka port} --alter --add-config max.message.bytes={size} --topic {topic name}範例
kafka-configs.sh --bootstrap-server 192.168.80.3:9092 --alter --add-config max.message.bytes=10485760 --topic yowkotest
kafka instnce level (apply to all topic)
動態修改
語法
kafka-configs.sh --bootstrap-server {kafka server}:{kafka port} --alter --add-config max.message.bytes={size} --entity-type brokers --entity-name {boker id}範例
kafka-configs.sh --bootstrap-server 192.168.80.3:9092 --alter --add-config max.message.bytes=10485760 --entity-type brokers --entity-name 1
修改 config (需要重啟 kafka instance)
修改
config/server.propertiesmessage.max.bytes=10485760
未設定錯誤
錯誤訊息
Delivery Error: Broker: Message size too large錯誤截圖

調整 consumer config (實測下不需要調整)
confluent 官網 consumer 版本如果小於
0.10.2需要調整 consumer fetch size,但我測試 NuGet 上最舊版本0.9.4也不需要額外設定,我這才意識到 confluent 官網 提到的0.10.2是不是 kafka 的版本而不是 confluent 的版本,不過 kafka0.10.2實在是滿舊的 我沒有特別測試,如果日後有遇到再紀錄,這邊就提醒一下而已囉
心得
原本印象中只能調整 kafka 的 server.properties 並且重啟套用設定,經過今天查資料後才發現原來可以動態設定,不過我還是傾向設定在 config 上,其他人才能快速地取得設定值
參考資訊
文章作者 Yowko Tsai
上次更新 2021-11-05
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。
