集成测试flink和kafka与scalatest嵌入的kafka

wpcxdonn  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(443)

我想和flink和kafka一起进行集成测试。这个过程是从Kafka中读取数据,用flink进行一些操作,然后把数据流放到Kafka中。
我想从头到尾测试这个过程。现在我使用scalatest嵌入式Kafka。
我写了一个例子,并尽量保持简单:

import java.util.Properties

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}

import scala.collection.mutable.ListBuffer

object SimpleFlinkKafkaTest {

  /** Test sink that collects every incoming record into a shared buffer.
    * Flink serializes sink instances, so mutable state must live in the
    * companion object rather than in the instance itself.
    */
  class CollectSink extends SinkFunction[String] {
    override def invoke(string: String): Unit = {
      // Lock on the companion object: with parallelism > 1 each sink
      // instance is a different `this`, so an instance-level `synchronized`
      // would not protect the shared buffer.
      CollectSink.synchronized {
        CollectSink.values += string
      }
    }
  }

  object CollectSink {
    // Shared across all CollectSink instances; guarded by CollectSink.synchronized.
    val values: ListBuffer[String] = ListBuffer.empty[String]
  }

  val kafkaPort = 9092
  val zooKeeperPort = 2181

  // Connection settings for the Flink Kafka 0.11 connector.
  // NOTE: the original set "schema.registry.url" to the ZooKeeper port, but
  // SimpleStringSchema does not use a schema registry at all; what the 0.11
  // consumer needs is "zookeeper.connect" plus a consumer "group.id".
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:" + kafkaPort.toString)
  props.put("zookeeper.connect", "localhost:" + zooKeeperPort.toString)
  props.put("group.id", "test")
  // Start from the earliest offset so a message published before the job
  // attaches is still consumed.
  props.put("auto.offset.reset", "earliest")

  val inputString = "mystring"
  val expectedString = "MYSTRING"
}

class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

    "work" in {
      // BUG in the original: env.execute() on a job with a Kafka source
      // never returns (the source is unbounded), so the test thread blocked
      // forever and the assertion was never reached. Run the job on a
      // Future and drive Kafka from the test thread instead.
      import scala.concurrent.Future
      import scala.concurrent.ExecutionContext.Implicits.global

      implicit val config = EmbeddedKafkaConfig(
        kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
        zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort
      )

      withRunningKafka {

        // Create the topics explicitly so the consumer does not race
        // broker-side topic auto-creation.
        createCustomTopic("input-topic")
        createCustomTopic("output-topic")

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val kafkaConsumer = new FlinkKafkaConsumer011(
          "input-topic",
          new SimpleStringSchema,
          SimpleFlinkKafkaTest.props
        )

        // FlinkKafkaConsumer011 is a Java class, so the Scala DataStream API
        // needs an explicit TypeInformation[String] for the element type
        // (this is why the implicit was required in the first place).
        implicit val typeInfo = TypeInformation.of(classOf[String])

        val inputStream = env.addSource(kafkaConsumer)
        val outputStream = inputStream.map(_.toUpperCase)

        val kafkaProducer = new FlinkKafkaProducer011(
          "output-topic",
          new SimpleStringSchema(),
          SimpleFlinkKafkaTest.props
        )
        outputStream.addSink(kafkaProducer)

        // Start the (never-terminating) streaming job off the test thread.
        Future(env.execute())

        publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)
        consumeFirstStringMessageFrom("output-topic") shouldEqual SimpleFlinkKafkaTest.expectedString

      }
    }
  }
}

我有个错误,所以我加了一行 implicit val typeInfo = TypeInformation.of(classOf[String]) 但我真的不明白我为什么要这么做。
现在这段代码无法正常工作:它运行时不报错,但也不会停止,并且不产生任何结果。
有人知道原因吗?或者有没有更好的办法来测试这种管道?
谢谢!
编辑:添加 env.execute() 改变错误。

esbemjvw

esbemjvw1#

我想出了一个简单的解决办法。
其目的是:
启动kafka嵌入式服务器
创建测试主题(这里是输入和输出)
在 Future 中启动 flink 作业,以避免阻塞主线程
将消息发布到输入主题
检查输出主题的结果
以及工作原型:

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

    "runs with embedded kafka on arbitrary available ports" should {

        "work" in {
            // The requested ports are only a hint: withRunningKafkaOnFoundPort
            // may bind different ones if these are taken. The original
            // hard-coded "localhost:9092"/"localhost:2182" in the properties,
            // which breaks whenever the actual ports differ. Build the whole
            // pipeline inside the callback from the ports actually bound.
            val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2182)

            withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
                val properties = new Properties()
                properties.setProperty("bootstrap.servers", s"localhost:${actualConfig.kafkaPort}")
                properties.setProperty("zookeeper.connect", s"localhost:${actualConfig.zooKeeperPort}")
                properties.setProperty("group.id", "test")
                // Read from the beginning so the message published below is
                // seen even if the consumer attaches after publication.
                properties.setProperty("auto.offset.reset", "earliest")

                val env = StreamExecutionEnvironment.getExecutionEnvironment

                val kafkaConsumer = new FlinkKafkaConsumer011[String]("input", new SimpleStringSchema(), properties)
                val kafkaSink = new FlinkKafkaProducer011[String]("output", new SimpleStringSchema(), properties)
                env
                    .addSource(kafkaConsumer)
                    .map(_.toUpperCase)
                    .addSink(kafkaSink)

                // Create topics explicitly instead of relying on auto-creation.
                createCustomTopic("input")
                createCustomTopic("output")

                // The streaming job never terminates; run it off the test
                // thread so publish/consume below can proceed.
                Future(env.execute())

                publishStringMessageToKafka("input", "Titi")
                consumeFirstStringMessageFrom("output") shouldEqual "TITI"
            }
        }
    }
}

相关问题