我已经使用confluent-4.0.0使用hdfs连接器安装了kafka connect。我能够将从kafka主题接收的avro记录保存到配置单元。我想知道在写入hdfs接收器之前是否有任何方法来修改记录。我的要求是对记录的值做一些小的修改。例如,对整数执行算术运算或字符串操作等。请建议是否有任何方法来实现这一点
8yoxcaq71#
看看Kafka连接变压器[1]&[2]。您可以构建一个自定义转换器库并在连接器中使用它。[1] http://kafka.apache.org/documentation.html#connect_transforms [2] https://cwiki.apache.org/confluence/display/kafka/kip-66%3a+single+message+transforms+for+kafka+connect
yqhsw0fo2#
你有几个选择。单消息转换,您可以在这里看到。非常适合在消息通过connect时进行轻量级更改。基于配置文件,并可使用提供的api进行扩展(如果没有满足您需要的现有转换)。请参阅此处关于smt何时适用于给定需求的讨论。ksql是kafka的流式sql引擎。在将数据流发送到hdfs之前,可以使用它修改数据流。看看这个例子。ksql是基于kafka流的api构建的,它是一个java库,可以让您随心所欲地转换数据。下面是一个例子。
2条答案
按热度按时间8yoxcaq71#
看看Kafka连接变压器[1]&[2]。您可以构建一个自定义转换器库并在连接器中使用它。
[1] http://kafka.apache.org/documentation.html#connect_transforms [2] https://cwiki.apache.org/confluence/display/kafka/kip-66%3a+single+message+transforms+for+kafka+connect
yqhsw0fo2#
你有几个选择。
单消息转换,您可以在这里看到。非常适合在消息通过connect时进行轻量级更改。基于配置文件,并可使用提供的api进行扩展(如果没有满足您需要的现有转换)。
请参阅此处关于smt何时适用于给定需求的讨论。
ksql是kafka的流式sql引擎。在将数据流发送到hdfs之前,可以使用它修改数据流。看看这个例子。
ksql是基于kafka流的api构建的,它是一个java库,可以让您随心所欲地转换数据。下面是一个例子。