将Pandas Dataframe 转换为Spark Dataframe 错误

fhg3lkii  于 2023-01-21  发布在  Apache
关注(0)|答案(7)|浏览(189)

我正试着把PandasDF转换成Spark一号

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691

代码:

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)

我得到一个错误:

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
pqwbnv8z

pqwbnv8z1#

我做了这个脚本,它适用于我的10只Pandas

from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()
    elif f == 'float32': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

gist中也可以看到
使用此函数,只需调用spark_df = pandas_to_spark(pandas_df)

lp0sw83n

lp0sw83n2#

你需要确保你的panda Dataframe 列适合spark正在推断的类型。如果你的panda Dataframe 列出了如下内容:

pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object

你得到的错误尝试:

df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)

现在,确保.astype(str)是你想要的列类型,基本上,当底层Java代码试图从python中的一个对象推断类型时,它会使用一些观察结果并做出猜测,如果这个猜测不适用于列中的所有数据,它将尝试从panda转换为spark,它将失败。

k10s72fa

k10s72fa3#

类型相关的错误可以通过强制模式来避免,如下所示:

    • 注**:使用原始数据(如上所述)创建文本文件(* test. csv *),并插入假设列名("col1"、"col2"、......、"col25")。
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

pdDF = pd.read_csv("test.csv")

Pandas数据框架的内容:

col1     col2    col3    col4    col5    col6    col7    col8   ... 
0      10000001 1       0       1       12:35   OK      10002   1      ...
1      10000001 2       0       1       12:36   OK      10002   1      ...
2      10000002 1       0       4       12:19   PA      10003   1      ...

接下来,创建架构:

from pyspark.sql.types import *

mySchema = StructType([ StructField("col1", LongType(), True)\
                       ,StructField("col2", IntegerType(), True)\
                       ,StructField("col3", IntegerType(), True)\
                       ,StructField("col4", IntegerType(), True)\
                       ,StructField("col5", StringType(), True)\
                       ,StructField("col6", StringType(), True)\
                       ,StructField("col7", IntegerType(), True)\
                       ,StructField("col8", IntegerType(), True)\
                       ,StructField("col9", IntegerType(), True)\
                       ,StructField("col10", IntegerType(), True)\
                       ,StructField("col11", StringType(), True)\
                       ,StructField("col12", StringType(), True)\
                       ,StructField("col13", IntegerType(), True)\
                       ,StructField("col14", IntegerType(), True)\
                       ,StructField("col15", IntegerType(), True)\
                       ,StructField("col16", IntegerType(), True)\
                       ,StructField("col17", IntegerType(), True)\
                       ,StructField("col18", IntegerType(), True)\
                       ,StructField("col19", IntegerType(), True)\
                       ,StructField("col20", IntegerType(), True)\
                       ,StructField("col21", IntegerType(), True)\
                       ,StructField("col22", IntegerType(), True)\
                       ,StructField("col23", IntegerType(), True)\
                       ,StructField("col24", IntegerType(), True)\
                       ,StructField("col25", IntegerType(), True)])
    • 注意**:True(暗示允许为空)

创建pyspark Dataframe :

df = spark.createDataFrame(pdDF,schema=mySchema)

确认Pandas Dataframe 现在是pyspark Dataframe :

type(df)

输出:

pyspark.sql.dataframe.DataFrame
    • 旁白**

要解决Kate下面的评论--要强加一个通用(字符串)模式,您可以执行以下操作:

df=spark.createDataFrame(pdDF.astype(str))
bcs8qyzn

bcs8qyzn4#

在spark版本〉= 3中,您可以在一行中将panda Dataframe 转换为pyspark Dataframe
创建 Dataframe (pandasDF)

dataset = pd.read_csv("data/AS/test_v2.csv")

sparkDf = spark.createDataFrame(dataset);

如果您对Spark会话变量感到困惑,Spark会话如下所示

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()
gdrx4gfi

gdrx4gfi5#

我已经尝试了这个与您的数据和它的工作:

%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.read_csv("test.csv")
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()
cigdeys3

cigdeys36#

我稍微整理/简化了上面的答案:

import pyspark.sql.types as ps_types

def get_equivalent_spark_type(pandas_type):
    """
        This method will retrieve the corresponding spark type given a pandas
        type.

        Args:
            pandas_type (str): pandas data type

        Returns:
            spark data type
    """
    type_map = {
        'datetime64[ns]': ps_types.TimestampType(),
        'int64': ps_types.LongType(),
        'int32': ps_types.IntegerType(),
        'float64': ps_types.DoubleType(),
        'float32': ps_types.FloatType()}
    if pandas_type not in type_map:
        return ps_types.StringType()
    else:
        return type_map[pandas_type]

def pandas_to_spark(spark, pandas_df):
    """
        This method will return a spark dataframe given a pandas dataframe.

        Args:
            spark (pyspark.sql.session.SparkSession): pyspark session
            pandas_df (pandas.core.frame.DataFrame): pandas DataFrame

        Returns:
            equivalent spark DataFrame
    """
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    p_schema = ps_types.StructType([
        ps_types.StructField(column, get_equivalent_spark_type(pandas_type))
        for column, pandas_type in zip(columns, types)])

    return spark.createDataFrame(pandas_df, p_schema)
qhhrdooz

qhhrdooz7#

我收到过一次类似的错误信息,在我的情况下,这是因为我的Pandas Dataframe 包含空。我会建议尝试&处理这个Pandas之前,转换到Spark(这解决了我的情况下的问题)。

相关问题