springxd:从kafka读取消息时,消息不包含头

9lowa7mx  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(551)

我有以下结构:
Kafka的主题
Kafka接收器/源的模块
Kafka来源之后的模块。基本上是为了阅读Kafka的资料。
问题是,当kafka接收器向kafka队列发送事件,并且kafka源已接收到来自kafka主题的消息,然后,下一个模块尝试读取头时,它失败,因为找不到头。
我提出以下解决方案:将消息头 Package 到消息中,这样,内部有效负载将包含一个原始有效负载+消息头。
我还有别的选择吗?。

jhdbpxl9

jhdbpxl91#

由于kafka本身不支持报头,因此kafka消息总线支持您所描述的确切机制—只是您必须选择要由总线传输的报头名称,以避免传输不需要的报头。
请参阅应用程序配置。
向下滚动到kafka消息总线属性。

messagebus:
 kafka:
  # connection properties
  brokers:                                localhost:9092  (1)
  zkAddress:                              localhost:2181  (2)
  socketBufferSize:                       2097152         (3)
  # operating mode
  mode:                                   embeddedHeaders (4)
  offsetManagement:                       kafkaTopic      
  headers:
  # comma-delimited list of additional header names to transport (6)
  ...

要由总线传输的自定义标头的列表。
编辑
对于rabbit,默认情况下传输所有标头:

replyHeaderPatterns:       STANDARD_REPLY_HEADERS,*   
    ...
    requestHeaderPatterns:     STANDARD_REQUEST_HEADERS,*

编辑
如果您的意思是希望使用相同的技术使用sink/source从kafka发送/接收数据,那么您可以使用xd的支持类来实现这一点-请参阅 EmbeddedHeadersMessageConverter . 头值被编码为json。

相关问题