scala—如何在spark kinesis应用程序中循环多个记录并保持顺序

a64a0gku  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(257)

我有一个scala代码,它使用来自kinesis流的数据,并作为作业的一部分执行处理功能。
它还被用于各种应用程序,这些应用程序使用相同的代码来设置从kinesis中找到的表的不同记录值。
作为重构代码的一部分,以减少视频流的负载。
我决定按以下方式做:

val unionStreams = ssc.union(kinesisStreams)
      val data = unionStreams.map(x => new String(x))
      var cnt = 0
      var first_row = ""

      val tableArray = tables.split(",")
      println("tables : " + tables)
      println("table array : " + tableArray)

      for (currentTable <- tableArray) {

      data.foreachRDD { messages =>
        val batch = spark.read.schema(schema).json(messages).orderBy(asc("arrival_timestamp"))

          batch.collect().foreach { reference_row =>

            if (reference_row.getAs[String](1) == inputvalue && reference_row.getAs[String](2) == currentTable) {
              if (cnt == 0) {
            //    first_row = reference_row.getAs[String](5)
                cnt = 1
              } else {
                println(first_row + " " + reference_row.getAs[String](5))
                processRecords(first_row, reference_row.getAs[String](5).toString())
                first_row = reference_row.getAs[String](5)
              }
            }
          }
        }
      }

我必须使用collect,这样我们就不会按不同的顺序处理记录。
由于引入了新的for循环,我们不能为if子句设置的值保持第一行常量,并且每次都会被覆盖。
我想添加一个Map来存储每个表的第一行,但它确实会给处理带来延迟。
有没有更好的办法来解决这个问题。提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题