Kafka Streams: filtering records into a global store

oug3syen · posted 2021-06-04 in Kafka

My goal is to have a global store in my Kafka Streams application that contains only the records filtered from a source topic. However, when I query the store, there are no records in it. I don't want to publish the filtered records to a new topic; I want to filter them directly into a global store of the Kafka Streams instance. How can this be done?
Here is my code:

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BrokerClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(BrokerClient.class);
    private final BrokerConfiguration configuration;
    private KafkaStreams storeStream;
    private Optional<String> optionalDataProvider;

    public BrokerClient(BrokerConfiguration configuration, String optionalDataProvider) {
        this.configuration = configuration;
        this.optionalDataProvider = Optional.ofNullable(optionalDataProvider);
        initializeGlobalStoreStream();
    }

    private ReadOnlyKeyValueStore<String, MeteoStation> getRecordsStore() {
        return storeStream.store(StoreQueryParameters.fromNameAndType(getStoreName(),
                QueryableStoreTypes.keyValueStore()));
    }

    private SpecificAvroSerde<MeteoStation> createAvroSerde() {
        final SpecificAvroSerde<MeteoStation> avroSerde = new SpecificAvroSerde<>();
        final var serdeConfigMap = Collections.singletonMap(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                configuration.getSchemaRegistryUri());
        avroSerde.configure(serdeConfigMap, false);
        return avroSerde;
    }

    private void initializeGlobalStoreStream() {
        final var applicationId = "Consumer-" + getTopicName();
        final Topology topology = createFilteredToDataProviderTopology();
        final var properties = createConsumerProperties(applicationId);
        storeStream = new KafkaStreams(topology, properties);
        Runtime.getRuntime().addShutdownHook(new Thread(storeStream::close));
        storeStream.cleanUp();

        final CountDownLatch startLatch = new CountDownLatch(1);
        storeStream.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState != KafkaStreams.State.RUNNING) {
                startLatch.countDown();
            }
        });
        LOGGER.debug("Started global store stream for topic {}.", getTopicName());
        storeStream.start();

        try {
            if (!startLatch.await(60, TimeUnit.SECONDS)) {
                LOGGER.error("Store stream never finished rebalancing on startup. Topic: {}",
                        getTopicName());
                throw new RuntimeException("Global store stream never finished rebalancing on startup");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /* These properties configure the Kafka Streams instance. */
    private Properties createConsumerProperties(String applicationId) {
        final var properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getUri());
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
        // Note: Kafka Streams derives the consumer group.id from application.id,
        // so the group.id set here is ignored; a random suffix on the application
        // id also means fresh local state and internal topics on every start.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + UUID.randomUUID().toString());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return properties;
    }

    private String getTopicName() {
        return configuration.getTopicName();
    }

    private String getStoreName() {
        return getTopicName() + "-store";
    }

    private Topology createFilteredToDataProviderTopology() {
        final var topologyBuilder = new StreamsBuilder();
        final var avroSerde = createAvroSerde();
        final var dataProviderName = optionalDataProvider.orElseThrow();
        KStream<String, MeteoStation> topicStream = topologyBuilder.stream(getTopicName(),
                Consumed.with(Serdes.String(), avroSerde));
        // Note: toTable() materializes a regular, partition-local store (backed
        // by an internal changelog topic), not a global store.
        topicStream.filter((id, record) -> dataProviderName.equals(record.getDataProvider()))
                .toTable(Materialized.<String, MeteoStation, KeyValueStore<Bytes, byte[]>>as(getStoreName())
                        .withKeySerde(Serdes.String())
                        .withValueSerde(avroSerde));
        return topologyBuilder.build();
    }
}
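
For reference: the Streams DSL offers no hook for filtering into a global store. globalTable() always materializes the entire topic, and toTable() (as above) builds a regular, partition-local store plus an internal changelog topic. The usual workaround is to register the store via StreamsBuilder#addGlobalStore and apply the filter inside the state-update processor. Below is a minimal sketch of such a replacement for createFilteredToDataProviderTopology (the method name createFilteredGlobalStoreTopology is made up here), assuming Kafka Streams 2.7+ for the processor.api types and reusing the helpers from the class above. One caveat to verify for your version: on restart, global-store restoration may copy records from the topic into the store directly, bypassing this processor's filter.

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

private Topology createFilteredGlobalStoreTopology() {
    final var builder = new StreamsBuilder();
    final var avroSerde = createAvroSerde();
    final var dataProviderName = optionalDataProvider.orElseThrow();

    // Global stores must disable changelogging: the source topic itself
    // serves as the changelog, so no extra topic is created.
    final StoreBuilder<KeyValueStore<String, MeteoStation>> storeBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore(getStoreName()),
                    Serdes.String(), avroSerde)
                    .withLoggingDisabled();

    builder.addGlobalStore(storeBuilder, getTopicName(),
            Consumed.with(Serdes.String(), avroSerde),
            () -> new Processor<String, MeteoStation, Void, Void>() {
                private KeyValueStore<String, MeteoStation> store;

                @Override
                public void init(final ProcessorContext<Void, Void> context) {
                    store = context.getStateStore(getStoreName());
                }

                @Override
                public void process(final Record<String, MeteoStation> record) {
                    // Only records from the requested data provider are stored.
                    if (record.value() != null
                            && dataProviderName.equals(record.value().getDataProvider())) {
                        store.put(record.key(), record.value());
                    }
                }
            });
    return builder.build();
}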

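Once the instance reaches RUNNING, the filtered records should then be visible through the interactive-query wrapper the class already has. A hypothetical usage from within BrokerClient; the key "station-42" is made up for illustration:

final ReadOnlyKeyValueStore<String, MeteoStation> store = getRecordsStore();
final MeteoStation station = store.get("station-42");  // point lookup (illustrative key)
LOGGER.debug("Lookup result: {}", station);
try (final var allRecords = store.all()) {             // KeyValueIterator is Closeable
    allRecords.forEachRemaining(kv -> LOGGER.debug("{} -> {}", kv.key, kv.value));
}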