我们无法控制发送给Kafka的消息的生产者。但是,我们需要将这些消息的内容以约定的特定json数组结构复制到客户的kafka主题中。在哪一个阶段,我们如何干预以修改数据以匹配所需的结构?我们能在收到的消息在kafka集群上持久化之前修改它们吗?当我们将消息从kafka集群复制到客户的kafka集群时,是否应该修改消息?应该使用ksql来处理它吗?非常感谢你的帮助
kkih6yb81#
我认为,关于何时执行数据转换的问题更像是根据您的上下文应该回答的问题;但是下面的项目符号可以帮助你决定如何做。“我们能在收到的消息在kafka集群上持久化之前修改它们吗?”——虽然我知道您不能控制生产者,但您至少可以调整设置为 KafkaProducer 示例并注册一个producerinterceptor,它可以获取消息负载、执行反转换并发送到其他地方。这种方法的缺点是,您仍然需要更改生产者代码,并且您将向另一个集群插入一个callout,这可能会由于网络往返而降低吞吐量。我个人不建议这样做,但这是可能的。“当我们将消息从kafka群集复制到客户的kafka时,是否应该修改消息?”——是。我认为这可能是最好的方法,因为它可以确保在发生任何变异之前,数据最终被记录在集群中(因此您可以根据需要多次重放/重新处理)。为了实现这一点,您可以使用kafka connect,它提供了一组可插入的工具来执行消息转换——特别是单消息转换和转换器@rmoff写了一个很好的博客,解释了这是如何工作的:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained.“应该使用ksql来处理它吗?”——是的。我强烈建议使用ksqldb,因为它是数据流之上的流处理器。使用ksqldb,您可以很好地支持可能的转换语义,这种语义远远超出了简单的每记录/每聚合转换,并使您能够提出一种新的数据模型,用于分发下游处理器所期望的数据。这是我的建议。不管您选择哪种方法,如果您的想法是让数据在另一个不同于您的kafka集群中可用,那么使用mirrormaker或confluent replicator似乎是一个不错的选择。否则,您可以简单地利用kafka connect和一些专门的sink连接器从集群读取数据并发送到不需要kafka的目标系统。
KafkaProducer
1条答案
按热度按时间kkih6yb81#
我认为,关于何时执行数据转换的问题更像是根据您的上下文应该回答的问题;但是下面的项目符号可以帮助你决定如何做。
“我们能在收到的消息在kafka集群上持久化之前修改它们吗?”——虽然我知道您不能控制生产者,但您至少可以调整设置为
KafkaProducer
示例并注册一个producerinterceptor,它可以获取消息负载、执行反转换并发送到其他地方。这种方法的缺点是,您仍然需要更改生产者代码,并且您将向另一个集群插入一个callout,这可能会由于网络往返而降低吞吐量。我个人不建议这样做,但这是可能的。“当我们将消息从kafka群集复制到客户的kafka时,是否应该修改消息?”——是。我认为这可能是最好的方法,因为它可以确保在发生任何变异之前,数据最终被记录在集群中(因此您可以根据需要多次重放/重新处理)。为了实现这一点,您可以使用kafka connect,它提供了一组可插入的工具来执行消息转换——特别是单消息转换和转换器@rmoff写了一个很好的博客,解释了这是如何工作的:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained.
“应该使用ksql来处理它吗?”——是的。我强烈建议使用ksqldb,因为它是数据流之上的流处理器。使用ksqldb,您可以很好地支持可能的转换语义,这种语义远远超出了简单的每记录/每聚合转换,并使您能够提出一种新的数据模型,用于分发下游处理器所期望的数据。这是我的建议。
不管您选择哪种方法,如果您的想法是让数据在另一个不同于您的kafka集群中可用,那么使用mirrormaker或confluent replicator似乎是一个不错的选择。否则,您可以简单地利用kafka connect和一些专门的sink连接器从集群读取数据并发送到不需要kafka的目标系统。