我有一个用例,我需要把记录从Hive移到Kafka。我找不到一种方法,我可以直接添加一个KafkaFlume到flink数据集。因此,我使用了一种变通方法,在flink数据集上调用map转换,在map函数中,对给定的记录使用kafkaproducer.send()命令。
我面临的问题是,我没有任何方法在每个工作节点上执行kafkaproducer.flush(),因此用kafka编写的记录数总是略小于数据集中的记录数。
有没有优雅的方法来处理这个问题?我能不能在Flink的数据集中加一个KafkaFlume?或者调用kafkaproducer.flush()作为终结器的方法?
1条答案
按热度按时间mfuanj7w1#
你可以简单地创建一个
Sink
这将使用KafkaProducer
在引擎盖下并将数据写入Kafka。