如何使用spark不断更新hdfs目录并基于字符串(行)将输出拆分为多个hdfs文件进行读取?

pdkcd3nj  于 2021-06-01  发布在  Hadoop
关注(0)|答案(3)|浏览(311)

详细的场景->hdfs目录,其中“fed”了多种类型的银行账户活动的新日志数据。每行表示一个随机活动类型,每行(字符串)包含文本“activitytype”= <TheTypeHere> ".
在spark scala中,读取hdfs目录中的输入文件并输出多个hdfs文件(其中每个activitytype都写入自己的新文件)的最佳方法是什么?

nbnkbykc

nbnkbykc1#

您可以使用RDD执行类似操作,我假设您有可变长度的文件,然后转换为dfs:

val rdd = sc.textFile("/FileStore/tables/activity.txt")
val rdd2 = rdd.map(_.split(","))
          .keyBy(_(0))
val rdd3 = rdd2.map(x => (x._1, x._2.mkString(",")))
val df = rdd3.toDF("K", "V")  
//df.show(false)

df.write.partitionBy("K").text("SO_QUESTION")

输入为:

ActivityType=<ACT_001>,34,56,67,89,90
ActivityType=<ACT_002>,A,1,2
ActivityType=<ACT_003>,ABC

然后我得到3个输出文件,在本例中每个记录1个。在databricks中也很难表现出来。
您可以调整您的输出格式和位置等。分区是这里的关键。

ybzsozfc

ybzsozfc2#

您可以对此使用multipleoutputformat。请将rdd转换为键值对,以便activitytype是键。spark将为不同的键创建不同的文件。您可以根据键决定文件的放置位置及其名称。

vaqhlq81

vaqhlq813#

修改了对声明的第一个答复:
“key”字符串在父字符串中的位置是随机的,唯一可以保证的是它包含子字符串,在本例中是“activitytype”,后跟一些val。
问题真的是关于这个。下面是:

// SO Question
val rdd = sc.textFile("/FileStore/tables/activitySO.txt")  
val rdd2 = rdd.map(x => (x.slice (x.indexOfSlice("ActivityType=<")+14, x.indexOfSlice(">", (x.indexOfSlice("ActivityType=<")+14))), x))
val df = rdd2.toDF("K", "V")
df.write.partitionBy("K").text("SO_QUESTION2")

输入为:

ActivityType=<ACT_001>,34,56,67,89,90
3,4,4,ActivityType=<ACT_002>,A,1,2
ABC,ActivityType=<ACT_0033>
DEF,ActivityType=<ACT_0033>

输出是3个文件,其中键不是activitytype=,而是act\u 001等。键数据没有被剥离,它仍然在字符串中。如果需要,还可以修改输出位置和格式。

相关问题