我有一个kafka主题,我需要从中使用consumer读取json数据,但是根据数据中的特定标记,我需要将其存储在驱动器中的不同目录中。我如何做到这一点?在存储数据之前可以直接解析数据吗?
0ve6wy6x1#
不清楚这些目录将存在于何处,但您可以尝试将hdfs kafka connect(它将与hdfs兼容的文件系统(包括本地磁盘)一起使用)与 FieldPartitioner 指定Kafka记录中的哪些字段作为目录,格式为 /topic-dir/topic-name/field1=value/field2=value 要写入本地磁盘,请设置 store.url=file:///some/path 否则,spark、flink、常规kafka消费者等等,您可以自己使用这些数据并构建目录。在存储数据之前可以直接解析数据吗?取决于您如何使用数据。。。
FieldPartitioner
/topic-dir/topic-name/field1=value/field2=value
store.url=file:///some/path
1条答案
按热度按时间0ve6wy6x1#
不清楚这些目录将存在于何处,但您可以尝试将hdfs kafka connect(它将与hdfs兼容的文件系统(包括本地磁盘)一起使用)与
FieldPartitioner
指定Kafka记录中的哪些字段作为目录,格式为/topic-dir/topic-name/field1=value/field2=value
要写入本地磁盘,请设置store.url=file:///some/path
否则,spark、flink、常规kafka消费者等等,您可以自己使用这些数据并构建目录。在存储数据之前可以直接解析数据吗?
取决于您如何使用数据。。。