我有一个场景,其中10k+正则表达式与其他各种列一起存储在一个表中,需要与传入的数据集连接起来。最初我使用的是“sparksqlrlike”方法,如下所示,它能够保持负载,直到传入的记录计数小于50k
ps:正则表达式引用数据是广播数据集。 dataset.join(regexDataset.value, expr("input_column rlike regular_exp_column")
然后我编写了一个自定义的udf,使用scala本地regex搜索来转换它们,如下所示,
下面val以元组数组的形式收集引用数据。
val regexPreCalcArray: Array[(Int, Regex)] = {
regexDataset.value
.select( "col_1", "regex_column")
.collect
.map(row => (row.get(0).asInstanceOf[Int],row.get(1).toString.r))
}
正则表达式匹配udf的实现,
def findMatchingPatterns(regexDSArray: Array[(Int,Regex)]): UserDefinedFunction = {
udf((input_column: String) => {
for {
text <- Option(input_column)
matches = regexDSArray.filter(regexDSValue => if (regexDSValue._2.findFirstIn(text).isEmpty) false else true)
if matches.nonEmpty
} yield matches.map(x => x._1).min
}, IntegerType)
}
联接如下所示,其中引用数据中的唯一id将在多个正则表达式匹配的情况下从udf返回,并使用唯一id与引用数据联接,以检索结果所需的其他列,
dataset.withColumn("min_unique_id", findMatchingPatterns(regexPreCalcArray)($"input_column"))
.join(regexDataset.value, $"min_unique_id" === $"unique_id" , "left")
但当记录数增加到1m以上时,执行过程中的偏差也会非常慢[1个执行者任务运行了很长时间]。spark建议不要使用udf,因为它会降低性能,我应该在这里应用任何其他最佳实践,或者是否有比我在这里编写的更好的scala regex匹配api?或者任何有效的建议都会很有帮助。
暂无答案!
目前还没有任何答案,快来回答吧!