如何让PySpark将列识别为datetime类型?

xpszyzbs  于 2023-05-23  发布在  Apache
关注(0)|答案(3)|浏览(245)

我使用SparkSession.createDataFrame从一个dict列表中创建一个Dataframe,如下所示:

data=[
    {
        'id':1,
        'create_time':datetime.datetime('2022','9','9','0','0','0')
    },
    {
        'id':2,
        'create_time':datetime.datetime('2022','9','9','0','0','0')
    }
]

dataframe = sparkSession.createDataFrame(data)

但是Spark引发了一个例外:
pyspark.sql.utils.AnalysisException:无法解析给定输入列的“create_time”
这是因为PySpark无法解析datetime.datetime类型吗?我应该如何转换'create_time'的值,以便让Spark将此列识别为datetime类型?

wlwcrazw

wlwcrazw1#

为了解决这个问题,我们需要了解列表、元组和数据类型。这是创建转换为 Dataframe 的Python结构的关键。但是,推断模式与定义模式同样重要。
首先,我将从两个元组创建一个dataframe。第一个字段是整数,第二个字段是字符串。我将数据和列作为参数提供。在这种情况下,Spark正在推断数据。

#
# 1 - Create sample dataframe + view
#

# array of tuples - data
dat1 = [
  (1, "2022-09-09T14:00:00"),
  (2, "2022-09-09T16:00:00")
]

# array of names - columns
col1 = ["event_id", "event_start"]

# make data frame
df1 = spark.createDataFrame(data=dat1, schema=col1)

# make temp hive view
df1.createOrReplaceTempView("event_data1")

# show schema
df1.printSchema()

下面的屏幕显示数据在源列表中被格式化为数字和字符串。由于我们只是将没有任何模式定义的列名传递给create data frame方法,因此将推断出结果数据类型。得到的 Dataframe 具有用于列的long和string数据类型。

其次,我们不仅可以在源列表中更改数据类型,还可以提供模式。对于大型ASCII格式(如CSV、JSON和XML),提供模式是关键。这会阻止Spark引擎阅读整个文件来推断数据类型。

#
# 2 - Create sample dataframe + view
#

from datetime import datetime
from pyspark.sql.types import *

# array of tuples - data
dat2 = [
  (1, datetime.strptime('2022-09-09 14:00:00',  '%Y-%m-%d %H:%M:%S') ),
  (2, datetime.strptime('2022-09-09 16:00:00', '%Y-%m-%d %H:%M:%S') )
]

# array of names - columns
col2 = StructType([
   StructField("event_id", IntegerType(), True),
   StructField("event_start", TimestampType(), True)])

# make data frame
df2 = spark.createDataFrame(data=dat2, schema=col2)

# make temp hive view
df2.createOrReplaceTempView("event_data2")

# show schema
df2.printSchema()

下面的图像显示我们现在有一个整数和时间戳数据类型的列表和 Dataframe 。

有时候,数据本身就是有问题的。因此,我们希望将数据作为字符串导入,然后应用转换函数。
第三,之后的数据转换很好地处理了畸形数据。

#
# 3 - Create sample dataframe + view
#

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import *

# array of tuples - data
dat3 = [
#  (1, '2022-09-09 14:00:00'),
  (1, '2'),
  (2, '2022-09-09 16:00:00')
]

# array of names - columns
col3 = StructType([
   StructField("event_id", IntegerType(), True),
   StructField("event_start", StringType(), True)])

# make data frame
df3 = spark.createDataFrame(data=dat3, schema=col3)
df3 = df3.withColumn("event_start", to_timestamp(col("event_start")))

# make temp hive view
df3.createOrReplaceTempView("event_data3")

# show schema
df3.printSchema()

下图显示了年份为“2”的日期被转换为空值,因为它无效。这种格式错误的数据会破坏上面的时间戳示例。

简而言之,了解您的传入数据。分析数据的错误值。然后确定加载数据的最佳方法。请始终记住,提供模式会加快某些类型文件的加载速度。

6rqinv9w

6rqinv9w2#

正如已经提到的评论:使用整数表示日期时间:

data=[
    {
        'id':1,
        'create_time':datetime.datetime(2022,9,9,0,0,0)
    },
    {
        'id':2,
        'create_time':datetime.datetime(2023,9,9,0,0,0)
    }
]

dataframe = spark.createDataFrame(data)

在这里,我建议遵循官方文档,并使用Spark为SparkSession处理相同的变量命名。
关于你在评论中的问题:
如果你检查你的 Dataframe

print(dataframe)

>>>DataFrame[create_time: timestamp, id: bigint]

您可能会注意到,create_timeid都有一个类型。这是合理的,因为每个数据项都需要一个数据类型。在Python中,数据类型是动态提供的。我在这里假设(我并不完全喜欢Spark)Spark Dataframe 使用静态数据类型。因此,即使您没有指定id列的类型,只要您使用createDataFrame方法,该类型将根据此时number变量类型的数据类型确定。所以基本上如果我用

data=[
    {
        'id':1.0,
        // ...

    },
    {
        'id':2.0,
        // ...
    }
]

它将不被表示为bigint,而是被表示为double。如果你尝试混合两个类型,比如第一个是double,第二个是bigint,你会看到这个错误消息:

TypeError: field id: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.LongType'>

这在某种程度上证明了我对静态类型的假设。
因此,即使您不想使用模式,Spark也会根据您的data输入确定模式,如下所示

dataframe.printSchema()
dataframe.show()
>>>root
    |-- create_time: timestamp (nullable = true)
    |-- id: double (nullable = true)

>>>+-------------------+---+
   |        create_time| id|
   +-------------------+---+
   |2022-09-09 00:00:00|  1|
   |2022-09-09 00:00:00|  2|
   +-------------------+---+

会出现的。

qhhrdooz

qhhrdooz3#

对于那些寻找更短版本的人:

>>> data = [{'ts': datetime.fromisoformat(s)} for s in ['2020-01-01', '2020-01-01T11:22:33', '2020-01-01T11:22:33.444']]
>>> data
[{'ts': datetime.datetime(2020, 1, 1, 0, 0)}, {'ts': datetime.datetime(2020, 1, 1, 11, 22, 33)}, {'ts': datetime.datetime(2020, 1, 1, 11, 22, 33, 444000)}]
>>> spark.createDataFrame(data=data).show(truncate=False)
+-----------------------+
|ts                     |
+-----------------------+
|2020-01-01 00:00:00    |
|2020-01-01 11:22:33    |
|2020-01-01 11:22:33.444|
+-----------------------+
>>>

相关问题