Unable to merge Kafka streams into a single stream with certain operations in Spring Boot

pqwbnv8z asked on 2021-06-04 in Kafka

I need to produce a record that contains both the success count and the failure count. To do this I created two branches of the stream and built a record for each branch, and I then merge them to get a new record containing the combined data from both streams.

import java.time.Duration;
import java.util.Date;
import java.util.function.Function;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

// RequestMD, Response and Constants are application-specific classes not shown in the question.
@Component
public class PageViewEventProcessor {

  private final Log log = LogFactory.getLog(getClass());

  @Bean
  public Function<KStream<String, RequestMD>, KStream<String, Response>> process() {
    return input -> {
      // Branch 1: only requests that returned HTTP 200, counted per operation in 60-second windows.
      KStream<String, Response> successResponseStream =
          input
              .filter((key, value) -> value.getHeaders().getStatusCode().equalsIgnoreCase("200"))
              .selectKey((key, value) -> value.getHeaders().getOperation())
              .map((key, value) -> new KeyValue<>(value.getHeaders().getOperation(), "value"))
              .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
              .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(30)))
              .count(Materialized.as(Constants.ALL_STATUS_NEW))
              .suppress(Suppressed.untilWindowCloses(unbounded()))
              .toStream()
              .map(
                  (key, value) ->
                      new KeyValue<>(
                          key.key(),
                          new Response(
                              key.key(),
                              new Date(key.window().start()),
                              new Date(key.window().end()),
                              value,
                              value,
                              (double) ((value / value) * 100),
                              (double) ((value / value) * 100),
                              0L,
                              0.0)));

      // Branch 2: all requests regardless of status code, counted per operation in the same windows.
      KStream<String, Response> allOperations =
          input
              .selectKey((key, value) -> value.getHeaders().getOperation())
              .map((key, value) -> new KeyValue<>(value.getHeaders().getOperation(), "value"))
              .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
              .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(30)))
              .count(Materialized.as(Constants.ALL_STATUS))
              .suppress(Suppressed.untilWindowCloses(unbounded()))
              .toStream()
              .map(
                  (key, value) ->
                      new KeyValue<>(
                          key.key(),
                          new Response(
                              key.key(),
                              new Date(key.window().start()),
                              new Date(key.window().end()),
                              value,
                              value,
                              (double) ((value / value) * 100),
                              (double) ((value / value) * 100),
                              0L,
                              0.0)));

      // Join the two branches per operation and emit the combined record.
      return successResponseStream.toTable()
          .join(allOperations.toTable(), merge())
          .toStream()
          .peek((key, value) -> log.info(key + value));
    };
  }

  public static <K extends Response, V extends Response> ValueJoiner<K, V, K> merge() {
    return (v1, v2) -> {
      if (v1.getService().equals(v2.getService())) {
        v1.setSuccessCount(v2.getSuccessCount());
      }
      return v1;
    };
  }
}

When I run the code above, I get the following error:
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to a sink topic. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.config.Response). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:97) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43) ~[kafka-streams-2.5.1.jar:na]
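The question has no answers, but the exception message itself hints at the likely cause: one of the topics the topology writes to (a repartition or state-store topic, or the output binding) falls back to the default byte-array serializer while the record values are Response objects. One possible direction, following that hint, is to pass explicit serdes wherever Response values are re-keyed or materialized. The sketch below is only an illustration under those assumptions, not a verified fix; it assumes Response is JSON-serializable, uses Spring Kafka's JsonSerde, and keeps the rest of the topology unchanged.

// Illustrative sketch only: give Kafka Streams an explicit Serde for Response so the
// internal topics and the join state stores do not fall back to the default byte-array serde.
// JsonSerde comes from spring-kafka; Response is the question's own class and is assumed
// to be JSON-serializable.
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Materialized;
import org.springframework.kafka.support.serializer.JsonSerde;

Serde<Response> responseSerde = new JsonSerde<>(Response.class);

// The final part of process(), rewritten to name the serdes explicitly:
return successResponseStream
    .toTable(Materialized.with(Serdes.String(), responseSerde))   // serdes for the backing store / repartition
    .join(
        allOperations.toTable(Materialized.with(Serdes.String(), responseSerde)),
        merge(),
        Materialized.with(Serdes.String(), responseSerde))         // serdes for the join result store
    .toStream()
    .peek((key, value) -> log.info(key + value));

If the incompatible serializer is on the output binding rather than on an internal topic, the Kafka Streams binder also lets a value serde be set per binding, for example something like spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde pointing at the same JsonSerde (the binding name process-out-0 is assumed from the function name process).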
