没有任何东西是从Flink图案流打印出来的

tf7tbtn2  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(543)

我有以下代码:

import java.util.Properties

import com.google.gson._
import com.typesafe.config.ConfigFactory
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object WindowedWordCount {
  val configFactory = ConfigFactory.load()
  def main(args: Array[String]) = {
    val brokers = configFactory.getString("kafka.broker")
    val topicChannel1 = configFactory.getString("kafka.topic1")

    val props = new Properties()
    ...

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))

    val partitionedInput = dataStream.keyBy(jsonString => {
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account")
    })

    val priceCheck = Pattern.begin[String]("start").where({jsonString =>
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account").toString == "iOS"})

    val pattern = CEP.pattern(partitionedInput, priceCheck)

    val newStream = pattern.select(x =>
      x.get("start").map({str =>
        str
      })
    )

    newStream.print()

    env.execute()
  }
}

出于某种原因在上面的代码中 newStream.print() 什么都没有打印出来。我肯定Kafka中有数据符合我上面定义的模式,但由于某种原因没有打印出来。
有人能帮我找出这个代码中的错误吗?
编辑:

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {

  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    val jsonParser = new JsonParser()
    val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context")
    Instant.parse(context.get("serverTimestamp").toString.replaceAll("\"", "")).toEpochMilli
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis())
  }
}

我在Flink医生上看到你可以回来 prevElementTimestampextractTimestamp 方法(如果您使用的是kafka010)和 new Watermark(System.currentTimeMillis)getCurrentWatermark 方法。
但我不明白什么 prevElementTimestamp 是或者为什么一个人会回来 new Watermark(System.currentTimeMillis) 作为水印而不是其他东西。你能详细说明我们为什么要这样做吗 WaterMark 以及 EventTime 一起工作好吗?

8ljdwjyq

8ljdwjyq1#

你把你的工作安排在 EventTime ,但不提供时间戳和水印提取器。
有关在事件时间中工作的更多信息,请参阅这些文档。如果你想使用Kafka嵌入的时间戳这个文件可能会帮助你。
EventTime cep库在水印到达时缓冲事件,以便正确处理无序事件。在您的例子中,没有生成水印,因此事件被无限缓冲。
编辑:
对于 prevElementTimestamp 我觉得文件很清楚:
使用Kafka的时间戳时,不需要定义时间戳提取器。extracttimestamp()方法的previouselementtimestamp参数包含kafka消息携带的时间戳。
因为kafka 0.10.x kafka消息可以嵌入时间戳。
生成 Watermark 作为 new Watermark(System.currentTimeMillis) 在这种情况下不是个好主意。你应该创造 Watermark 基于你对数据的了解。了解如何 Watermark 以及 EventTime 一起工作我再清楚不过了

相关问题