假设我的制作人正在将消息写入主题a…一旦消息位于主题a中,我就要将相同的消息复制到主题b。这在Kafka有可能吗?
ezykj2lf1#
将一个主题的内容转发给另一个主题有两个即时选项:利用kafka的流特性在两个主题之间建立转发链接。通过创建一个消费者/生产者对,并使用它们来接收和转发消息我有一段简短的代码(在scala中)显示了这两个方面:
def topologyPlan(): StreamsBuilder = { val builder = new StreamsBuilder val inputTopic: KStream[String, String] = builder.stream[String, String]("topic2") inputTopic.to("topic3") builder } def run() = { val kafkaStreams = createStreams(topologyPlan()) kafkaStreams.start() val kafkaConsumer = createConsumer() val kafkaProducer = createProducer() kafkaConsumer.subscribe(List("topic1").asJava) while (true) { val record = kafkaConsumer.poll(Duration.ofSeconds(5)).asScala for (data <- record.iterator) { kafkaProducer.send(new ProducerRecord[String, String]("topic2", data.value())) } } }
在run方法中,前两行设置了一个streams对象,该对象使用topologyplan()侦听“topic2”中的消息,然后转发到“topic3”。剩下的几行显示了消费者如何收听“topic1”并使用制作人将其发送到“topic2”。这里示例的最后一点是,kafka非常灵活,允许您根据需要混合选项,因此上面的代码将在“topic1”中接收消息,并通过“topic2”将它们发送到“topic3”。如果您想查看设置使用者、生产者和流的代码,请参阅此处的完整类。
nbysray52#
如果我理解正确,你只需要 stream.to("topic-b") 不过,如果不对数据做些什么,这似乎很奇怪。注:指定的主题应在使用前手动创建
stream.to("topic-b")
svmlkihl3#
我不清楚通过简单地将数据从一个主题复制到另一个主题,您到底想要实现什么用例。如果两个主题都在同一个kafka集群中,那么让两个主题具有相同的消息/内容永远不是一个好主意。我相信这里的差距是,你可能不清楚Kafka的消费群体的概念。通过使用Kafka主题的信息,您可能有两个动作项要做。您认为,如果第一个应用程序使用来自kafka主题的消息,那么第二个应用程序是否可以使用相同的消息。kafka允许您在消费者群体的帮助下解决这种常见用例。让我们尝试区分其他消息队列和kafka,您将了解不需要在两个主题之间复制相同的数据/消息。在其他消息队列中,如sqs(简单队列服务),如果消息被使用者使用,则同一消息不可被其他使用者使用。消费者有责任在处理完信息后安全地删除信息。通过这样做,我们可以保证相同的消息不会被两个使用者处理而导致不一致。但是,在Kafka,有多组消费者从同一主题消费是完全好的。这组消费者形成了一个群体,通常称为消费者群体。在这里,消费者组中的一个消费者可以根据消息所使用的kafka主题的分区来处理消息。现在的关键是,我们可以有多个消费群体从同一个Kafka主题消费。每个消费群体都将以他们想要的方式处理消息。两个不同消费群体的消费者之间没有干扰。为了实现您的用例,我相信您可能需要两个消费者组,他们可以简单地以他们想要的方式处理消息。基本上不必在两个主题之间复制数据。希望这有帮助。
3条答案
按热度按时间ezykj2lf1#
将一个主题的内容转发给另一个主题有两个即时选项:
利用kafka的流特性在两个主题之间建立转发链接。
通过创建一个消费者/生产者对,并使用它们来接收和转发消息
我有一段简短的代码(在scala中)显示了这两个方面:
在run方法中,前两行设置了一个streams对象,该对象使用topologyplan()侦听“topic2”中的消息,然后转发到“topic3”。
剩下的几行显示了消费者如何收听“topic1”并使用制作人将其发送到“topic2”。
这里示例的最后一点是,kafka非常灵活,允许您根据需要混合选项,因此上面的代码将在“topic1”中接收消息,并通过“topic2”将它们发送到“topic3”。
如果您想查看设置使用者、生产者和流的代码,请参阅此处的完整类。
nbysray52#
如果我理解正确,你只需要
stream.to("topic-b")
不过,如果不对数据做些什么,这似乎很奇怪。注:
指定的主题应在使用前手动创建
svmlkihl3#
我不清楚通过简单地将数据从一个主题复制到另一个主题,您到底想要实现什么用例。如果两个主题都在同一个kafka集群中,那么让两个主题具有相同的消息/内容永远不是一个好主意。
我相信这里的差距是,你可能不清楚Kafka的消费群体的概念。通过使用Kafka主题的信息,您可能有两个动作项要做。您认为,如果第一个应用程序使用来自kafka主题的消息,那么第二个应用程序是否可以使用相同的消息。kafka允许您在消费者群体的帮助下解决这种常见用例。
让我们尝试区分其他消息队列和kafka,您将了解不需要在两个主题之间复制相同的数据/消息。
在其他消息队列中,如sqs(简单队列服务),如果消息被使用者使用,则同一消息不可被其他使用者使用。消费者有责任在处理完信息后安全地删除信息。通过这样做,我们可以保证相同的消息不会被两个使用者处理而导致不一致。
但是,在Kafka,有多组消费者从同一主题消费是完全好的。这组消费者形成了一个群体,通常称为消费者群体。在这里,消费者组中的一个消费者可以根据消息所使用的kafka主题的分区来处理消息。
现在的关键是,我们可以有多个消费群体从同一个Kafka主题消费。每个消费群体都将以他们想要的方式处理消息。两个不同消费群体的消费者之间没有干扰。
为了实现您的用例,我相信您可能需要两个消费者组,他们可以简单地以他们想要的方式处理消息。基本上不必在两个主题之间复制数据。
希望这有帮助。