我想对我的Kafka流主题进行交互式查询。
目前,我可以将avro序列化的json对象发送到我的主题,并使用avro反序列化程序再次读取它们。在这个场景中,我使用了普通的messagechannel绑定器,它可以按预期工作。
现在我想用Kafka流活页夹,我不能让它工作。也许有人能帮我。
我的配置:
spring:
cloud:
bus:
enabled: true
stream:
schemaRegistryClient.endpoint: http://192.168.99.100:8081
bindings:
segments-in:
destination: segments
contentType: application/vnd.segments-value.v1+avro
segments-all:
destination: segments
group: segments-all
consumer:
headerMode: raw
useNativeDecoding: true
kafka:
binder:
zkNodes: 192.168.99.100:2181
brokers: 192.168.99.100:32768
streams:
bindings:
segments-all:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
Kafka配置类:
@Configuration
public class KafkaConfiguration {
@Bean
public MessageConverter classificationMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter();
converter.setSchema(Segment.SCHEMA$);
return converter;
}
}
架构配置
@Configuration
public class SchemaRegistryConfiguration {
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") final String endpoint) {
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
}
现在我的界面
public interface Channels {
String EVENTS = "segments-in";
String ALLSEGMENTS = "segments-all";
@Input(Channels.EVENTS)
SubscribableChannel events();
@Input(Channels.ALLSEGMENTS)
KTable<?, ?> segmentsIn();
}
我总是得到以下错误(警告消息),但只有当我打开了第二个通道segmentsin()时。
org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-3] Connection to node -1 could not be established. Broker may not be available.
有了subscribablechannel(segments in),一切正常,我做错什么了?如何让所有通道段与kafka流api一起工作?
1条答案
按热度按时间cgh8pdjw1#
我使用以下配置建立了连接:
请参阅为kafka流添加的配置,但我无法用代码查询任何内容。
我使用以下代码段:
这个调度器:
输出总是:
有人能给我指出正确的方向吗?我做错什么了?