脚本:我正在将数据json对象数据写入kafka主题中,而我只想根据消息中的值读取一组特定的消息。我正在使用kafka python库。示例消息:
{flow_status: "completed", value: 1, active: yes} {flow_status:"failure",value 2, active:yes}
在这里,我想只读消息流的状态为完成。
qc6wkl3g1#
您可以创建两个不同的主题;一个表示已完成,另一个表示故障状态。然后从完成的主题中读取消息来处理它们。否则,如果你想把它们放在一个单独的主题中,并且只想阅读完整的主题,我相信你需要全部阅读它们,并使用一个简单的if-else条件忽略失败的主题。
iqjalb3h2#
kafka消费者不支持这种功能。您必须按顺序使用所有事件,过滤掉状态已完成的事件并将其放在某个位置。相反,您可以考虑使用kafka streams应用程序,在该应用程序中,您可以将数据作为流读取,并过滤flow_status=“completed”并在某个输出主题或某个其他目标中发布的事件。例子:
KStream<String,JsonNode> inputStream= builder.stream(inputTopic); KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));
p、 s.kafka没有针对kstream的PythonAPI的官方版本,但有一个开源项目:https://github.com/wintoncode/winton-kafka-streams
gzjq41n43#
在Kafka是不可能这样做的。使用者从最近提交的偏移量开始(或从开始,或在特定偏移量处查找)一个接一个地消耗消息。根据您的用例,在您的场景中可能会有不同的流程:执行流程的消息进入主题,然后处理操作的应用程序将结果(完成的或失败的)写入两个不同的主题:这样,您就可以将所有的完成从失败中分离出来。另一种方法是使用kafka streams应用程序进行过滤,但考虑到它只是一个糖块,实际上streams应用程序将始终读取所有消息,但允许您轻松地过滤消息。
ghhaqwfi4#
到目前为止,无法在broker端实现此功能,有一个jira功能请求对apache kafka开放,以实现此功能,您可以在此处跟踪,我希望他们在不久的将来实现此功能:https://issues.apache.org/jira/browse/kafka-6020我觉得最好的方法是使用recordfilterstrategy(java/spring)接口,并在消费端对其进行过滤。
4条答案
按热度按时间qc6wkl3g1#
您可以创建两个不同的主题;一个表示已完成,另一个表示故障状态。然后从完成的主题中读取消息来处理它们。
否则,如果你想把它们放在一个单独的主题中,并且只想阅读完整的主题,我相信你需要全部阅读它们,并使用一个简单的if-else条件忽略失败的主题。
iqjalb3h2#
kafka消费者不支持这种功能。您必须按顺序使用所有事件,过滤掉状态已完成的事件并将其放在某个位置。相反,您可以考虑使用kafka streams应用程序,在该应用程序中,您可以将数据作为流读取,并过滤flow_status=“completed”并在某个输出主题或某个其他目标中发布的事件。
例子:
p、 s.kafka没有针对kstream的PythonAPI的官方版本,但有一个开源项目:https://github.com/wintoncode/winton-kafka-streams
gzjq41n43#
在Kafka是不可能这样做的。使用者从最近提交的偏移量开始(或从开始,或在特定偏移量处查找)一个接一个地消耗消息。根据您的用例,在您的场景中可能会有不同的流程:执行流程的消息进入主题,然后处理操作的应用程序将结果(完成的或失败的)写入两个不同的主题:这样,您就可以将所有的完成从失败中分离出来。另一种方法是使用kafka streams应用程序进行过滤,但考虑到它只是一个糖块,实际上streams应用程序将始终读取所有消息,但允许您轻松地过滤消息。
ghhaqwfi4#
到目前为止,无法在broker端实现此功能,有一个jira功能请求对apache kafka开放,以实现此功能,您可以在此处跟踪,我希望他们在不久的将来实现此功能:https://issues.apache.org/jira/browse/kafka-6020
我觉得最好的方法是使用recordfilterstrategy(java/spring)接口,并在消费端对其进行过滤。