我有一个scala代码,它使用来自kinesis流的数据,并作为作业的一部分执行处理功能。
它还被用于各种应用程序,这些应用程序使用相同的代码来设置从kinesis中找到的表的不同记录值。
作为重构代码的一部分,以减少视频流的负载。
我决定按以下方式做:
val unionStreams = ssc.union(kinesisStreams)
val data = unionStreams.map(x => new String(x))
var cnt = 0
var first_row = ""
val tableArray = tables.split(",")
println("tables : " + tables)
println("table array : " + tableArray)
for (currentTable <- tableArray) {
data.foreachRDD { messages =>
val batch = spark.read.schema(schema).json(messages).orderBy(asc("arrival_timestamp"))
batch.collect().foreach { reference_row =>
if (reference_row.getAs[String](1) == inputvalue && reference_row.getAs[String](2) == currentTable) {
if (cnt == 0) {
// first_row = reference_row.getAs[String](5)
cnt = 1
} else {
println(first_row + " " + reference_row.getAs[String](5))
processRecords(first_row, reference_row.getAs[String](5).toString())
first_row = reference_row.getAs[String](5)
}
}
}
}
}
我必须使用collect,这样我们就不会按不同的顺序处理记录。
由于引入了新的for循环,我们不能为if子句设置的值保持第一行常量,并且每次都会被覆盖。
我想添加一个Map来存储每个表的第一行,但它确实会给处理带来延迟。
有没有更好的办法来解决这个问题。提前谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!