How can I change the default serdes of @KafkaStreamsStateStore? I know that in the newer Spring Cloud Stream Kafka binder 3.0.1 the approach is described here: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.1.release/reference/html/spring-cloud-stream-binder-kafka.html#_state_store. However, I am using 2.1.12, so could you give me some code examples? I have searched in many places and found nothing.
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "????", valueSerde = "????") does not work either.
https://www.bountysource.com/issues/87943127-consider-changing-the-default-serdes-of-kafkastreamsstatestore
I tried:
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")

public class CustomSerde {
    public static final class CustomSerdes extends WrapperSerde<Entity> {
        public CustomSerdes() {
            super(new JsonPOJOSerializer<Entity>(), new JsonPOJODeserializer<Entity>());
        }
    }
}

public static final String VALUE_SERDE = "CustomSerde$CustomSerdes";

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonPOJODeserializer<T> implements Deserializer<T> {
    private ObjectMapper objectMapper = new ObjectMapper();
    // Target type; only set when configure() receives a "JsonPOJOClass" entry.
    private Class<T> tClass;

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJODeserializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        tClass = (Class<T>) props.get("JsonPOJOClass");
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        T data;
        try {
            data = objectMapper.readValue(bytes, tClass);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
        return data;
    }

    @Override
    public void close() {
    }
}

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    // Read from the "JsonPOJOClass" config entry; not used by serialize().
    private Class<T> tClass;

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJOSerializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        tClass = (Class<T>) props.get("JsonPOJOClass");
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }
}

But it does not work. Please advise.
2 answers
f0ofjuux1#
I found a solution that helped me:
ffdz8vbo2#
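In the code snippet above, you have:
What exactly is VALUE_SERDE? Is it implementing the Serde interface? As long as it implements the appropriate Serde interface, it should work. The binder passes this value down to the StoreBuilder internally. Are you getting any errors? If you are still facing problems, please share a small sample application with us and we can look into it further.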
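
To illustrate the point about the serde string: the following is a minimal sketch, assuming the string given in keySerde/valueSerde is resolved reflectively as a class name (an assumption about binder 2.1.x that this thread does not confirm). Under that assumption, a nested class such as CustomSerde$CustomSerdes would have to be named by its binary name, including the package, and would need a public no-argument constructor. DemoSerde, DemoCustomSerde and SerdeNameDemo are hypothetical stand-ins, so the sketch runs on the JDK alone without Kafka or Spring.

```java
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.apache.kafka.common.serialization.Serde,
// so the sketch runs without Kafka on the classpath.
interface DemoSerde<T> {
    byte[] serialize(T value);
}

// Mirrors the shape of CustomSerde.CustomSerdes from the question.
class DemoCustomSerde {
    public static final class DemoCustomSerdes implements DemoSerde<String> {
        // Public no-arg constructor so the class can be created reflectively.
        public DemoCustomSerdes() {
        }

        @Override
        public byte[] serialize(String value) {
            return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
        }
    }
}

class SerdeNameDemo {
    // Assumption: models a loader that turns a class-name string into a serde.
    static DemoSerde<?> instantiate(String className) throws ReflectiveOperationException {
        Class<?> type = Class.forName(className);
        if (!DemoSerde.class.isAssignableFrom(type)) {
            throw new IllegalArgumentException(className + " does not implement DemoSerde");
        }
        Constructor<?> ctor = type.getDeclaredConstructor();
        return (DemoSerde<?>) ctor.newInstance();
    }

    // Returns true only if the name resolves to an instantiable DemoSerde.
    static boolean canLoad(String className) {
        try {
            instantiate(className);
            return true;
        } catch (ReflectiveOperationException | IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Binary name: package prefix (if any) plus '$' between outer and nested class.
        String binaryName = DemoCustomSerde.DemoCustomSerdes.class.getName();
        // Canonical name uses '.' for the nested class and is not loadable by Class.forName.
        String canonicalName = DemoCustomSerde.DemoCustomSerdes.class.getCanonicalName();

        System.out.println(binaryName + " loadable: " + canLoad(binaryName));
        System.out.println(canonicalName + " loadable: " + canLoad(canonicalName));
    }
}
```

If the real binder behaves this way, the value to try would be the output of CustomSerde.CustomSerdes.class.getName() rather than a short name or the literal text "VALUE_SERDE"; printing that value once in the application is a quick way to check what the string should be.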