在akka streams kafka中构造producermessage时如何配置offset参数?

1bqhqjot  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(617)

我使用的是scala 2.11和akka streams kafka 0.17。
我有一条小溪:
Source 是使用 Source.actorRef . 这里,actor被安排以一定的时间间隔运行,并连续生成消息,这些消息被发送到流中。
我附上了一份 Producer 作为一个 Flow . 制作人推动 ProducerMessage.Message Kafka的主题。
一些数据库操作。
我在构建 ProducerMessage.Message ,看起来像:

final case class Message[K, V, +PassThrough](
    record: ProducerRecord[K, V],
    passThrough: PassThrough
  )

我很容易通过考试 record 包含实际消息的参数。但我不知道接下来要传递什么 passThrough 参数。根据文件:
这个 passThrough 字段可以保存通过 Consumer#flow 包括在 Result . 当需要向下游操作传递某些上下文时,这非常有用。这可以通过unzip/zip来完成,但这更方便。例如,它可以是一个 ConsumerMessage.CommittableOffset 或者 ConsumerMessage.CommittableOffsetBatch 可以稍后在流中提交。
在我的例子中,没有任何Kafka消费者订阅Kafka主题并生成 Source ( comittableSource 或者 plainSource )为了我的小溪。在这种情况下,我将传递文档中描述的消费者补偿。但在我的例子中,一个演员正在模拟这样一个消费者。那意味着我没有权限 ConsumerMessage.CommittableOffset . 那我要把什么传给 passThrough 参数在这里?在这种情况下,最好的做法是什么?

s4chpxco

s4chpxco1#

把我的问题转达给 reactive-kafka 团队,我得到答案了。基本上,他们所说的是,如果你没有一个用例 pass through 任何情况下,您都可以尝试将其设置为none或notused,或者仅设置空字符串“”。
另外请注意,如果您使用 Producer.plainSink 您不需要构造 ProducerMessage.Message . 然后,你可以直接构造一个Kafka ProducerRecord . 那个 ProducerMessage.Message case类只是一个容器,用于 pass through 是需要的。除了要传递的元素之外,它只包含一个kafka ProducerRecord .

相关问题