Why does the state store fail with a serialization issue?

6tqwzwtp asked on 2021-06-07 in Kafka

I am using Kafka Streams 1.1.0.
I created the following topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000001 (topics: [configurationTopicName])
      --> KTABLE-SOURCE-0000000002
    Processor: KTABLE-SOURCE-0000000002 (stores: [configurationTopicName-STATE-STORE-0000000000])
      --> KTABLE-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000001
    Processor: KTABLE-MAPVALUES-0000000003 (stores: [configuration_store_application1])
      --> none
      <-- KTABLE-SOURCE-0000000002

The code is as follows:

case class Test(name: String, age: Int)
val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
builder.table(configurationTopicName, Consumed.`with`(Serdes.String(), Serdes.String()))
  .someAdditionalTransformation
  .mapValues[Test](
      new ValueMapperWithKey[String, String, Test] {
         override def apply(readOnlyKey: String, value: String): Test = Test("aaa", 432)
      }, mal)

I want to build a queryable store that I can later use to look up the filtered/transformed values.
When I run the topology with TopologyTestDriver, the following exception is thrown:
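For context, a materialized store like this is typically queried through the interactive-queries API. A minimal sketch of such a lookup (hypothetical: assumes a started `KafkaStreams` instance and that `storeName` matches the name passed to `Materialized.as(...)`; this is not part of the failing code above):

```scala
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

// Look up a single transformed value by key from the materialized store.
// `streams` must be in RUNNING state before the store can be queried.
def lookup(streams: KafkaStreams, storeName: String, key: String): Option[Test] = {
  val store: ReadOnlyKeyValueStore[String, Test] =
    streams.store(storeName, QueryableStoreTypes.keyValueStore[String, Test]())
  Option(store.get(key)) // store.get returns null when the key is absent
}
```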
Caused by: java.lang.ClassCastException: com.example.kafka.streams.topology.Test cannot be cast to java.lang.String
	at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
	at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
	at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:103)
	at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
	at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:89)
	at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:63)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
	at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
	at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:244)
	... 58 more
Do you know why this happens and how to fix it?

bvhaajcl answered:

After investigating, I found the cause of the above exception.
I had created the Materialized used to store the data, but I did not pass any serdes for the key or the value.
If none is passed, the defaults are used. In my case that was the StringSerializer, so I was trying to serialize a Test instance with a StringSerializer. Mea culpa.
Passing the serde via .withValueSerde(GenericSerde[Test]) fixes it, where GenericSerde is my implementation of org.apache.kafka.common.serialization.Serde:
val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
    .withValueSerde(GenericSerde[Test])
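GenericSerde here is the answerer's own class. One way to build such a Serde is via Serdes.serdeFrom; the sketch below is a hypothetical stand-in (it encodes Test as "name,age" text purely for illustration; a real implementation would use a JSON or Avro library). The explicit configure/close overrides are needed on Kafka 1.1.0, where Serializer/Deserializer do not yet have default methods:

```scala
import java.nio.charset.StandardCharsets
import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

// Hypothetical Serde for Test, shown only to illustrate the shape of the fix.
object TestSerde {
  def apply(): Serde[Test] = Serdes.serdeFrom(
    new Serializer[Test] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def serialize(topic: String, data: Test): Array[Byte] =
        if (data == null) null
        else s"${data.name},${data.age}".getBytes(StandardCharsets.UTF_8)
      override def close(): Unit = ()
    },
    new Deserializer[Test] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def deserialize(topic: String, bytes: Array[Byte]): Test =
        if (bytes == null) null
        else {
          val Array(name, age) = new String(bytes, StandardCharsets.UTF_8).split(",", 2)
          Test(name, age.toInt)
        }
      override def close(): Unit = ()
    }
  )
}
```

It would then be passed as `.withValueSerde(TestSerde())` in place of `GenericSerde[Test]`.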
