Kafka Streams:无法从持久化键值状态存储(persistent key-value state store)中读取数据

nbysray5  于 2021-07-14  发布在  Java
关注(0)|答案(0)|浏览(248)

我在应用程序中使用 Kafka Streams 和持久化键值状态存储。应用中有两个 KeyValueStore 和两个处理器,问题出在两个处理器共享的状态存储上:NameProcessor 把数据写入 nameStore,EventProcessor 再从 nameStore 读取数据。调试时可以看到 NameProcessor 已经成功写入数据,但 EventProcessor 从 nameStore 读取时却得不到任何数据。下面是 Application 类、拓扑(ApplicationTopology)、NameProcessor 和 EventProcessor 的代码片段。另外,我使用的是 Spring Boot parent 2.4.3、kafka-streams 2.2.0 和 kafka-clients 2.2.0。

/**
 * Application entry point: boots the Spring context, builds the processor
 * topology and starts the Kafka Streams instance.
 *
 * @param args command-line arguments, forwarded to Spring Boot
 */
public static void main(String[] args) {

        SpringApplication.run(Application.class, args);

        Properties configs = getKafkaStreamProperties();

        Topology builder = new Topology();

        // The ApplicationTopology constructor wires sources, processors, stores and sinks into builder.
        new ApplicationTopology(builder);

        KafkaStreams stream = new KafkaStreams(builder, configs);

        stream.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            // Pass the throwable itself to the logger so the ORIGINAL stack trace is reported.
            // Calling fillInStackTrace() here would overwrite it with the handler's own stack.
            logger.error("Uncaught exception in stream, MessageDetail: " + ExceptionUtils.getRootCauseMessage(throwable), throwable);
            // halt() terminates the JVM immediately and does not run shutdown hooks,
            // so the close hook registered below is skipped on this path.
            Runtime.getRuntime().halt(1);
        });

        // Close the streams instance on a regular JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(stream::close));

        stream.start();
    }

    /**
     * Assembles the configuration handed to {@code KafkaStreams}.
     *
     * <p>Every value is obtained from a getter defined elsewhere in this class
     * and stored as a string property.
     *
     * @return a new {@link Properties} instance holding the streams and consumer settings
     */
    private static Properties getKafkaStreamProperties() {
        final Properties streamProps = new Properties();

        // Identity and cluster connection.
        streamProps.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, getApplicationId());
        streamProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());

        // Retry behaviour.
        streamProps.setProperty(StreamsConfig.RETRIES_CONFIG, getRetries());
        streamProps.setProperty(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackOffMs());

        // Replication factor setting of Kafka Streams.
        streamProps.setProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG, getReplicationFactor());

        // Consumer-level poll interval.
        streamProps.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());

        return streamProps;
    }

    /**
     * Wires the application's Processor API topology: an event pipeline
     * (source -> EventProcessor -> sink) and a sensor-update pipeline
     * (source -> NameProcessor), both attached to the shared "nameStore".
     */
    public class ApplicationTopology {

    /**
     * Adds all sources, processors, state stores and the sink to the given topology.
     *
     * @param builder the topology to populate; it is modified in place
     */
    public ApplicationTopology (Topology builder) {

        StoreBuilder<KeyValueStore<String, Sensor>> nameStoreBuilder = Stores
                .keyValueStoreBuilder(Stores.persistentKeyValueStore("nameStore"), Serdes.String(), CustomSerdes.getNameSerde())
                .withCachingEnabled()
                .withLoggingEnabled(new HashMap<>());

        StoreBuilder<KeyValueStore<String, String>> stateStoreBuilder = Stores
                .keyValueStoreBuilder(Stores.persistentKeyValueStore("stateStore"), Serdes.String(), Serdes.String())
                .withCachingEnabled()
                .withLoggingEnabled(new HashMap<>());

        // Event pipeline: inbound events -> EventProcessor -> outbound topic.
        builder.addSource(AutoOffsetReset.LATEST, "source", Serdes.String().deserializer(), CustomSerdes.getIncomingEventSerde().deserializer(), getInboundTopic())
                .addProcessor(TRANSFORMER, () -> new EventProcessor(), "source")
                .addSink("sink", getOutboundTopic(), Serdes.String().serializer(), CustomSerdes.getIncomingEventSerde().serializer(), TRANSFORMER);

        //reset to earliest for model config topic as some models could be already on the topic
        builder.addSource(AutoOffsetReset.EARLIEST, "nameStoreSource", Serdes.String().deserializer(), CustomSerdes.getSensorSerde().deserializer(), getInboundSensorUpdateTopic())
                .addProcessor("process", () -> new NameProcessor(), "nameStoreSource");

        // Register each store exactly ONCE, after both processors exist. Adding the same
        // store builder a second time is rejected by the topology builder ("already added").
        // NOTE(review): a store shared by two processors is still partitioned per task, so
        // EventProcessor only sees entries NameProcessor wrote for the same partition.
        // Confirm both input topics are co-partitioned and use matching record keys.
        builder.addStateStore(nameStoreBuilder, TRANSFORMER, "process")
                // EventProcessor requests "stateStore" in init(), so it must be connected to it.
                .addStateStore(stateStoreBuilder, TRANSFORMER);

    }

    /** Creates an instance without touching any topology. */
    public ApplicationTopology() {}
    } }

    /**
     * Maintains the "nameStore" key-value store from incoming sensor update records.
     *
     * <p>A record with a non-null value is written under every id derived from its key;
     * a record with a null value removes those ids from the store.
     */
    public class NameProcessor extends AbstractProcessor<String, Sensor> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NameProcessor.class);

    ProcessorContext context;

    // Typed with Sensor to match this processor's value type and the helper below.
    private KeyValueStore<String, Sensor> nameStore;

    private static List<String> externalDeviceIdList = new ArrayList<>();

    /**
     * Stores the processor context and resolves the "nameStore" state store.
     *
     * @param processorContext context supplied by Kafka Streams for this processor instance
     */
    @Override
    @SuppressWarnings("unchecked") // getStateStore returns the untyped StateStore interface
    public void init(ProcessorContext processorContext) {
        // Let AbstractProcessor record the context as well, so context() is usable.
        super.init(processorContext);
        this.context = processorContext;
        this.nameStore = (KeyValueStore<String, Sensor>) processorContext.getStateStore("nameStore");
    }

    /**
     * Applies one sensor update to the store.
     *
     * @param externalDeviceId record key; blank keys are ignored
     * @param sensor record value; {@code null} means the derived ids are deleted
     */
    @Override
    public void process(String externalDeviceId, Sensor sensor) {

        if (StringUtils.isNotBlank(externalDeviceId)) {
            String[] externalDeviceIds = SensorUtils.getExternalDeviceIdsWithoutSuffix(externalDeviceId);

            if (Objects.isNull(sensor)) {
                // Null value: remove every derived id from the store.
                for (String id : externalDeviceIds) {
                    nameStore.delete(id);
                }
            } else {
                addOrUpdateNameInStore(sensor, externalDeviceIds);
            }
        }

    }

    /**
     * Writes the given sensor under each of the supplied ids.
     *
     * @param sensor value to store
     * @param externalDeviceIds store keys that should all map to {@code sensor}
     */
    private void addOrUpdateNameInStore(Sensor sensor, String[] externalDeviceIds) {

        for (String id : externalDeviceIds) {
            nameStore.put(id, sensor);
        }

        // No explicit context.commit() is requested here.
    }

}

    /**
     * Processor for incoming events that looks up the event's external device id
     * in the "nameStore" state store.
     *
     * NOTE(review): this snippet is truncated; the try block below has no
     * catch/finally and the remainder of process() is not shown.
     */
    public class EventProcessor extends AbstractProcessor<String, IncomingEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);

    // Context handed in by init(); kept in this class's own field.
    ProcessorContext context;

    // Store looked up by the name "nameStore" in init().
    private KeyValueStore<String, Name> nameStore;

    // Store looked up by the name "stateStore" in init(); not used in the visible part of process().
    private KeyValueStore<String, String> stateStore;

    /**
     * Keeps the processor context and resolves both state stores by name.
     *
     * NOTE(review): both stores must be connected to this processor in the
     * topology for these lookups to succeed. Verify against the topology wiring.
     *
     * @param processorContext context supplied for this processor instance
     */
    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;

        // Unchecked casts: getStateStore returns an untyped store reference.
        this.nameStore = (KeyValueStore<String, Name>) context.getStateStore("nameStore");
        this.stateStore = (KeyValueStore<String, String>) context.getStateStore("stateStore");

    }

    /**
     * Handles one incoming event.
     *
     * @param key record key (not used in the visible part of the method)
     * @param value the event; its external device id is used as the store lookup key
     */
    @Override
    public void process(String key, IncomingEvent value) {

        // Fresh random id per record; not used in the visible part of the method.
        String correlationId = UUID.randomUUID().toString();

        try {

            String externalDeviceId = value.getExternalDeviceId();

           // Lookup uses the event's external device id exactly as returned by the getter.
           // NOTE(review): NameProcessor stores ids after stripping a suffix; confirm the keys match.
           Name nameFromStore = nameStore.get(externalDeviceId);

}
}
}

The nameFromStore variable never receives a value, even after the entry has been stored by NameProcessor.

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题