How do I access a GlobalKTable outside the Kafka Streams context in Spring Boot?

kxeu7u2r posted on 2023-04-19 in Java

I want to access a global KTable from outside the Kafka Streams context.
For example, I have this sample application:

@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

with this GlobalKTable configured:

@Component
public class Table {

    @Bean("myTable")
    public GlobalKTable<String, String> buildTopo(@Qualifier("ksConfig") StreamsBuilder builder) {
        return builder.globalTable("my-topic",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
                                "my-state-store" /* table/store name */)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String())
        );
    }
}

and this Kafka Streams configuration:

@Configuration
public class KafkaConfiguration {

    @Bean("ksConfig")
    public StreamsBuilderFactoryBean kafkaStreams(KafkaProperties kafkaProperties,
                                                  @Value("${spring.application.name}") String appName) {
        var props = new HashMap<String, Object>(kafkaProperties.getProperties());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
        var stateDir = kafkaProperties.getStreams().getStateDir() != null ?
                kafkaProperties.getStreams().getStateDir() : System.getProperty("java.io.tmpdir");
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        var config = new KafkaStreamsConfiguration(props);
        return new StreamsBuilderFactoryBean(config);
    }
}

Now suppose I have a web endpoint:

@Path("/api")
@Service
public class Web {

    @GET
    @Path("/test")
    @Produces(MediaType.APPLICATION_JSON)
    public Response test(@QueryParam("key") String key) {

        String value =  // get value from global ktable with Key

        return Response.status(200).entity(value).build();
    }
}

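How can I get at the KTable from this Web::test(...) method?
I tried autowiring some Kafka Streams classes I expected to exist, such as KafkaStreamsProcessorContext, into Web. Spring throws an error saying it cannot find those beans.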

Answer #1, by j9per5c4

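InteractiveQueryService is really only needed when you use spring-cloud, because there you can have multiple Kafka Streams instances in one application. In your case, you can reach the KafkaStreams object through the StreamsBuilderFactoryBean and get the store from there: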

@Path("/api")
@Service
public class Web {

    @Autowired
    @Qualifier("ksConfig")
    private StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    @GET
    @Path("/test")
    @Produces(MediaType.APPLICATION_JSON)
    public Response test(@QueryParam("key") String key) {

        String value =  getValue(key);

        return Response.status(200).entity(value).build();
    }
    
    private String getValue(String key) {
        // "my-state-store" is the name passed to Materialized.as(...) when the GlobalKTable was built
        QueryableStoreType<ReadOnlyKeyValueStore<String, String>> storeType = QueryableStoreTypes.keyValueStore();
        ReadOnlyKeyValueStore<String, String> store = streamsBuilderFactoryBean.getKafkaStreams()
                .store(StoreQueryParameters.fromNameAndType("my-state-store", storeType));
        // Returns null when the key is not present in the table
        return store.get(key);
    }
}
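
One caveat with the lookup above: as far as I know, getKafkaStreams() can return null before the factory bean has started, and KafkaStreams.store(...) can throw InvalidStateStoreException while the store is not yet queryable (for example during startup). A minimal sketch of a retry wrapper follows. StoreLookup and tryGet are hypothetical names, not part of Spring or Kafka, and the Supplier simply stands in for the store(...) call. Treating every RuntimeException as "not ready yet" is an assumption made to keep the sketch dependency-free; in real code you would probably catch InvalidStateStoreException specifically.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper: retries a lookup until it succeeds or the attempts run out. */
final class StoreLookup {

    private StoreLookup() {}

    /**
     * Calls lookup up to maxAttempts times, sleeping backoffMillis between failed attempts.
     * A null result or a RuntimeException is treated as "not ready yet" (an assumption).
     */
    static <T> Optional<T> tryGet(Supplier<T> lookup, int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                T result = lookup.get();
                if (result != null) {
                    return Optional.of(result);
                }
            } catch (RuntimeException notReady) {
                // In a real application this would be a narrower catch; here it is just retried.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
```

In getValue, the store(...) call could then be wrapped as StoreLookup.tryGet(() -> streamsBuilderFactoryBean.getKafkaStreams().store(...), 5, 200), with the endpoint answering 503 when the Optional is empty. The attempt count and backoff are arbitrary illustrative values.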
