如何更改@kafkastreamsstatestore kafka stream cloud的默认serdes

camsedfj  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(369)

如何更改@kafkastreamsstatestore的默认serdes?我知道在kafka stream cloud的新版本3.0.1中,方法如下:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.1.release/reference/html/spring-cloud-stream-binder-kafka.html#_state_store. 但是由于我使用的是2.1.12,请你帮我举一些代码示例。我找了很多地方都没找到。
@kafkastreamsstatestore(name=dedup\u store,type=kafkastreamsstatestoreproperties.storetype.keyvalue,keyserde=“????”,valueserde=“????”)这也没用。
https://www.bountysource.com/issues/87943127-consider-changing-the-default-serdes-of-kafkastreamsstatestore
我试过:

@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")

public class CustomSerde {
    static public final class CustomSerdes extends WrapperSerde<Entity> {
        public CustomSerdes () {
            super(new JsonPOJOSerializer<Entity>(), new JsonPOJODeserializer<Entity>());
        }

    }
}

    public static final String VALUE_SERDE = "CustomSerde$CustomSerdes";

    public class JsonPOJODeserializer<T> implements Deserializer<T> {
        private ObjectMapper objectMapper = new ObjectMapper();

        private Class<T> tClass;

        /**
         * Default constructor needed by Kafka
         */
        public JsonPOJODeserializer() {
        }

        @SuppressWarnings("unchecked")
        @Override
        public void configure(Map<String, ?> props, boolean isKey) {
            tClass = (Class<T>) props.get("JsonPOJOClass");
        }

        @Override
        public T deserialize(String topic, byte[] bytes) {
            if (bytes == null)
                return null;

            T data;
            try {
                data = objectMapper.readValue(bytes, tClass);
            } catch (Exception e) {
                throw new SerializationException(e);
            }

            return data;
        }

        @Override
        public void close() {

        }
    }

    public class JsonPOJOSerializer<T> implements Serializer<T> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        private Class<T> tClass;

        /**
         * Default constructor needed by Kafka
         */
        public JsonPOJOSerializer() {

        }
        @SuppressWarnings("unchecked")
        @Override
        public void configure(Map<String, ?> props, boolean isKey) {
            tClass = (Class<T>) props.get("JsonPOJOClass");
        }

        @Override
        public byte[] serialize(String topic, T data) {
            if (data == null)
                return null;

            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON 
               message", e);
            }
        }

        @Override
        public void close() {
        }

    }

但它不起作用。请告知。

f0ofjuux

f0ofjuux1#

找到了一个帮助我的解决方案:

@SuppressWarnings({"WeakerAccess", "unused"})
public class PageViewTypedDemo {

/**
 * A serde for any class that implements {@link JSONSerdeCompatible}. Note that the classes also need to
 * be registered in the {@code @JsonSubTypes} annotation on {@link JSONSerdeCompatible}.
 *
 * @param <T> The concrete type of the class that gets de/serialized
 */
public static class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>, Deserializer<T>, Serde<T> {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {}

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }

        try {
            return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
        } catch (final IOException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public byte[] serialize(final String topic, final T data) {
        if (data == null) {
            return null;
        }

        try {
            return OBJECT_MAPPER.writeValueAsBytes(data);
        } catch (final Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {}

    @Override
    public Serializer<T> serializer() {
        return this;
    }

    @Override
    public Deserializer<T> deserializer() {
        return this;
    }
}

/**
 * An interface for registering types that can be de/serialized with {@link JSONSerde}.
 */
@SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
@JsonSubTypes({
                  @JsonSubTypes.Type(value = PageView.class, name = "pv"),
                  @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
                  @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
                  @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
                  @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
              })
public interface JSONSerdeCompatible {

}

// POJO classes
static public class PageView implements JSONSerdeCompatible {
    public String user;
    public String page;
    public Long timestamp;
}

static public class UserProfile implements JSONSerdeCompatible {
    public String region;
    public Long timestamp;
}

static public class PageViewByRegion implements JSONSerdeCompatible {
    public String user;
    public String page;
    public String region;
}

static public class WindowedPageViewByRegion implements JSONSerdeCompatible {
    public long windowStart;
    public String region;
}

static public class RegionCount implements JSONSerdeCompatible {
    public long count;
    public String region;
}

public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
    props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, JSONSerde.class);
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
    props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JSONSerde.class);
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final StreamsBuilder builder = new StreamsBuilder();

    final KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), new JSONSerde<>()));

    final KTable<String, UserProfile> users = builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>()));

    final KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
        .leftJoin(users, (view, profile) -> {
            final PageViewByRegion viewByRegion = new PageViewByRegion();
            viewByRegion.user = view.user;
            viewByRegion.page = view.page;

            if (profile != null) {
                viewByRegion.region = profile.region;
            } else {
                viewByRegion.region = "UNKNOWN";
            }
            return viewByRegion;
        })
        .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
        .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
        .windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
        .count()
        .toStream()
        .map((key, value) -> {
            final WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
            wViewByRegion.windowStart = key.window().start();
            wViewByRegion.region = key.key();

            final RegionCount rCount = new RegionCount();
            rCount.region = key.key();
            rCount.count = value;

            return new KeyValue<>(wViewByRegion, rCount);
        });

    // write to the result topic
    regionCount.to("streams-pageviewstats-typed-output");

    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (final Throwable e) {
        e.printStackTrace();
        System.exit(1);
    }
    System.exit(0);
}
}
ffdz8vbo

ffdz8vbo2#

在上面的代码段中,您有:

@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")

到底是什么 VALUE_SERDE ? 是不是在实施 Serde 接口?只要是执行适当的 Serde 接口,应该可以。活页夹把这个值降低到 StoreBuilder 内部。你有什么错误吗?如果你仍然面临问题,请与我们分享一个小样本应用程序,我们可以进一步研究。

相关问题