Unable to analyze the data

Asked by zrfyljdw on 2021-06-24 in Flink

val patterns = ctx.getBroadcastState(patternStateDescriptor)
These are the imports I made:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

Here is the code:

val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties()
  properties.setProperty("bootstrap.servers","localhost:9092")

  val patternStream = new FlinkKafkaConsumer010("patterns", new SimpleStringSchema, properties)

  val patterns = env.addSource(patternStream)

  var patternData = patterns.map {
    str =>
      val splitted_str = str.split(",")
      PatternStream(splitted_str(0).trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val logsStream = new FlinkKafkaConsumer010("logs", new SimpleStringSchema, properties)

//  logsStream.setStartFromEarliest()

  val logs = env.addSource(logsStream)

  var data = logs.map {
    str =>
      val splitted_str = str.split(",")
      LogsTest(splitted_str.head.trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val keyedData: KeyedStream[LogsTest, String] = data.keyBy(_.metric)

  val bcStateDescriptor = new MapStateDescriptor[Unit, PatternStream]("patterns", Types.UNIT, Types.of[PatternStream]) // the first type parameter is the key type, the second the value type

  val broadcastPatterns: BroadcastStream[PatternStream]  = patternData.broadcast(bcStateDescriptor)

  val alerts = keyedData
      .connect(broadcastPatterns)
      .process(new PatternEvaluator())

  alerts.print()

//   println(alerts.getClass)
//  val sinkProducer = new FlinkKafkaProducer010("output",  new SimpleStringSchema(), properties)

  env.execute("Flink Broadcast State Job")
} // closes the enclosing main method (its definition is not shown in the question)
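For reference, the PatternStream and LogsTest case classes are not shown in the question; from how they are constructed and used (and from the sample data below), they are presumably shaped roughly like this (field names inferred, not from the original post):

case class PatternStream(metric: String, operator: String, value: String)
case class LogsTest(timestamp: String, metric: String, value: String)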

class PatternEvaluator()
  extends KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)] {

  private lazy val patternStateDescriptor = new MapStateDescriptor("patterns", classOf[String], classOf[String])

  private var lastMetricState: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    val lastMetricDescriptor = new ValueStateDescriptor("last-metric", classOf[String])

    lastMetricState = getRuntimeContext.getState(lastMetricDescriptor)
  }

  override def processElement(reading: LogsTest,
                              readOnlyCtx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext,
                              out: Collector[(String, String, String)]): Unit = {

    val metrics = readOnlyCtx.getBroadcastState(patternStateDescriptor)
    if (metrics.contains(reading.metric)) {
      val metricPattern: String = metrics.get(reading.metric)
      val metricPatternValue: String = metrics.get(reading.value) // NB: the broadcast state is keyed by metric, so a lookup by value returns null
      val lastMetric = lastMetricState.value()

      val logsMetric = (reading.metric)
      val logsValue = (reading.value)

      if (logsMetric == metricPattern) {
        if (metricPatternValue == logsValue) {
          out.collect((reading.timestamp, reading.value, reading.metric))
        }
      }
    }
  }

  override def processBroadcastElement(
                                        update: PatternStream,
                                        ctx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#Context,
                                        out: Collector[(String, String, String)]
                                      ): Unit = {
    val patterns = ctx.getBroadcastState(patternStateDescriptor)

    if (update.metric == "IP") {
      patterns.put(update.metric /*,update.operator*/ , update.value)
    }
    //    else if (update.metric == "username"){
    //      patterns.put(update.metric, update.value)
    //    }
    //    else {
    //      println("No required data found")
    //    }
    //  }

  }
}

Sample data - logs stream:

"21/09/98","IP", "5.5.5.5"

Patterns stream:

"IP","==","5.5.5.5"

I am not able to analyze the data and get the expected result, i.e. 21/09/98, IP, 5.5.5.5.
There are currently no errors; the data is simply never evaluated.
The code is reading both streams (verified).

jfewjypa 1#

A common problem in cases like this is that the API gives you no control over the order in which the patterns and the data are received; it may well be that processElement is called before processBroadcastElement has stored any patterns.
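A common remedy is to buffer readings in keyed state until a pattern for their metric has arrived, and to drain the buffer once it is there. Below is a minimal sketch of that idea, reusing the LogsTest/PatternStream types from the question; the BufferingPatternEvaluator name and the buffered-readings state are illustrative, not part of the original code:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class BufferingPatternEvaluator
  extends KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)] {

  // Same descriptor as in the question: metric -> expected value
  private lazy val patternStateDescriptor =
    new MapStateDescriptor("patterns", classOf[String], classOf[String])

  // Keyed buffer for readings that arrive before their pattern (illustrative name)
  private var bufferedReadings: ListState[LogsTest] = _

  override def open(parameters: Configuration): Unit = {
    bufferedReadings = getRuntimeContext.getListState(
      new ListStateDescriptor("buffered-readings", classOf[LogsTest]))
  }

  override def processElement(reading: LogsTest,
                              readOnlyCtx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext,
                              out: Collector[(String, String, String)]): Unit = {
    val patterns = readOnlyCtx.getBroadcastState(patternStateDescriptor)
    if (!patterns.contains(reading.metric)) {
      // No pattern for this metric yet: park the reading instead of dropping it
      bufferedReadings.add(reading)
    } else {
      // Pattern available: evaluate anything buffered for this key, then the current reading
      val pending = bufferedReadings.get().asScala.toList
      bufferedReadings.clear()
      (pending :+ reading).foreach { r =>
        if (patterns.get(r.metric) == r.value) {
          out.collect((r.timestamp, r.value, r.metric))
        }
      }
    }
  }

  override def processBroadcastElement(update: PatternStream,
                                       ctx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#Context,
                                       out: Collector[(String, String, String)]): Unit = {
    // Store metric -> expected value, as the original code does for "IP"
    ctx.getBroadcastState(patternStateDescriptor).put(update.metric, update.value)
  }
}

Two further things worth checking in the original code: processElement looks the pattern value up with metrics.get(reading.value), but the state is keyed by metric, so that lookup always returns null; and broadcast() is called with a MapStateDescriptor[Unit, PatternStream] while PatternEvaluator reads the state through a MapStateDescriptor[String, String] of the same name, whereas the same descriptor should normally be used on both sides. If buffered readings must be flushed as soon as a pattern arrives (rather than on the next reading for that key), ctx.applyToKeyedState in processBroadcastElement can iterate over every key's buffer.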
