pyspark:由于数据类型str而不是stringtype,无法将rdd转换为Dataframe

zf9nrax1  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(1124)

我在pyspark做一些复杂的操作,最后一个操作是 flatMap 生成类型为 pyspark.rdd.PipelinedRDD 其内容只是字符串列表:

print(output_data.take(8))
> ['a', 'abc', 'a', 'aefgtr', 'bcde', 'bc', 'bhdsjfk', 'b']

我这样开始我的spark会话(用于测试的本地会话):

spark = SparkSession.builder.appName("my_app")\
    .config('spark.sql.shuffle.partitions', '2').master("local").getOrCreate()

我的输入数据如下所示:

input_data = (('a', ('abc', [[('abc', 23)], 23, False, 3])),
              ('a', ('abcde', [[('abcde', 17)], 17, False, 5])),
              ('a', ('a', [[('a', 66)], 66, False, 1])),
              ('a', ('aefgtr', [[('aefgtr', 65)], 65, False, 6])),
              ('b', ('bc', [[('bc', 25)], 25, False, 2])),
              ('b', ('bcde', [[('bcde', 76)], 76, False, 4])),
              ('b', ('b', [[('b', 13)], 13, False, 1])),
              ('b', ('bhdsjfk', [[('bhdsjfk', 36)], 36, False, 7])))
input_data = sc.parallelize(input_data)

我想把输出rdd转换成一个Dataframe,其中一列如下:

schema = StructType([StructField("term", StringType())])
df = spark.createDataFrame(output_data, schema=schema)

这不起作用,我得到一个错误:

TypeError: StructType can not accept object 'a' in type <class 'str'>

所以我试过了 schema 出现了这个错误:

TypeError: Can not infer schema for type: <class 'str'>

编辑:尝试时也会发生同样的错误 toDF() .
所以出于某种原因我有一个 pyspark.rdd.PipelinedRDD 其元素不是 StringType 但是标准的python str .
我对pyspark比较陌生,所以有人能告诉我为什么会发生这种情况吗?
我很惊讶Pypark不能含蓄地 strStringType .
我不能发布整个代码,只是说我正在用字符串做一些复杂的事情,包括字符串比较和for循环。不过,我并没有明确地打字。

dkqlctbz

dkqlctbz1#

一个解决方案是将您的rdd String 变成一个 Row 具体如下:

from pyspark.sql import Row
df = spark.createDataFrame(output_data.map(lambda x: Row(x)), schema=schema)

# or with a simple list of names as a schema

df = spark.createDataFrame(output_data.map(lambda x: Row(x)), schema=['term'])

# or even use `toDF`:

df = output_data.map(lambda x: Row(x)).toDF(['term'])

# or another variant

df = output_data.map(lambda x: Row(term=x)).toDF()

有趣的是,正如您所提到的,为原始类型(如string)的rdd指定模式是行不通的。但是,如果我们只指定类型,它可以工作,但您不能指定名称。因此,另一种方法就是这样做,并将列重命名为 value 这样地:

from pyspark.sql import functions as F
df = spark.createDataFrame(output_data, StringType())\
          .select(F.col('value').alias('term'))

# or similarly

df = spark.createDataFrame(output_data, "string")\
          .select(F.col('value').alias('term'))

相关问题