我必须创建一个包含成功和失败计数的记录,因为我已经创建了两个流分支,并为每个流创建了一个记录,但是我已经将其合并,以获得包含来自两个流的组合数据的新记录。
@Component
public class PageViewEventProcessor {
private final Log log = LogFactory.getLog(getClass());
@Bean
public Function<KStream<String, RequestMD>, KStream<String, Response>> process() {
return input -> {
KStream<String, Response> SuccessResponseStream =
input
.filter((key, value) -> value.getHeaders().getStatusCode().equalsIgnoreCase("200"))
.selectKey((key, value) -> value.getHeaders().getOperation())
.map((key, value) -> new KeyValue<>(value.getHeaders().getOperation(), "value"))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(30)))
.count(Materialized.as(Constants.ALL_STATUS_NEW))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map(
(key, value) ->
new KeyValue<>(
key.key(),
new Response(
key.key(),
new Date(key.window().start()),
new Date(key.window().end()),
value,
value,
(double) ((value / value) * 100),
(double) ((value / value) * 100),
0L,
0.0)));
KStream<String, Response> AllOperations =
input
.selectKey((key, value) -> value.getHeaders().getOperation())
.map((key, value) -> new KeyValue<>(value.getHeaders().getOperation(), "value"))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(30)))
.count(Materialized.as(Constants.ALL_STATUS))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map(
(key, value) ->
new KeyValue<>(
key.key(),
new Response(
key.key(),
new Date(key.window().start()),
new Date(key.window().end()),
value,
value,
(double) ((value / value) * 100),
(double) ((value / value) * 100),
0L,
0.0)));
return SuccessResponseStream.toTable()
.join(AllOperations.toTable(), merge())
.toStream()
.peek((key, value) -> log.info(key + value));
};
}
public static <K extends Response, V extends Response> ValueJoiner<K, V, K> merge() {
return (v1, v2) -> {
if (v1.getService().equals(v2.getService())) {
v1.setSuccessCount(v2.getSuccessCount());
}
return v1;
};
}
}
在执行上述代码时,我得到下面的错误
org.apache.kafka.streams.errors.streamsexception:向接收器主题生成数据时发生classcastexception。序列化程序(key:org.apache.kafka.common.serialization.stringserializer/value:org.apache.kafka.common.serialization.bytearrayserializer)与实际的键或值类型(key type:java.lang.string/value type:com.config.response)不兼容。更改streamconfig中的默认serdes或通过方法参数提供正确的serdes(例如,如果使用dsl, #to(String topic, Produced<K, V> produced)
与 Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))
). 在org.apache.kafka.streams.processor.internals.sinknode.process(sinknode。java:97)~[kafka-streams-2.5.1。jar:na]在org.apache.kafka.streams.processor.internals.processorcontextimpl.forward(processorcontextimpl。java:201)~[kafka-streams-2.5.1。jar:na]在org.apache.kafka.streams.processor.internals.processorcontextimpl.forward(processorcontextimpl。java:180)~[kafka-streams-2.5.1。jar:na]在org.apache.kafka.streams.processor.internals.processorcontextimpl.forward(processorcontextimpl。java:133)~[kafka-streams-2.5.1。jar:na]在org.apache.kafka.streams.kstream.internals.kstreamfilter$kstreamfilterprocessor.process(kstreamfilter)。java:43)~[kafka-streams-2.5.1。jar:na]
暂无答案!
目前还没有任何答案,快来回答吧!