如何通过deliminator分割spark rdd的行

7vux5j2d  于 2021-05-30  发布在  Hadoop
关注(0)|答案(3)|浏览(303)

我正在尝试将spark中的数据拆分为 Array[String] . 目前,我已将该文件加载到的rdd中 String . > val csvFile = textFile("/input/spam.csv") 我想分一杯羹 , 除沫器。

57hvy0tb

57hvy0tb1#

您应该使用spark csv库,它能够解析您的文件,并允许您指定定界符。此外,它还可以很好地推断模式。我将让您阅读文档,以发现您可以使用的大量选项。
这可能是这样的:

sqlContext.read.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","your delimitor")
.load(pathToFile)

请注意,这将返回一个Dataframe,您可能需要使用它将其转换为rdd .rdd 功能。
当然,您必须将包加载到驱动程序中,它才能工作。

lvjbypge

lvjbypge2#

这是:

val csvFile = textFile("/input/spam.csv").map(line => line.split(","))

你回来了 RDD[Array[String]] .
如果你需要第一列 RDD 然后使用 map 函数仅返回数组中的第一个索引:

val firstCol = csvFile.map(_.(0))
eit6fx6z

eit6fx6z3#

// create spark session
val spark = org.apache.spark.sql.SparkSession.builder
        .master("local")
        .appName("Spark CSV Reader")
        .getOrCreate;

// read csv
val df = spark.read
         .format("csv")
         .option("header", "true") //reading the headers
         .option("mode", "DROPMALFORMED")
         .option("delimiter", ",")
         .load("/your/csv/dir/simplecsv.csv")

// convert dataframe to rdd[row]
val rddRow = df.rdd
// print 2 rows
rddRow.take(2)

// convert df to rdd[string] for specific column
val oneColumn = df.select("colName").as[(String)].rdd
oneColumn.take(2)

// convert df to rdd[string] for multiple columns
val multiColumn = df.select("col1Name","col2Name").as[(String, String)].rdd
multiColumn.take(2)

相关问题