如何基于过滤器将csv数据拆分为两个不同的kafka主题

soat7uwm  于 2021-06-29  发布在  Java
关注(0)|答案(3)|浏览(416)

我有一个Kafka主题客户,其中我流式传输了一个csv文件。现在有没有可能在Kafka的基础上转换数据

计数。例如,如果计数小于20,则将其发送到主题a;如果计数大于20,则将其发送到主题b
我是新来Kafka,我试图这样,但它不工作

builder.stream("Customer")
            .groupByKey()
            .count()
            .toStream()
            .filter((k,v)-> String(v) > 20)
            .to("test_A");

这个代码是错误的,我很确定,但请任何人能帮我一下吗

qhhrdooz

qhhrdooz1#

您现在要做的是根据一个条件仅将一部分记录发送到一个主题,而放弃其他主题。如果要将一部分发送到一个主题,将另一部分发送到另一个主题,则应使用分支运算符。
像这样:

KStream<K, Long>[] branches = builder.stream("Customer")
        .groupByKey()
        .count()
        .toStream()
        .branch((k, v) -> v > 20),
                (k, v) -> v <= 20);
branches[0].to("topicB");
branches[1].to("topicA");

另一件事你应该注意的是比较必须在数字之间,而不是字符串之间,因为当你对字符串使用更大的比较器时,你是在比较字符串的长度和字典顺序

hfwmuf9z

hfwmuf9z2#

简单的解决方案是创建一个producer,一次从输入的csv文件中读取一行,并根据计数条件将其发送到特定的主题。

psuedo code:
while no row left in csv:
    row = readrow()
    if row.counts<20
       producer.send(topicA,row)
    else
       producer.send(topicB,row)

你可以跟着https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1 为了便于理解Kafka和python

klh5stk1

klh5stk13#

StreamsBuilder builder=new StreamsBuilder();

    KStream<String, String> inputTopic = builder.stream("Trans_Topic");

            inputTopic
                   .filter((k,v)->{
                       return Long.parseLong(v.split(",")[6]) < 20L; //6 is the column id.
                   }).to("trans_topic_result_1");
    inputTopic
            .filter((k,v)->{
                return Long.parseLong(v.split(",")[6]) > 20L;
            }).to("trans_topic_result_2");

相关问题