Apache Spark 将列表项转换为定义的数据类型RDD

wlzqhblo  于 2023-05-18  发布在  Apache
关注(0)|答案(1)|浏览(79)

实际上,我在Cloudera的Databricks中的Apache Spark Python工作区工作。这个想法是读取csv并格式化每个字段。
所以,第一步是读取csv:

uber = sc.textFile("dbfs:/mnt/uber/201601/pec2/uber_curated.csv")

下一步是将每一行转换为一个值列表:

uber_parsed = uber.map(lambda lin:lin.split(","))
print (uber_parsed.first())

结果是:

[u'B02765', u'2015-05-08 19:05:00', u'B02764', u'262', u'Manhattan',u'Yorkville East']

但是,现在我需要将下一个值列表中的每一项转换为下一个格式String,Date,String,Integer,String,String。

[[u'B02765', u'2015-05-08 19:05:00', u'B02764', u'262', u'Manhattan', u'Yorkville East'],
[u'B02767', u'2015-05-08 19:05:00', u'B02789', u'400', u'New York', u'Yorkville East']]

有人知道怎么做吗?

sqxo8psd

sqxo8psd1#

您可以使用csv阅读器。在Spark 1.x中,你需要一个外部依赖(spark-csv)。

from pyspark.sql.types import *

sqlContext.read.format("csv").schema(StructType([
    StructField("_1", StringType()),
    StructField("_2", TimestampType()),
    StructField("_3", StringType()),
    StructField("_4", IntegerType()),
    StructField("_5", StringType()),
    StructField("_6", StringType()),
])).load("dbfs:/mnt/uber/201601/pec2/uber_curated.csv").rdd

sqlContext.read.format("csv").schema(StructType([
    StructField("_1", StringType()),
    StructField("_2", DateType()),
    StructField("_3", StringType()),
    StructField("_4", IntegerType()),
    StructField("_5", StringType()),
    StructField("_6", StringType()),
])).option("dateFormat", "yyyy-dd-MM HH:mm:ss").load(
    "dbfs:/mnt/uber/201601/pec2/uber_curated.csv"
).rdd

您可以替换(_1_2.. _n),并带有描述性字段名称。

相关问题