我想和flink和kafka一起进行集成测试。这个过程是从Kafka中读取数据,用flink进行一些操作,然后把数据流放到Kafka中。
我想从头到尾测试这个过程。现在我使用scalatest嵌入式Kafka。
我举了一个例子,我尽量简单:
import java.util.Properties
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}
import scala.collection.mutable.ListBuffer
object SimpleFlinkKafkaTest {
class CollectSink extends SinkFunction[String] {
override def invoke(string: String): Unit = {
synchronized {
CollectSink.values += string
}
}
}
object CollectSink {
val values: ListBuffer[String] = ListBuffer.empty[String]
}
val kafkaPort = 9092
val zooKeeperPort = 2181
val props = new Properties()
props.put("bootstrap.servers", "localhost:" + kafkaPort.toString)
props.put("schema.registry.url", "localhost:" + zooKeeperPort.toString)
val inputString = "mystring"
val expectedString = "MYSTRING"
}
class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {
"runs with embedded kafka" should {
"work" in {
implicit val config = EmbeddedKafkaConfig(
kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort
)
withRunningKafka {
publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val kafkaConsumer = new FlinkKafkaConsumer011(
"input-topic",
new SimpleStringSchema,
SimpleFlinkKafkaTest.props
)
implicit val typeInfo = TypeInformation.of(classOf[String])
val inputStream = env.addSource(kafkaConsumer)
val outputStream = inputStream.map(_.toUpperCase)
val kafkaProducer = new FlinkKafkaProducer011(
"output-topic",
new SimpleStringSchema(),
SimpleFlinkKafkaTest.props
)
outputStream.addSink(kafkaProducer)
env.execute()
consumeFirstStringMessageFrom("output-topic") shouldEqual SimpleFlinkKafkaTest.expectedString
}
}
}
}
我有个错误,所以我加了一行 implicit val typeInfo = TypeInformation.of(classOf[String])
但我真的不明白我为什么要这么做。
现在这个代码不起作用,它运行时没有中断,但不停止,也不给出任何结果。
有人知道吗?更好的办法是测试这种管道。
谢谢!
编辑:添加 env.execute()
改变错误。
1条答案
按热度按时间esbemjvw1#
我想出了一个简单的解决办法。
其目的是:
启动kafka嵌入式服务器
创建测试主题(这里是输入和输出)
在将来启动flink作业以避免阻塞主线程
将消息发布到输入主题
检查输出主题的结果
以及工作原型: