我的目标是在 Kafka Streams 应用程序中拥有一个全局存储(global store),其中包含从源主题筛选出的记录。但是,当我查询该存储时,里面没有任何记录。我不想为过滤后的记录创建一个新的主题,而是想把它们直接过滤到 Kafka Streams 实例的全局存储中。如何做到这一点?
这是我的代码:
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class BrokerClient {
/** Class-wide logger. */
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerClient.class);
/** Source of the broker URI, schema-registry URI and topic name used by this client. */
private final BrokerConfiguration configuration;
/** Kafka Streams instance; created and started by {@code initializeGlobalStoreStream()}. */
private KafkaStreams storeStream;
/** Data-provider name the topology filters on; empty when the constructor was given {@code null}. */
private Optional<String> optionalDataProvider;
/**
 * Creates the client and immediately builds and starts its Kafka Streams instance.
 *
 * <p>Construction blocks until the stream reports {@code RUNNING} or the startup
 * timeout inside {@code initializeGlobalStoreStream()} expires.
 *
 * @param configuration        broker, schema-registry and topic settings
 * @param optionalDataProvider data-provider name to filter records on; stored as an
 *                             {@link Optional}, but building the topology fails with
 *                             a {@code NoSuchElementException} when it is {@code null}
 */
public BrokerClient(BrokerConfiguration configuration, String optionalDataProvider) {
    this.optionalDataProvider = Optional.ofNullable(optionalDataProvider);
    this.configuration = configuration;
    // Both fields must be assigned before this call: it reads them while building the topology.
    initializeGlobalStoreStream();
}
/**
 * Looks up the queryable key-value store materialized under {@link #getStoreName()}.
 *
 * <p>NOTE(review): per the Kafka Streams API, {@code KafkaStreams#store} throws
 * {@code InvalidStateStoreException} while the instance is not running or is
 * rebalancing; callers should be prepared to retry.
 *
 * @return read-only view of the store holding the filtered {@code MeteoStation} records
 */
private ReadOnlyKeyValueStore<String, MeteoStation> getRecordsStore() {
    // Deliberately no method-level type parameter: declaring "<MeteoStation>" here
    // introduced a type variable that shadowed the real MeteoStation class and turned
    // the return type into an unchecked, caller-chosen type.
    return storeStream.store(StoreQueryParameters.fromNameAndType(
            getStoreName(),
            QueryableStoreTypes.<String, MeteoStation>keyValueStore()));
}
/**
 * Builds the Avro serde for {@code MeteoStation} values, pointed at the schema
 * registry named in the configuration.
 *
 * @return a configured value serde
 */
private SpecificAvroSerde<MeteoStation> createAvroSerde() {
    final var registrySettings = Collections.singletonMap(
            KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            configuration.getSchemaRegistryUri());
    final SpecificAvroSerde<MeteoStation> valueSerde = new SpecificAvroSerde<>();
    // Second argument is the serde's "isKey" flag: false configures it for record values.
    valueSerde.configure(registrySettings, false);
    return valueSerde;
}
/**
 * Builds the filtered topology, starts the Kafka Streams instance and waits for it
 * to reach {@code RUNNING}.
 *
 * <p>The local state directory is wiped via {@code cleanUp()} before the start, and a
 * JVM shutdown hook is registered to close the stream.
 *
 * @throws IllegalStateException if the stream does not reach {@code RUNNING} within
 *                               60 seconds; the stream is closed before throwing
 */
private void initializeGlobalStoreStream() {
    final var applicationId = "Consumer-" + getTopicName();
    final Topology topology = createFilteredToDataProviderTopology();
    final var properties = createConsumerProperties(applicationId);
    storeStream = new KafkaStreams(topology, properties);
    Runtime.getRuntime().addShutdownHook(new Thread(storeStream::close));
    // cleanUp() is only legal while the instance is not running, so it must precede start().
    storeStream.cleanUp();

    // Released on the first transition into RUNNING; the listener must be set before start().
    final CountDownLatch startLatch = new CountDownLatch(1);
    storeStream.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.RUNNING && oldState != KafkaStreams.State.RUNNING) {
            startLatch.countDown();
        }
    });
    storeStream.start();

    try {
        if (!startLatch.await(60, TimeUnit.SECONDS)) {
            LOGGER.error("Store stream never finished rebalancing on startup. Topic: {}",
                    getTopicName());
            // The caller never receives this object when the constructor throws, so
            // release the stream threads here instead of leaking them until JVM exit.
            storeStream.close(Duration.ofSeconds(10));
            throw new IllegalStateException("Global store stream never finished rebalancing on startup");
        }
        // Logged only once the stream is actually running.
        LOGGER.debug("Started global store stream for topic {}.", getTopicName());
    } catch (final InterruptedException e) {
        // Preserve the interrupt for the caller; the stream may not be RUNNING yet.
        Thread.currentThread().interrupt();
        LOGGER.warn("Interrupted while waiting for the store stream to start. Topic: {}",
                getTopicName());
    }
}
/**
 * Assembles the properties handed to the {@code KafkaStreams} constructor.
 *
 * <p>Client id, group id and the application-id suffix are fresh random UUIDs on
 * every call, so each instance starts under a new identity and reads from the
 * earliest offset.
 *
 * <p>NOTE(review): Kafka Streams normally derives the consumer group id from
 * {@code application.id}; the explicit {@code group.id} is probably ignored. Confirm.
 *
 * @param applicationId prefix of the Streams application id
 * @return the populated properties
 */
private Properties createConsumerProperties(String applicationId) {
    final var streamsProperties = new Properties();
    streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + UUID.randomUUID());
    streamsProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getUri());
    streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final String randomClientId = UUID.randomUUID().toString();
    final String randomGroupId = UUID.randomUUID().toString();
    streamsProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, randomClientId);
    streamsProperties.put(ConsumerConfig.GROUP_ID_CONFIG, randomGroupId);
    return streamsProperties;
}
/**
 * Returns the source topic name as supplied by the configuration.
 *
 * @return the topic name
 */
private String getTopicName() {
    final String topicName = configuration.getTopicName();
    return topicName;
}
/**
 * Derives the state-store name from the topic name by appending {@code "-store"}.
 *
 * @return the store name
 */
private String getStoreName() {
    final String topic = getTopicName();
    return topic + "-store";
}
/**
 * Builds a topology that reads the source topic, keeps only records whose data
 * provider matches the configured one, and materializes them into the key-value
 * store named by {@link #getStoreName()}.
 *
 * <p>NOTE(review): {@code toTable} materializes a store local to this Streams
 * instance rather than a Kafka Streams "global store". It only contains every
 * matching record when this instance is assigned all partitions of the topic.
 *
 * @return the built topology
 * @throws NoSuchElementException if no data provider was supplied to the constructor
 */
private Topology createFilteredToDataProviderTopology() {
    // Fail before building anything, with a message that says what is missing.
    final String dataProviderName = optionalDataProvider.orElseThrow(
            () -> new NoSuchElementException(
                    "No data provider configured; cannot build the filtered topology"));
    final var topologyBuilder = new StreamsBuilder();
    final var avroSerde = createAvroSerde();

    final KStream<String, MeteoStation> topicStream = topologyBuilder.stream(getTopicName(),
            Consumed.with(Serdes.String(), avroSerde));
    topicStream
            .filter((id, record) -> matchesDataProvider(record, dataProviderName))
            .toTable(Materialized.<String, MeteoStation, KeyValueStore<Bytes, byte[]>>as(getStoreName())
                    .withKeySerde(Serdes.String())
                    .withValueSerde(avroSerde));
    return topologyBuilder.build();
}

/** Decides whether a record belongs in the store for the given data provider. */
private static boolean matchesDataProvider(MeteoStation record, String dataProviderName) {
    if (record == null) {
        // Null values are tombstones; let them through so the table can delete the key
        // (a no-op for keys that were never stored) instead of hitting an NPE here.
        return true;
    }
    // Compare by string content: Avro-generated getters commonly return CharSequence
    // (Utf8), and Utf8.equals(String) is always false, which filters out every record.
    final Object provider = record.getDataProvider();
    return provider != null && dataProviderName.equals(provider.toString());
}
暂无答案!
目前还没有任何答案,快来回答吧!