如何访问执行器上的SparkContext以将DataFrame保存到Cassandra?

bn31dyow  于 2022-11-05  发布在  Cassandra
关注(0)|答案(2)|浏览(131)

如何在执行者上使用SparkContext(创建SparkSession或Cassandra会话)?
如果我把它作为一个参数传递给foreachforeachPartition,那么它将有一个null值。我应该在每个执行器中创建一个新的SparkContext吗?
我想做的事情如下:
读取包含数百万个XML文件的转储目录:

dumpFiles = Directory.listFiles(dumpDirectory)
dumpFilesRDD = sparkContext.parallize(dumpFiles, numOfSlices)
dumpFilesRDD.foreachPartition(dumpFilePath->parse(dumpFilePath))

parse()中,每一个XML文件都被Spark SQL验证、解析并插入到多个表中。只有有效的XML文件才能提供可以保存的相同类型的对象。部分数据在插入到其中一个表之前需要用其他键替换。
为此,在函数parse中需要SparkContext来使用sparkContext.sql()

aiazj4mn

aiazj4mn1#

如果我换个方式问你,你想要的是:
1.读取包含数百万个XML文件的目录
1.解析它们
1.将它们插入数据库
这是一个典型的提取、转换和加载(ETL)过程,在Spark SQL中 * 非常 * 简单。
可以使用单独的包spark-xml来加载XML文件:

spark-xml用于Spark SQL和DataFrames的Apache Spark解析和查询XML数据的库。结构和测试工具大多数是从Spark的CSV数据源复制的。

您可以使用--packages命令行选项“安装”软件包:

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.11:0.4.1

引用spark-xml的Scala API(做了一些更改,改用SparkSession):

// Step 1. Loading XML files
val path = "the/path/to/millions/files/*.xml"
val spark: SparkSession = ???
val files = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .load(path)

这使得第一个需求几乎不需要动脑筋。Spark SQL已经处理了数百万个XML文件。
第2步是解析行(来自XML文件),并标记要保存到相应表中的行。

// Step 2. Transform them (using parse)
def parse(line: String) = ???
val parseDF = files.map { line => parse(line) }

您的parse函数可以返回 something(作为主结果)和 something 应该保存到的表。
使用 *table marker *,您可以将parseDF拆分为每个表的DataFrame。

val table1DF = parseDF.filter($"table" === "table1")

依此类推(每个表)。

// Step 3. Insert into DB    
table1DF.write.option(...).jdbc(...)

这只是你可能真正想要的东西的一个草图,但这是要遵循的一般模式。把你的管道分解成可消化的块,一次处理一个块。

n1bvdmb6

n1bvdmb62#

在Spark中,我们不应该按照执行者的方式进行编程,记住这一点很重要。
在Spark编程模型中,你的驱动程序大多是一个自包含的程序,其中的某些部分会自动转换成一个物理的执行计划,最终是一堆分布在工作者/执行者之间的任务。
当你需要为每个分区执行一些操作时,你可以使用mapPartitions()之类的函数。更多细节请参考Spark : DB connection per Spark RDD partition and do mapPartition。注意dbConnection对象是如何被包含在函数体中的。
不清楚你所说的参数是什么意思,如果它只是数据(不是DB连接或类似的),我认为你需要使用一个boradcast变量。

相关问题