你能帮我配置一个基于kafka的spring云流应用吗?我在selectkey操作上遇到了问题。
让我们来解释一下我试图达到的2个传入主题person,refgenre person包含refgenre的键(in value)
public class Person {
String nom;
String prenom;
String codeGenre; <<--- here is the key of the second topic refgenre
}
所以我使用selectkey操作符在join操作之前准备我的流。
使用selectbykey(my-app-kstream-key-select-0000000004-repartition)创建一个新主题,然后发生序列化问题:
线程“my-app-3c57b31c-28e5-4199-b07d-87f8940425ab-streamthread-1”org.apache.kafka.streams.errors.streamsexception:生成主题my-app-kstream-key-select-0000000004-repartition的数据时发生classcastexception异常。序列化程序(键:org.apache.kafka.common.serialization.stringserializer/value:statefull.serde.personwithgenresorde)与实际的键或值类型(键类型:java.lang.string/value类型:statefull.model.person)不兼容。更改streamconfig中的默认serdes或通过方法参数提供正确的serdes(例如,如果使用dsl, #to(String topic, Produced<K, V> produced)
与 Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))
).
在哪里可以指定此重新分区主题的serde,以及可以指定此“内部”主题的名称?
@Bean
public BiFunction<KStream<String, Person>, KTable<String, ReferentielGenre>, KStream<Long, PersonWithGenre>> joinKtable() {
return (persons, referentielGenres) ->
persons.selectKey((k,v) -> v.getCodeGenre())
.join(referentielGenres,
(person, genre) -> new PersonWithGenre(person.getNom(), person.getPrenom(),genre),
Joined.with(Serdes.String(), new PersonWithGenreSerde(), null));
}
以下是我不工作的全部代码:https://github.com/yohanalard/joinkstream
有没有更好的方法来处理这个用例?
暂无答案!
目前还没有任何答案,快来回答吧!