现在我的Kafka制作人正在将所有的消息下沉到一个Kafka主题的一个分区中,这个分区实际上有多个分区。
如何创建使用默认分区器并在主题的不同分区之间分发消息的生产者。
我的Kafka制作人的代码片段:
Properties props = new Properties();
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap.servers);
props.put(ProducerConfig.ACKS_CONFIG, "all");
我用FlinkKafka生产者下沉Kafka主题的信息。
speStream.addSink(
new FlinkKafkaProducer011(kafkaTopicName,
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
props,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)
2条答案
按热度按时间nnt7mjpx1#
通过将flinkproducer改为
spestream.addsink(新的flinkkafkaproducer011(kafkatopicname,新的simplestringschema(),props));
tv6aics12#
使用默认分区器,消息将使用以下逻辑分配一个分区:
密钥消息:生成密钥的散列,并基于此选择分区。这意味着具有相同密钥的消息将在同一分区上结束
未知消息:循环用于分配分区
解释您看到的行为的一个选项是,如果您对所有消息使用相同的键,那么使用默认的分区器,它们将在同一个分区上结束。