kafka streams ktable ktable join在输出主题中不发出结果

00jrzges  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(467)

我正在尝试一个非常简单的ktable to ktable加入scala。两个主题都有一个分区,但我仍然没有看到任何关于“输出主题”的内容

object SimpleMerge extends App {
  val properties = KafkaProperties.get()

  val streamBuilder = new StreamsBuilder()

  val objectMetadataTable: KTable[Long, String] = streamBuilder.table("metadata-topic")

  val objectClassificationTable: KTable[Long, String] = streamBuilder.table("classification-topic")

  val objectClassificationWithMetadata: KTable[Long, String] = objectMetadataTable
    .join(objectClassificationTable, (metadata: String, classification: String) => metadata + classification)

  objectClassificationWithMetadata.toStream().to("output-topic")

  val kafkaStreams = new KafkaStreams(streamBuilder.build(), properties)
  kafkaStreams.cleanUp()
  kafkaStreams.start()

  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    kafkaStreams.close()
  }))
}

我相信这是非常基本的,但我找不到任何线索。注意:我可以看到两个输入主题都使用正确的相同键生成结果,所以空键的问题已经被排除了。
谢谢
例外情况

02:26:57.936 [MetadataClassificationMerger_1-757746d5-32e5-412b-934d-d29c1fc8c7a9-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - stream-thread [MetadataClassificationMerger_1-757746d5-32e5-412b-934d-d29c1fc8c7a9-StreamThread-1] task [0_0] Failed to flush state store metadata-topic-STATE-STORE-0000000000: 
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: [B, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:333) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:276) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:582) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:536) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:524) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:529) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:959) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:813) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-2.4.0.jar:?]
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at com.pan.pcs.dlp.kafkamerger.MergeObjectMetadataClassification$$anon$1.apply(MergeObjectMetadataClassification.scala:19) ~[classes/:?]
    at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:110) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:67) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-2.4.0.jar:?]
    ... 24 more
6kkfgxo0

6kkfgxo01#

默认情况下 ByteArraySerdes 并且错误消息指示您的程序仍然使用这些 LongSerde 钥匙和 StringSerde 对于值。
如果您使用官方的scalaapi并加载相应的显式表达式,那么应该自动使用正确的serdes。否则,您需要通过 StreamsConfig 通过传入 Consumed.with(...) 进入 table(...) 操作员。
查看文档了解详细信息:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstreams-scala的dsl

相关问题