pyspark: ValueError: Some of types cannot be determined after inferring

g6ll5ycj posted on 2022-11-01 in Spark

I have a pandas DataFrame my_df, and my_df.dtypes gives:

ts              int64
fieldA         object
fieldB         object
fieldC         object
fieldD         object
fieldE         object
dtype: object

Then I try to convert the pandas DataFrame my_df to a Spark DataFrame by doing:

spark_my_df = sc.createDataFrame(my_df)

However, I get the following error:

ValueErrorTraceback (most recent call last)
<ipython-input-29-d4c9bb41bb1e> in <module>()
----> 1 spark_my_df = sc.createDataFrame(my_df)
      2 spark_my_df.take(20)

/usr/local/spark-latest/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio)
    520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    521         else:
--> 522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    523         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    524         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/spark-latest/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    384 
    385         if schema is None or isinstance(schema, (list, tuple)):
--> 386             struct = self._inferSchemaFromList(data)
    387             if isinstance(schema, (list, tuple)):
    388                 for i, name in enumerate(schema):

/usr/local/spark-latest/python/pyspark/sql/session.py in _inferSchemaFromList(self, data)
    318         schema = reduce(_merge_type, map(_infer_schema, data))
    319         if _has_nulltype(schema):
--> 320             raise ValueError("Some of types cannot be determined after inferring")
    321         return schema
    322 

ValueError: Some of types cannot be determined after inferring

Does anyone know what the above error means? Thanks!


pbwdgjma #1

To infer a field's type, PySpark looks at the non-None records in that field. If a field contains only None records, PySpark cannot infer the type and raises this error.
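By contrast, a field only needs one non-None record somewhere in the data for inference to succeed; a minimal sketch (assuming a SparkSession named spark, as in the other snippets):

>>> # One non-None value per field is enough for type inference.
>>> spark.createDataFrame([["a"], [None]], ["foo"]).dtypes
[('foo', 'string')]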
If every record really is None, defining the schema manually will fix this:

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> schema = StructType([StructField("foo", StringType(), True)])
>>> df = spark.createDataFrame([[None]], schema=schema)
>>> df.show()
+----+
|foo |
+----+
|null|
+----+

isr3a4wc #2

To fix this, you can provide your own schema.
For example, to reproduce the error:

>>> df = spark.createDataFrame([[None, None]], ["name", "score"])

To fix the error:

>>> from pyspark.sql.types import StructType, StructField, StringType, DoubleType
>>> schema = StructType([StructField("name", StringType(), True), StructField("score", DoubleType(), True)])
>>> df = spark.createDataFrame([[None, None]], schema=schema)
>>> df.show()
+----+-----+
|name|score|
+----+-----+
|null| null|
+----+-----+

szqfcxe2 #3

If you are using the monkey-patched RDD[Row].toDF() method, you can increase the sampling ratio so that more than 100 records are checked when types are inferred:


# Set sampleRatio smaller as the data size increases
my_df = my_rdd.toDF(sampleRatio=0.01)
my_df.show()

Assuming every field has non-null rows somewhere in the RDD, they become more likely to be found as sampleRatio increases towards 1.0.
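A sketch of the situation this addresses (the data and names here are hypothetical): plain inference only examines the first 100 rows, so a field that is None in all of them fails, while sampling draws rows from across the whole RDD:

from pyspark.sql import Row

# Hypothetical RDD whose first 200 rows all have score=None; inference
# over just the first 100 rows would fail here.
rows = [Row(name="a", score=None)] * 200 + [Row(name="b", score=1.0)] * 800
my_rdd = spark.sparkContext.parallelize(rows)

# Sampling half the RDD will almost certainly see a non-null score.
my_df = my_rdd.toDF(sampleRatio=0.5)
my_df.printSchema()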


brccelvz #4

I ran into the same problem. If you don't need the empty columns, you can simply drop them from the pandas DataFrame before importing it into Spark:

my_df = my_df.dropna(axis='columns', how='all') # Drops columns with all NA values
spark_my_df = sc.createDataFrame(my_df)

oymdgrw7 #5

This is probably because some columns contain only null values. You should drop those columns before converting the frame to a Spark DataFrame.
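A minimal sketch of that, assuming the pandas DataFrame my_df from the question:

# Find the columns whose values are entirely null; Spark cannot infer
# a type for these.
all_null_cols = my_df.columns[my_df.isna().all()].tolist()

# Drop them, then convert as before.
spark_my_df = sc.createDataFrame(my_df.drop(columns=all_null_cols))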


jucafojl #6

This error occurs because Spark cannot determine the data types of the pandas DataFrame's columns. One way around it is to pass a schema explicitly to Spark's createDataFrame function.
For example, say your pandas DataFrame looks like this:

import pandas as pd

d = {
  'col1': [1, 2],
  'col2': ['A', 'B']
}
df = pd.DataFrame(data = d)
print(df)

   col1 col2
0    1   A
1    2   B

To convert it to a Spark DataFrame, first define the schema and pass it to createDataFrame, like so:

from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
  StructField("col1", LongType()),
  StructField("col2", StringType()),
])

spark_df = spark.createDataFrame(df, schema = schema)
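As a quick check, the resulting DataFrame should carry the declared types (expected output shown as comments):

spark_df.printSchema()

# root
#  |-- col1: long (nullable = true)
#  |-- col2: string (nullable = true)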
