文章目錄
放大 kafka message size
今天 prod 監控噴出大量 Message size too large 的錯誤訊息,訊息內容很明確:就是傳到 kafka 的 message 太大 (預設為 1048588
約 1MB),趁著這個機會紀錄一下 調整方式與使用方式
基本環境說明
- CentOS 8 kernel 5.7.2
- zookeeper 3.5.9
- Kafka 2.6.2
- .NET SDK 5.0.401
NuGet packages
- Confluent.Kafka 0.9.4
- Confluent.Kafka 1.8.2
code
producer
var conf = new ProducerConfig { BootstrapServers = "localhost:9092" }; Action<DeliveryReport<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); using var p = new ProducerBuilder<Null, string>(conf).Build(); p.Produce("yowkotest", new Message<Null, string> { Value = GetStringFromFile("GetMatchById_18814541.json") }, handler); p.Flush(TimeSpan.FromSeconds(10));
consumer
1.8.2
var conf = new ConsumerConfig { GroupId = "yowkotest", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest }; using var c = new ConsumerBuilder<Ignore, string>(conf).Build(); c.Subscribe("yowkotest"); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { c.Close(); }
0.9.4
var brokerList = "localhost:9092"; var config = new Dictionary<string, object> { { "group.id", "yowkotest" }, { "bootstrap.servers", brokerList } }; using var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)); consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset("yowkotest", 0, 0) }); while (true) { Message<Null, string> msg; if (consumer.Consume(out msg)) { Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); } }
情況描述與設定方式
調整 producer config
var conf = new ProducerConfig { MessageMaxBytes = 104857600 //放大為 10MB };
未設定錯誤
錯誤訊息
Unhandled exception. Confluent.Kafka.ProduceException`2[Confluent.Kafka.Null,System.String]: Broker: Message size too large at Confluent.Kafka.Producer`2.Produce(TopicPartition topicPartition, Message`2 message, Action`1 deliveryHandler) at Confluent.Kafka.Producer`2.Produce(String topic, Message`2 message, Action`1 deliveryHandler) at KafkaProducer.Program.Main(String[] args) in /Users/yowko.tsai/POCs/ForTest/KafkaProducer/Program.cs:line 32
錯誤截圖
調整 kafka 允許的 message size
topic level
建立新 topic
語法
kafka-topics.sh --bootstrap-server {kafka server}:{kafka port} --create --topic {topic name} --partitions {partition} --replication-factor {replication} --config max.message.bytes={size}
範例
kafka-topics.sh --bootstrap-server 192.168.80.3:9092 --create --topic yowkotest --partitions 1 --replication-factor 1 --config max.message.bytes=10485760
修改既有 topic
語法
kafka-configs.sh --bootstrap-server {kafka server}:{kafka port} --alter --add-config max.message.bytes={size} --topic {topic name}
範例
kafka-configs.sh --bootstrap-server 192.168.80.3:9092 --alter --add-config max.message.bytes=10485760 --topic yowkotest
kafka instnce level (apply to all topic)
動態修改
語法
kafka-configs.sh --bootstrap-server {kafka server}:{kafka port} --alter --add-config max.message.bytes={size} --entity-type brokers --entity-name {boker id}
範例
kafka-configs.sh --bootstrap-server 192.168.80.3:9092 --alter --add-config max.message.bytes=10485760 --entity-type brokers --entity-name 1
修改 config (需要重啟 kafka instance)
修改
config/server.properties
message.max.bytes=10485760
未設定錯誤
錯誤訊息
Delivery Error: Broker: Message size too large
錯誤截圖
調整 consumer config (實測下不需要調整)
confluent 官網 consumer 版本如果小於
0.10.2
需要調整 consumer fetch size,但我測試 NuGet 上最舊版本0.9.4
也不需要額外設定,我這才意識到 confluent 官網 提到的0.10.2
是不是 kafka 的版本而不是 confluent 的版本,不過 kafka0.10.2
實在是滿舊的 我沒有特別測試,如果日後有遇到再紀錄,這邊就提醒一下而已囉
心得
原本印象中只能調整 kafka 的 server.properties
並且重啟套用設定,經過今天查資料後才發現原來可以動態設定,不過我還是傾向設定在 config 上,其他人才能快速地取得設定值
參考資訊
文章作者 Yowko Tsai
上次更新 2021-11-05
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。