kafka和akka(scala):如何创建source[committablemessage[array[byte],string],consumer.control]?

mrzz3bfm  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(256)

我想为单元测试创建一个带有可提交消息和使用者控件的源代码。
或者转换这样创建的源:

val message: Source[Array[Byte], NotUsed] = Source.single("one message".getBytes)

像这样的事情

Source[CommittableMessage[Array[Byte], String], Consumer.Control]

目标是单元测试参与者在消息上的行为,而不必在构建机器上安装kafka

0md85ypi

0md85ypi1#

您可以使用此帮助程序创建committablemessage:

package akka.kafka.internal

import akka.Done
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffsetBatch, GroupTopicPartition, PartitionOffset}
import akka.kafka.internal.ConsumerStage.Committer
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.collection.immutable
import scala.concurrent.Future

object AkkaKafkaHelper {

  private val committer = new Committer {
    def commit(offsets: immutable.Seq[PartitionOffset]): Future[Done] = Future.successful(Done)
    def commit(batch: CommittableOffsetBatch): Future[Done] = Future.successful(Done)
  }

  def commitableMessage[K, V](key: K, value: V, topic: String = "topic", partition: Int = 0, offset: Int = 0, groupId: String = "group"): CommittableMessage[K, V] = {
    val partitionOffset = PartitionOffset(GroupTopicPartition(groupId, topic, partition), offset)
    val record = new ConsumerRecord(topic, partition, offset, key, value)
    CommittableMessage(record, ConsumerStage.CommittableOffsetImpl(partitionOffset)(committer))
  }
}
j91ykkif

j91ykkif2#

使用 Consumer.committableSource 创建 Source[CommittableMessage[K, V], Control] . 这个想法是在你的测试中,你将产生一个或多个关于某个主题的消息,然后使用 committableSource 从同一主题中消费。
下面的示例演示了这种方法:它是对 IntegrationSpec 在阿克卡河Kafka项目。 IntegrationSpec 使用scalatest embedded kafka,它为scalatest规范提供内存中的kafka示例。

Source(1 to 100)
    .map(n => new ProducerRecord(topic1, partition0, null: Array[Byte], n.toString))
    .runWith(Producer.plainSink(producerSettings))

val consumerSettings = createConsumerSettings(group1)

val (control, probe1) = Consumer.committableSource(consumerSettings, TopicSubscription(Set(topic1)))
  .filterNot(_.record.value == InitialMsg)
  .mapAsync(10) { elem =>
    elem.committableOffset.commitScaladsl().map { _ => Done }
  }
  .toMat(TestSink.probe)(Keep.both)
  .run()

probe1
  .request(25)
  .expectNextN(25).toSet should be(Set(Done))

probe1.cancel()
Await.result(control.isShutdown, remainingOrDefault)

相关问题