我正在尝试过滤一个pyspark.sql.DataFrame并显示结果。我有理由相信过滤器正在工作,但是display/show/collect命令显示数据,这违反了过滤器。
在Azure上的数据块上,我加载了一个delta表,作为一个pyspark.sql.DataFrame,包含40多列和7百万行。为了测试新的分区/版本控制策略,我用不同的datetime值初始化了一个列。
from datetime import datetime, timedelta
import random
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType
timestamps = [datetime.now() - timedelta(days=random.randint(0, 1000)) for i in range(0, 20)]
def create_random_datetime(col):
return random.choice(timestamps)
dtf = F.udf(create_random_datetime, TimestampType())
df = df.withColumn('load_date', dtf(F.col('load_date')))
同样,对于分区,我从时间戳中提取年和月作为新的整数类型列。
df = df.withColumn('load_month', F.month(F.col('load_date')))
df = df.withColumn('load_year', F.year(F.col('load_date')))
现在我想过滤到最后11个时间戳:
cutoff_datetime = df.select('load_date').distinct().orderBy('load_date', ascending=False).collect()[11][0]
cutoff_datetime # datetime.datetime(2022, 3, 25, 6, 37, 27, 261903)
df.select('load_date').filter(F.col('load_date') > cutoff_datetime).distinct().collect()
这显示了包含2021年以来日期的行列表:
[Row(load_date=datetime.datetime(2023, 1, 29, 6, 37, 27, 261897)),
...
Row(load_date=datetime.datetime(2021, 3, 15, 6, 37, 27, 261889)),
...
Row(load_date=datetime.datetime(2021, 3, 28, 6, 37, 27, 261920))]
我有一种直觉,我可能在处理Timestamp数据类型时犯了一个错误。因此,我尝试过滤load_year
和load_month
列。
df_filtered = df.filter('load_year > 2022')
df_filtered.select('load_year', 'load_month', 'load_date').limit(200).collect()
仍然显示2021年的行:
[Row(load_year=2022, load_month=11, load_date=datetime.datetime(2022, 11, 1, 6, 37, 27, 261895)),
Row(load_year=2021, load_month=12, load_date=datetime.datetime(2021, 12, 1, 6, 37, 27, 261924)),
Row(load_year=2021, load_month=2, load_date=datetime.datetime(2021, 2, 5, 6, 37, 27, 261911)),
...
Row(load_year=2023, load_month=6, load_date=datetime.datetime(2023, 6, 6, 6, 37, 27, 261868))]
不同的语法变体(使用F.col
或不将过滤条件定义为字符串文字)会导致相同的结果。使用show、display或collect方法显示相同的意外日期。
我的印象是过滤步骤工作正常:df.filter('load_year > 2023')
返回空DataFrame。
命令
print(f"{df.select('load_year').filter(df.load_year > 2022).count()=:,}, {df.select('load_year').count()=:,}")
print(f"{df.select('load_year').filter(df.load_year > 2021).count()=:,}, {df.select('load_year').count()=:,}")
print(f"{df.select('load_year').filter(df.load_year > 2020).count()=:,}, {df.select('load_year').count()=:,}")
指纹
df.select('load_year').filter(df.load_year > 2022).count()=729,915, df.select('load_year').count()=7,296,062
df.select('load_year').filter(df.load_year > 2021).count()=4,378,659, df.select('load_year').count()=7,296,062
df.select('load_year').filter(df.load_year > 2020).count()=7,296,062, df.select('load_year').count()=7,296,062
这些数字在统计上是合理的,似乎表明过滤器工作正常(尾部的=
使python打印表达式及其值,:,
使值被格式化为1000步之间的命令)。
show/display/collect方法产生错误数据的原因是什么?
重置notebook状态并按顺序运行所有单元格以及重新启动群集都没有改变结果。我还尝试了不同的浏览器(edge和Chrome)。
技术详情:DB运行时:10.4 LTS(包括Apache Spark 3.2.1,Scala 2.12)使用光子和点示例Worker:Standard_DS3_v2(最少2个,最多8个工作者)
1条答案
按热度按时间nvbavucw1#
错误在于没有将udf标记为生成随机时间戳的 * 非确定性 *。
修改udf定义:
dtf = F.udf(create_random_datetime, TimestampType()).asNondeterministic()
使后续筛选器和display/show/collect语句正确工作。
为什么?
我的猜测是,不将函数标记为非确定性允许spark应用快捷方式进行优化。在本例中,查看udf生成的第一个日期,并期望其他调用将返回相同的值。在这种情况下,过滤器可能会应用于 * 预期 * 值,而不是实际值。