需要一些帮助,以整合Kafka与Spring云流。这个应用程序非常简单,有两个部分(作为单独的java进程运行)
使用者-将请求放入requesttopic并从responsetopic获得响应
生产者-从requesttopic获取请求并将响应放回responsetopic。
我已经为consumer创建了requestsenderchannel和responsereceiverchannel接口,为producer应用程序创建了requestreceiverchannel和responsesenderchannel接口。它们共享同一个yaml文件。根据文档spring.cloud.stream.bindings..destination应该指定发送或接收消息的主题。但是当我运行应用程序时,应用程序会在kafka中创建“requestsender”、“requestreceiver”、“responsesender”和“responsereceiver”主题
我的假设是:由于yaml文件中的destination只指定了两个主题'requesttopic'和'responsetopic',它应该已经创建了这些主题。但它会为yaml文件中“spring.cloud.stream.bindings”指定的属性创建kafka主题。有人能指出配置/代码中的问题吗?
public interface RequestReceiverChannel
{
String requestReceiver ="RequestReceiver";
@Input(requestReceiver)
SubscribableChannel pathQueryRequest();
}
public interface RequestSenderChannel
{
String RequestSender ="RequestSender";
@Output(RequestSender)
MessageChannel pathQueryRequestSender();
}
public interface ResponseReceiverChannel
{
String ResponseReceiver = "ResponseReceiver";
@Input(ResponseReceiver)
SubscribableChannel pceResponseServiceReceiver();
}
public interface ResponseSenderChannel
{
String ResponseSender = "ResponseSender";
@Output(ResponseSender)
MessageChannel pceResponseService();
}
'''
yaml配置文件
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
RequestSender:
binder: kafka
destination: RequestTopic
content-type: application/protobuf
group: consumergroup
ResponseSender:
binder: kafka
destination: ResponseTopic
content-type: application/protobuf
group: consumergroup
RequestReceiver:
binder: kafka
destination: RequestTopic
content-type: application/protobuf
group: consumergroup
ResponseReceiver:
binder: kafka
destination: ResponseTopic
content-type: application/protobuf
group: consumergroup
kafka:
bindings:
RequestTopic:
consumer:
autoCommitOffset: false
ResponseTopic:
consumer:
autoCommitOffset: false
binder:
brokers: ${SERVICE_KAFKA_HOST:localhost}
zkNodes: ${SERVICE_ZOOKEEPER_HOST:127.0.0.1}
defaultZkPort: ${SERVICE_ZOOKEEPER_PORT:2181}
defaultBrokerPort: ${SERVICE_KAFKA_PORT:9092}
1条答案
按热度按时间vlju58qv1#
通过做
spring.cloud.stream.bindings.<binding-name>.destination=foo
您表示希望Map由指定的绑定<binding-name>
(例如。,RequestSender
)到名为foo
. 如果这样的目的地不存在,它将被自动配置。所以没有问题。也就是说,我们刚刚发布了horsham.release(cloud-hoxton.release的一部分),并且我们正在从您当前使用的基于注解的模型转向一个非常简单的功能模型。您可以在我们的发布博客中阅读更多关于它的内容,该博客还提供了4篇文章的链接,在这里我们详细阐述并提供了更多关于函数式编程范例的示例。