PySpark: creating a DataFrame from a Row causes a "Can not infer schema" error

Posted by 8e2ybdfx on 2023-03-17 in Spark

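When I started learning PySpark, I used lists to create DataFrames. Inferring the schema from a list is now deprecated, and I get a warning that suggests using pyspark.sql.Row instead. However, when I try to create a DataFrame from a Row, I run into a schema inference error. Here is my code: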

>>> row = Row(name='Severin', age=33)
>>> df = spark.createDataFrame(row)

This results in the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark2-client/python/pyspark/sql/session.py", line 526, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/spark2-client/python/pyspark/sql/session.py", line 390, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/spark2-client/python/pyspark/sql/session.py", line 322, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/spark2-client/python/pyspark/sql/types.py", line 992, in _infer_schema
    raise TypeError("Can not infer schema for type: %s" % type(row))
TypeError: Can not infer schema for type: <type 'int'>

So I created a schema:

>>> schema = StructType([StructField('name', StringType()), 
...                      StructField('age',IntegerType())])
>>> df = spark.createDataFrame(row, schema)

But then this error is thrown instead:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark2-client/python/pyspark/sql/session.py", line 526, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/spark2-client/python/pyspark/sql/session.py", line 387, in _createFromLocal
    data = list(data)
  File "/spark2-client/python/pyspark/sql/session.py", line 509, in prepare
    verify_func(obj, schema)
  File "/spark2-client/python/pyspark/sql/types.py", line 1366, in _verify_type
    raise TypeError("StructType can not accept object %r in type %s" % (obj, type(obj)))
TypeError: StructType can not accept object 33 in type <type 'int'>

hs1ihplo1#

The createDataFrame function takes a list of Rows (among other options) plus a schema, so the correct code looks like this:

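from pyspark.sql import Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# `spark` is an existing SparkSession (for example, the one the pyspark shell provides).
schema = StructType([StructField('name', StringType()), StructField('age', IntegerType())])
# createDataFrame expects a list of Rows, not a single Row.
rows = [Row(name='Severin', age=33), Row(name='John', age=48)]
df = spark.createDataFrame(rows, schema)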

df.printSchema()
df.show()

Output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Severin| 33|
|   John| 48|
+-------+---+

The PySpark documentation has more details on the createDataFrame function.


to94eoyn2#

You need to create a list of Row objects and pass that list, together with the schema, to the createDataFrame() method.

from pyspark.sql import Row
from pyspark.sql.types import StringType, StructField, StructType
department1 = Row(id='AAAAAAAAAAAAAA', type='XXXXX',cost='2')
department2 = Row(id='AAAAAAAAAAAAAA', type='YYYYY',cost='32')
department3 = Row(id='BBBBBBBBBBBBBB', type='XXXXX',cost='42')
department4 = Row(id='BBBBBBBBBBBBBB', type='YYYYY',cost='142')
department5 = Row(id='BBBBBBBBBBBBBB', type='ZZZZZ',cost='149')
department6 = Row(id='CCCCCCCCCCCCCC', type='XXXXX',cost='15')
department7 = Row(id='CCCCCCCCCCCCCC', type='YYYYY',cost='23')
department8 = Row(id='CCCCCCCCCCCCCC', type='ZZZZZ',cost='10')

schema = StructType([StructField('id', StringType()), StructField('type',StringType()),StructField('cost', StringType())])
rows = [department1,department2,department3,department4,department5,department6,department7,department8 ]
df = spark.createDataFrame(rows, schema)
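
If a call site sometimes holds a single Row and sometimes a list of them, a small guard can normalize the input before it reaches createDataFrame. The following is only a sketch: `as_record_list` is a hypothetical helper, not part of PySpark, and it relies on the fact that pyspark.sql.Row subclasses tuple. It is shown with plain tuples so that it runs without a Spark installation.

```python
def as_record_list(data):
    """Hypothetical helper: wrap a lone tuple-like record in a list.

    pyspark.sql.Row subclasses tuple, so a bare Row is caught by the
    isinstance check; lists and other iterables of records pass through.
    """
    if isinstance(data, tuple):
        return [data]
    return list(data)


single = ("Severin", 33)
many = [("Severin", 33), ("John", 48)]

print(as_record_list(single))  # [('Severin', 33)]
print(as_record_list(many))    # [('Severin', 33), ('John', 48)]
```

With a real session this would be used as spark.createDataFrame(as_record_list(row), schema). One caveat of this design: a tuple of records (rather than a list) would be mistaken for a single record, so it only suits code that passes either one record or a list.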

oo7oh9g93#

If you only need a pandas DataFrame, you can convert each Row to a dict and rely on pandas' type inference, provided that is good enough for your needs.

import pandas as pd

# `output` is an existing Spark DataFrame; head(5) returns a list of Row objects
sample = output.head(5)

df = pd.DataFrame([x.asDict() for x in sample])

sq1bmfud4#

I recently ran into a similar problem, and the answers here helped me understand it better.
My code:

row = Row(name="Alice", age=11)  
spark.createDataFrame(row).show()

resulted in a very similar error:

An error was encountered:  
Can not infer schema for type: <class 'int'>  
Traceback ...

Cause of the problem
createDataFrame expects an array of rows. So if you have only one row and do not want to create more, simply wrap it in an array: [row]

row = Row(name="Alice", age=11)
spark.createDataFrame([row]).show()
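
To see why the error names int rather than Row, it may help to look at what a function that iterates over its input actually receives. The sketch below does not call Spark. It uses collections.namedtuple as a stand-in for pyspark.sql.Row, on the assumption that the only relevant property is that Row is a tuple subclass; `Person` and `record_types` are hypothetical names used for illustration.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row (assumption: like Row, it is a tuple subclass).
Person = namedtuple("Person", ["name", "age"])


def record_types(data):
    """Return the type name of each item seen when iterating over `data`."""
    return [type(record).__name__ for record in data]


row = Person(name="Alice", age=11)

bare = record_types(row)       # iterating the record itself yields its field values
wrapped = record_types([row])  # iterating a one-element list yields the record

print(bare)     # ['str', 'int']
print(wrapped)  # ['Person']
```

Passed bare, the record is unpacked into scalars, and schema inference has nothing to work with for a lone int or str. The second traceback in the question seems to follow from the same thing: with a schema supplied, each scalar is checked against the StructType, so the value 33 is rejected. Which field is reported first depends on the field order inside the Row.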
