如何使用sparklyr将RDD转换为spark Dataframe ?

zed5wv10  于 2023-02-10  发布在  Spark
关注(0)|答案(1)|浏览(174)

我在很多文件夹的blob存储上有很多文件,里面有azure IOT推送的文本数据,我想读取它们,有一个delta lake表,一个文件每行一行,我以前是一个文件一个文件地读取,但是太花时间了,所以我想用spark来加快这个处理,它需要集成一个用R做的databricks工作流。
我发现spark_read_text函数可以读取文本文件,但它不能递归读取目录,它只能理解所有文件是否都在一个目录中。
以下是文件路径的示例(appid/partition/年/月/日/小时/分钟/文件):应用程序标识/10/2023/02/06/08/42/gdedir22hccjq
分区是Azure IoT创建的一个随机文件夹(目前大约有30个),似乎是为了并行处理数据,因此同一日期的数据可以拆分到几个文件夹中,这并没有简化读取效率。
所以我发现的唯一一个函数是spark.textFile,它可以处理jokers并递归地处理目录,唯一的问题是它返回一个RDD,我找不到一种方法将它转换成spark Dataframe ,最终可以使用tbl_spark R对象访问它。
以下是我目前所做的工作:
你需要设置配置来递归读取文件夹(这里我在一个专用的python单元中的数据块上做这个):

%py
sc._jsc.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

然后我可以创建一个RDD:

j_rdd <- spark_context(sc) %>%
  invoke("textFile", "/mnt/my_cont/app_id/*/2022/11/17/*", 10L)

这将创建RDD,正如您所看到的,我可以用""Map所有分区(在一年之前),以及用""在末尾递归Map文件夹四小时零分钟。
我可以收集它并创建一个R Dataframe :

lst <- invoke(j_rdd, "collect")
data.frame(row = unlist(lst))

这正确地得到了我的数据,一列文本和一行的每一行的每一个文件(我不能显示一个例子的隐私原因,但这并不重要)。
问题是我不想收集数据,但想用这些数据更新一个delta表,却找不到一种方法来获得一个我可以使用的sparklyr对象。

>j_obj
<jobj[2666]>
  org.apache.spark.rdd.MapPartitionsRDD
  /mnt/my_cont/app_id/*/2022/11/17/* MapPartitionsRDD[80] at textFile at NativeMethodAccessorImpl.java:0

到目前为止我越接近:我尝试复制代码here,使用invoke将数据转换为 Dataframe ,但似乎没有正确执行:

contents_field <- invoke_static(sc, "sparklyr.SQLUtils", "createStructField", "contents", "character", TRUE)
schema <- invoke_static(sc, "sparklyr.SQLUtils", "createStructType", list(contents_field))
j_df <- invoke(hive_context(sc), "createDataFrame", j_rdd, schema)
invoke(j_df, "createOrReplaceTempView", "tmp_test")
dfs <- tbl(sc, "tmp_test")
dfs %>% sdf_nrow()

我只有一个列中有字符,所以我认为它会工作,但我得到这个错误:

Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 25.0 failed 4 times, most recent failure: Lost task 14.3 in stage 25.0 (TID 15158) (10.221.193.133 executor 2): java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, contents), StringType, false), true, false, true) AS contents#366
    at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1192)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:236)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:208)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:233)
    ... 28 more

有没有人知道如何转换这个RDD对象(使用R/sparklyr),我在调用函数的返回中得到的这个RDD对象可以在不收集数据的情况下使用?

xwbd5t1u

xwbd5t1u1#

最后,我发现spark_read_text也可以用小丑读取多个文件,但是你必须为每个目录和文件放一个小丑,它不能递归地发现文件夹。
例如:

dfs <- spark_read_text(sc, "/mnt/container/app_id/10/2023/02/06/*")

......不起作用。但是:

dfs <- spark_read_text(sc, "/mnt/container/app_id/10/2023/02/06/*/*/*")

......有效。另外:

dfs <- spark_read_text(sc, "/mnt/container/app_id/*/2023/02/06/*/*/*")

......日期上方有一个小丑也可以。
由于目录深度在我的例子中没有改变,这对我来说已经足够了。

相关问题