我想在Kafka streams上下文之外的上下文中访问全局ktable。
举个例子,我有这个示例应用程序:
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
配置了此GlobalKTable
@Component
public class Table {
@Bean("myTable")
public GlobalKTable<String, String> buildTopo(@Qualifier("ksConfig") StreamsBuilder builder) {
return builder.globalTable("my-topic",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
"my-state-store" /* table/store name */)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
);
}
}
这个Kafka流配置:
@Configuration
public class KafkaConfiguration {
@Bean("ksConfig")
public StreamsBuilderFactoryBean kafkaStreams(KafkaProperties kafkaProperties,
@Value("${spring.application.name}") String appName) {
var props = new HashMap<String, Object>(kafkaProperties.getProperties());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
var stateDir = kafkaProperties.getStreams().getStateDir() != null ?
kafkaProperties.getStreams().getStateDir() : System.getProperty("java.io.tmpdir");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
var config = new KafkaStreamsConfiguration(props);
return new StreamsBuilderFactoryBean(config);
}
现在假设我有一个Web端点:
@Path("/api")
@Service
public class Web {
@GET
@Path("/test")
@Produces(MediaType.APPLICATION_JSON)
public Response test(@QueryParam("key") String key) {
String value = // get value from global ktable with Key
return Response.status(200).entity(value).build();
}
}
如何从这个Web::test(...)
方法中获取ktable?
我尝试在Web
中自动加载一些预期的Kafka流类,如KafkaStreams
和ProcessorContext
。Spring抛出一个错误,说它找不到这些bean。
1条答案
按热度按时间j9per5c41#
InteractiveQueryService
实际上只有在使用spring-cloud时才需要,因为在那里你可以在一个应用程序中拥有多个kafka-streams示例。在你的例子中,你可以通过StreamsBuilderFactoryBean
访问KafkaStreams
对象并从那里获取store: