如何在执行者上使用SparkContext
(创建SparkSession
或Cassandra会话)?
如果我把它作为一个参数传递给foreach
或foreachPartition
,那么它将有一个null
值。我应该在每个执行器中创建一个新的SparkContext
吗?
我想做的事情如下:
读取包含数百万个XML文件的转储目录:
dumpFiles = Directory.listFiles(dumpDirectory)
dumpFilesRDD = sparkContext.parallize(dumpFiles, numOfSlices)
dumpFilesRDD.foreachPartition(dumpFilePath->parse(dumpFilePath))
在parse()
中,每一个XML文件都被Spark SQL验证、解析并插入到多个表中。只有有效的XML文件才能提供可以保存的相同类型的对象。部分数据在插入到其中一个表之前需要用其他键替换。
为此,在函数parse
中需要SparkContext
来使用sparkContext.sql()
。
2条答案
按热度按时间aiazj4mn1#
如果我换个方式问你,你想要的是:
1.读取包含数百万个XML文件的目录
1.解析它们
1.将它们插入数据库
这是一个典型的提取、转换和加载(ETL)过程,在Spark SQL中 * 非常 * 简单。
可以使用单独的包spark-xml来加载XML文件:
spark-xml用于Spark SQL和DataFrames的Apache Spark解析和查询XML数据的库。结构和测试工具大多数是从Spark的CSV数据源复制的。
您可以使用
--packages
命令行选项“安装”软件包:引用spark-xml的Scala API(做了一些更改,改用
SparkSession
):这使得第一个需求几乎不需要动脑筋。Spark SQL已经处理了数百万个XML文件。
第2步是解析行(来自XML文件),并标记要保存到相应表中的行。
您的
parse
函数可以返回 something(作为主结果)和 something 应该保存到的表。使用 *table marker *,您可以将
parseDF
拆分为每个表的DataFrame。依此类推(每个表)。
这只是你可能真正想要的东西的一个草图,但这是要遵循的一般模式。把你的管道分解成可消化的块,一次处理一个块。
n1bvdmb62#
在Spark中,我们不应该按照执行者的方式进行编程,记住这一点很重要。
在Spark编程模型中,你的驱动程序大多是一个自包含的程序,其中的某些部分会自动转换成一个物理的执行计划,最终是一堆分布在工作者/执行者之间的任务。
当你需要为每个分区执行一些操作时,你可以使用mapPartitions()之类的函数。更多细节请参考Spark : DB connection per Spark RDD partition and do mapPartition。注意dbConnection对象是如何被包含在函数体中的。
不清楚你所说的参数是什么意思,如果它只是数据(不是DB连接或类似的),我认为你需要使用一个boradcast变量。