所以,这是我的代码,我需要添加一些属性吗?
一开始我认为这与分区有关,但后来发现有一种方法可以让kafka生产者使用更多线程。
有人能解释一下我是怎么做到的吗?
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "IdPartitioner")
var kafkaProducer = new KafkaProducer[String, String](props)
2条答案
按热度按时间ee7vknir1#
KafkaProducer
通过使用内部线程将消息发送到代理,是线程安全的。它确实保留了一个内部缓存来存储要发送的消息(由linger.ms
),这样的场景是可能的:线程1向生产者提交消息m1(target=topic1/partition1)
线程2向生产者提交消息m2(target=topic2/partition1)
假设:t1/p1和t2/p1都托管在同一个代理上
生产者的内部线程唤醒并在同一请求中发送它们
一般来说,它应该足以满足您的要求-您可以提交您的生产请求并行,他们将由Kafka自己组织。
sq1bmfud2#
原来有一种方法可以让Kafka制作人使用更多的线程
不是天生的,不是。你得做个决定
new Thread
或者使用一个更高级别的生成库来为您实现这一点。 Spring Kafka或阿克卡的Kafka支持可能是一个选择,看看。或者spark/flink/beam(因为你有hadoop)但是,多条消息已经在一批中发送,并且每个生成的记录都包含一个主题名和密钥,因此,即使是单个线程也会“并行”地生成多个可能的代理