我需要根据另一个Kafka主题中的一些事件将数据从配置单元导出到Kafka主题。我知道我可以使用hql从spark job中的hive读取数据,然后从spark将其写入kafka,但是有更好的方法吗?
fwzugrvs1#
这可以通过使用非结构化流来实现。步骤如下:创建一个连接到所需主题并获取所需数据导出信息的spark流作业。从流中,收集并获取驱动程序变量中的数据导出需求。使用指定的条件创建Dataframe使用kafkautils将Dataframe写入所需主题。根据数据量和kafka写吞吐量提供轮询间隔。
omjgkv6w2#
通常情况下,您可以采用另一种方式(从kafka到hdfs/hive)。但是欢迎您尝试使用kafka connect jdbc插件按计划从配置单元表中读取数据,这会将行转换为结构化的键值kafka消息。否则,我会重新评估其他工具,因为Hive是缓慢的。couchbase或cassandra为Kafka提供了更好的cdc功能。或者重新编写插入到hive中的上游应用程序,而不是立即编写到kafka中,例如,您可以从中加入其他主题。
2条答案
按热度按时间fwzugrvs1#
这可以通过使用非结构化流来实现。步骤如下:
创建一个连接到所需主题并获取所需数据导出信息的spark流作业。
从流中,收集并获取驱动程序变量中的数据导出需求。
使用指定的条件创建Dataframe
使用kafkautils将Dataframe写入所需主题。
根据数据量和kafka写吞吐量提供轮询间隔。
omjgkv6w2#
通常情况下,您可以采用另一种方式(从kafka到hdfs/hive)。
但是欢迎您尝试使用kafka connect jdbc插件按计划从配置单元表中读取数据,这会将行转换为结构化的键值kafka消息。
否则,我会重新评估其他工具,因为Hive是缓慢的。couchbase或cassandra为Kafka提供了更好的cdc功能。或者重新编写插入到hive中的上游应用程序,而不是立即编写到kafka中,例如,您可以从中加入其他主题。