我试图从kgroupedstream创建一个ktable来存储每个键的值的总和。
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
.groupByKey()
.aggregate(new Initializer<Long>() {
@Override
public Long apply() {
return Long.MIN_VALUE;
}
}, new Aggregator<String, Long, Long>() {
@Override
public Long apply(final String key, final Long value,final Long aggregate) {
aggregate += value;
return aggregate;
}
}, Materialized.<String, Long, KeyValueStore<Byte, byte[]>>as("counts-store"));
但我得到了一个错误:
The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Materialized<String,Long,KeyValueStore<Byte,byte[]>>)
我看到的所有示例都将serde作为第三个参数传入,但我尝试了这个方法,并得到了一个非常类似的错误(我认为这可能来自旧版本,因为它与当前实现的签名不匹配?):
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
.groupByKey()
.aggregate(new Initializer<Long>() {
@Override
public Long apply() {
return Long.MIN_VALUE;
}
}, new Aggregator<String, Long, Long>() {
@Override
public Long apply(final String key, final Long value,final Long aggregate) {
aggregate += value;
return aggregate;
}
}, Serdes.Long());
错误:
The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Serde<Long>)
我做错什么了?
使用Kafka版本:2.1.0
1条答案
按热度按时间rdlzhqv91#
代码中有几个问题:
为了
Materialized.as
相反java.lang.Byte
你应该通过org.apache.kafka.common.utils.Bytes
你不应该修改final
变量:aggregate += value;
必须将键和值的类型添加到StreamsBuilder::stream
呼叫(builder.<String, Long>stream("streams-plaintext-input")
)修改后,大致如下所示: