如何在spark scala Dataframe 视图上应用过滤器?

igetnqfo  于 2022-12-04  发布在  Scala
关注(0)|答案(1)|浏览(154)

我在这里粘贴了一个代码片段,在这里我遇到了BigQuery Read的问题。“wherePart”有更多的记录,因此BQ调用被一次又一次地调用。将过滤器保持在BQ Read之外会有所帮助。想法是,首先从BQ读取“mainTable”,将其存储在spark视图中,然后将“wherePart”筛选器应用于spark中的此视图。[***“subDate”是一个函数,用于从一个日期减去另一个日期,并返回两个日期之间的天数***]

val Df =  getFb(config, mainTable, ds)

  def getFb(config: DataFrame, mainTable: String, ds: String) : DataFrame = {

    val fb = config.map(row => Target.Pfb(
      row.getAs[String]("m1"),
      row.getAs[String]("m2"),
      row.getAs[Seq[Int]]("days")))
      .collect

    val wherePart = fb.map(x => (x.m1, x.m2, subDate(ds, x.days.max - 1))).
      map(x => s"(idata_${x._1} = '${x._2}' AND ds BETWEEN '${x._3}' AND '${ds}')").
      mkString(" OR ")

    val q = new Q()
    val tempView = "tempView"
    spark.readBigQueryTable(mainTable, wherePart).createOrReplaceTempView(tempView)
    val Df = q.mainTableLogs(tempView)
    Df
  }

谁来帮帮我。

3zwtqj6y

3zwtqj6y1#

您使用的是spark-bigquery-connector吗?如果是,正确的语法是

spark.read.format("bigquery")
  .load(mainTable)
  .where(wherePart)
  .createOrReplaceTempView(tempView)

相关问题