我有一个actor系统,我用它来调度函数的执行,该函数从flink的map操作符生成kafka主题中的事件。在例外情况下,actor系统终止,并在akka文件中声明(参见https://doc.akka.io/docs/akka/current/scheduler.html#from-akka actor actorsystem)所有计划的任务都应该执行。在我的例子中,当函数被执行时,会抛出一个java.lang.noclassdeffounderror,它与函数中使用的类有关。
new RichMapFunction[String, String] {
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
var myActorSystem: ActorSystem = _
var kafkaProducer: KafkaProducer[String, String] = _
var runtimeContext: RuntimeContext = _
override def map(value: String): String = {
value match {
case "stop" =>
throw new Exception("Stop command received")
case _ =>
myActorSystem.scheduler.scheduleOnce(FiniteDuration(5L, MINUTES)){
kafkaProducer.send(new ProducerRecord[String, String]("test", value.reverse))
}
}
s"scheduled function on event $value"
}
override def open(parameters: Configuration): Unit = {
myActorSystem = ActorSystem("testSystem")
kafkaProducer = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
// props.put("acks", "all")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
new KafkaProducer[String, String](props)
}
runtimeContext = getRuntimeContext
}
override def close(): Unit = {
println("Terminate actor system...")
myActorSystem.terminate()
}
}
1条答案
按热度按时间xdnvmnnf1#
actor系统终止是异步的,所以我使用了下面的代码。