kafkaconnect:如何解析要Map的字符串

fzsnzjdm  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(398)

假设我有一个文件 JSON 由新行字符分隔的对象/行( \n ). 当基于filestreamsource的连接器读取此文件时,它会将每一行视为 java.lang.String .
我们怎么能分析这个呢 java.lang.Stringjava.util.Map 或结构以执行进一步的转换(例如,使用maskfield屏蔽字段或使用extractfield提取字段)?
ps:问题不在于如何解析一些 java.lang.Stringjava.util.Map 或者struct,但是关于如何将这种解析逻辑与kafka(一个自定义的kafka转换?)集成,或者如何通过其他方式(例如,在kafka中配置某个东西或使用特定的连接器/转换等)获得相同的结果

72qzrwbm

72qzrwbm1#

正如Apache·Kafka的文档所说, FileStreamSource 不完全是生产支持的连接器。。。
也许您最好使用spooldir连接器,它支持行分隔的jsonhttps://github.com/jcustenborder/kafka-connect-spooldir/blob/master/readme.md

ncecgwcz

ncecgwcz2#

有两种可能的方法:
您可以使用confluent平台并使用适当的ksql查询运行连接器(https://docs.confluent.io/current/ksql/docs/tutorials/index.html#ksql-教程)。
你可以启动一个Kafka流应用程序(https://kafka.apache.org/documentation/streams/)以及一个源连接器。流应用程序将从连接器放置消息的主题/-s中读取消息。您需要在kafka流应用程序中实现转换逻辑。当消息被处理时,流应用程序将其放入一个输出主题。下面是流应用程序代码的示例结构。

Properties props = new Properties();

...

final StreamsBuilder builder = new StreamsBuilder();
Pattern pattern = Pattern.compile(<YOUR_INPUT_TOPIC_PATTERN>);
KStream<String, String> source = builder.stream(pattern);

...

source.mapValues((k,v) -> {
     Gson gson = new Gson();
     Map map = gson.fromJson(v, Map.class);

     // here is your transformation logi

     return v;
}).to(<YOUR_OUTPUT_TOPIC>);

...

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);

...

streams.start();

相关问题