kafka streams应用程序-计数和总和聚合

6ss1mwsb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(343)

我试图从kgroupedstream创建一个ktable来存储每个键的值的总和。

final StreamsBuilder builder = new StreamsBuilder();
 final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
            .groupByKey()
            .aggregate(new Initializer<Long>() {
                @Override
                public Long apply() {
                    return Long.MIN_VALUE;
                }
            }, new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(final String key, final Long value,final Long aggregate) {
                    aggregate += value;
                    return aggregate;
                }
            }, Materialized.<String, Long, KeyValueStore<Byte, byte[]>>as("counts-store"));

但我得到了一个错误:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Materialized<String,Long,KeyValueStore<Byte,byte[]>>)

我看到的所有示例都将serde作为第三个参数传入,但我尝试了这个方法,并得到了一个非常类似的错误(我认为这可能来自旧版本,因为它与当前实现的签名不匹配?):

final StreamsBuilder builder = new StreamsBuilder();
    final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
            .groupByKey()
            .aggregate(new Initializer<Long>() {
                @Override
                public Long apply() {
                    return Long.MIN_VALUE;
                }
            }, new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(final String key, final Long value,final Long aggregate) {
                    aggregate += value;
                    return aggregate;
                }
            }, Serdes.Long());

错误:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Serde<Long>)

我做错什么了?
使用Kafka版本:2.1.0

rdlzhqv9

rdlzhqv91#

代码中有几个问题:
为了 Materialized.as 相反 java.lang.Byte 你应该通过 org.apache.kafka.common.utils.Bytes 你不应该修改 final 变量: aggregate += value; 必须将键和值的类型添加到 StreamsBuilder::stream 呼叫( builder.<String, Long>stream("streams-plaintext-input") )
修改后,大致如下所示:

KTable<String, Long> sum = builder.<String, Long>stream("streams-plaintext-input")
        .groupByKey()
        .aggregate(new Initializer<Long>() {
            @Override
            public Long apply() {
                return Long.MIN_VALUE;
            }
        }, new Aggregator<String, Long, Long>() {
            @Override
            public Long apply(final String key, final Long value,final Long aggregate) {
                return aggregate + value;
            }
        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

相关问题