我在pyspark做一些复杂的操作,最后一个操作是 flatMap
生成类型为 pyspark.rdd.PipelinedRDD
其内容只是字符串列表:
print(output_data.take(8))
> ['a', 'abc', 'a', 'aefgtr', 'bcde', 'bc', 'bhdsjfk', 'b']
我这样开始我的spark会话(用于测试的本地会话):
spark = SparkSession.builder.appName("my_app")\
.config('spark.sql.shuffle.partitions', '2').master("local").getOrCreate()
我的输入数据如下所示:
input_data = (('a', ('abc', [[('abc', 23)], 23, False, 3])),
('a', ('abcde', [[('abcde', 17)], 17, False, 5])),
('a', ('a', [[('a', 66)], 66, False, 1])),
('a', ('aefgtr', [[('aefgtr', 65)], 65, False, 6])),
('b', ('bc', [[('bc', 25)], 25, False, 2])),
('b', ('bcde', [[('bcde', 76)], 76, False, 4])),
('b', ('b', [[('b', 13)], 13, False, 1])),
('b', ('bhdsjfk', [[('bhdsjfk', 36)], 36, False, 7])))
input_data = sc.parallelize(input_data)
我想把输出rdd转换成一个Dataframe,其中一列如下:
schema = StructType([StructField("term", StringType())])
df = spark.createDataFrame(output_data, schema=schema)
这不起作用,我得到一个错误:
TypeError: StructType can not accept object 'a' in type <class 'str'>
所以我试过了 schema
出现了这个错误:
TypeError: Can not infer schema for type: <class 'str'>
编辑:尝试时也会发生同样的错误 toDF()
.
所以出于某种原因我有一个 pyspark.rdd.PipelinedRDD
其元素不是 StringType
但是标准的python str
.
我对pyspark比较陌生,所以有人能告诉我为什么会发生这种情况吗?
我很惊讶Pypark不能含蓄地 str
至 StringType
.
我不能发布整个代码,只是说我正在用字符串做一些复杂的事情,包括字符串比较和for循环。不过,我并没有明确地打字。
1条答案
按热度按时间dkqlctbz1#
一个解决方案是将您的rdd
String
变成一个Row
具体如下:有趣的是,正如您所提到的,为原始类型(如string)的rdd指定模式是行不通的。但是,如果我们只指定类型,它可以工作,但您不能指定名称。因此,另一种方法就是这样做,并将列重命名为
value
这样地: