使用flink将java数据集转换为kafka?有可能吗

hs1ihplo  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(382)

我有一个用例,我需要把记录从Hive移到Kafka。我找不到一种方法,我可以直接添加一个KafkaFlume到flink数据集。因此,我使用了一种变通方法,在flink数据集上调用map转换,在map函数中,对给定的记录使用kafkaproducer.send()命令。
我面临的问题是,我没有任何方法在每个工作节点上执行kafkaproducer.flush(),因此用kafka编写的记录数总是略小于数据集中的记录数。
有没有优雅的方法来处理这个问题?我能不能在Flink的数据集中加一个KafkaFlume?或者调用kafkaproducer.flush()作为终结器的方法?

mfuanj7w

mfuanj7w1#

你可以简单地创建一个 Sink 这将使用 KafkaProducer 在引擎盖下并将数据写入Kafka。

相关问题