在运行时向Spark作业传递值列表

fnvucqvd  于 2023-03-23  发布在  Apache
关注(0)|答案(1)|浏览(176)

我有一个地点列表(100,101,205,310等)。我想将此位置列表存储在使用此列表作为其查询的一部分的Spark作业外部的位置。然后,如果以任何方式修改列表,则不必单独触摸每个Spark作业,将存在编辑列表的全局位置,并且每个单独的作业将从该全局位置拉取。
例如:
其中位置在(100、101、205、310等)
将被替换为
Where location in('全局位置列表的路径')
我已经尝试创建一个单独的.conf文件来存储这些类型的值/列表,但不确定如何将新的.conf文件绑定到所有的job.conf文件。

jxct1oxe

jxct1oxe1#

所以,这个SO问题Read files sent with spark-submit by the driver显示了围绕这个主题的一些讨论。当我与我一起工作的数据工程师交谈时,没有很好地理解。例如,本地文件系统文件与--files.conf问题以及ClusterClient模式和加载目录的所有方面。
但是,如果您查看允许的过滤值方法的简单Seq,例如从https://sparkbyexamples.com/spark/spark-isin-is-not-in-operator-example/

val data = Seq(("James","Java"),("Michael","Spark"),("Robert","Python"))
import spark.implicits._
val df = data.toDF("name","language")

//Using isin() function to check value in list of values
val listValues = Seq("Java","Scala")
df.filter(df("language").isin(listValues:_*)).show()

//+-----+--------+
//| name|language|
//+-----+--------+
//|James|    Java|
//+-----+--------+

我的理解是你可以通过一个--files的方法用适当的编码生成一个Seq来进行isin的比较。
这些天我使用Databricks笔记本,因为我厌倦了重新安装Hive Metastore等,所以我不检查--files方法,而只是使用分布式文件方法来过滤值,因为这是您最初学习的并且更容易:

  • 要应用的过滤器值的文件
  • 读取分布式
  • 通过驱动程序转换为序列
  • 将序列发送到任务
  • 过滤

我从DS开始,使用RDD,我遇到了一个我不确定的问题,但是RDD是遗留的,所以不相关。代码:

// You can use DF, DS filtering or SQL, but it does not work how you state in question. Substituting the SQL with Spark SQL can be done, but why not
// just use the below approach. As we have DF or SQL on tempview, that amount to the same thing.

// Can be done with or without broadcast. Here we do broadcast.

// 1. Get the filter location criteria. Note the format.
val ds = spark.read.textFile("/FileStore/tables/loc_include.txt") 
ds.show()

// 2. Generate a Seq via Spark Driver collect and pass to the Workers / Executors.
val loc_include = ds.flatMap(_.split(",")).withColumn("valueInt",col("value").cast("int")).select("valueInt").distinct.map(_.getInt(0)).collect.toSeq
val broadcast_loc_include = sc.broadcast(loc_include)

// 3. Our data.
val df = Seq( (1, 2, 320), (2, 2, 300), (7, 6, 400), (1, 3, 650)).toDF("col1", "col2", "col3")    

// 4a. Without broadcast.
val df2 = df.filter(df("col3").isin(loc_include:_*))
df2.show()

// 4b. With broadcast.
val df3 = df.filter(df("col3").isin(broadcast_loc_include.value:_*))
df3.show()

输入和结果:

+-----------+
|      value|
+-----------+
|100,200,300|
|        400|
+-----------+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   2|   2| 300|
|   7|   6| 400|
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   2|   2| 300|
|   7|   6| 400|
+----+----+----+

在任何情况下,它都不会按照您想要的方式工作,从文件中包含排序。
模拟输入。
此外,what's the purpose and usecase of --files in spark-submit?也向您展示了一些观点。

相关问题