PySpark equivalent of a pandas statement

6tqwzwtp · posted 2021-05-19 in Spark

I have a question. I have a Spark DataFrame that I am cleaning up. In pandas I would normally use:

df['cases_since_prev_day'] = df.groupby(['county','state'])['cases'].diff().fillna(0).astype(int)

Is there an equivalent statement I can use in PySpark, with an aggregation?
Edit: the original dataset comes from this table on GitHub - https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv. A quick look at it:

date,county,state,fips,cases,deaths
2020-01-21,Snohomish,Washington,53061,1,0
2020-01-22,Snohomish,Washington,53061,1,0
2020-01-23,Snohomish,Washington,53061,1,0
2020-01-24,Cook,Illinois,17031,1,0
2020-01-24,Snohomish,Washington,53061,1,0
2020-01-25,Orange,California,06059,1,0
2020-01-25,Cook,Illinois,17031,1,0
2020-01-25,Snohomish,Washington,53061,1,0

I have previously cleaned this dataset with pandas, producing output in the following form:

date,county,state,fips,cases,deaths,ISO3166_1,ISO3166_2,cases_since_prev_day,deaths_since_prev_day,Last_Update_Date,Last_Reported_Flag
2020-03-19,Abbeville,South Carolina,45001,1,0,US,SC,0,0,2020-10-21 22:34:14.644190,False

I want to do the same thing in PySpark. The code and output I have so far:


# Data from The New York Times, based on reports from state and local health agencies
import pandas as pd
import datetime
import pycountry
import numpy as np
import sys
sys.path.append('../utilities')

from utility_setup import create_spark_session, read_s3_to_dataframes
from pyspark.sql.functions import col, create_map, lit
from pyspark.sql.functions import udf
from write_to_s3 import _write_dataframe_to_csv

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Dataframe - read CSV
bucket = 'covid19datalakesafford'
key = 'us-counties.csv'
spark = create_spark_session('COVID-19 NYT - county cases')
df = read_s3_to_dataframes(spark, bucket, key)
df = df.withColumn("fips", df["fips"].cast('string')).withColumn("ISO3166_1", lit("US"))

# df.state.show(2)

subdivisions = {k.name: k.code.replace("US-", "") for k in pycountry.subdivisions.get(country_code="US")}
mapping_func = lambda x: subdivisions.get(x)
df = df.withColumn('ISO3166_2', udf(mapping_func)("state"))
# df_2["ISO3166_2"] = df_2["state"].apply(lambda x: subdivisions.get(x))

# Old way using Python alone
# df = pd.read_csv("https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv", \
#                  dtype={'fips': str})
# df = df.withColumnRenamed("ISO3166_1","US") \
#        .withColumnRenamed("ISO3166_2","state")
# df.show(2)
# subdivision_udf = udf(lambda x: subdivisions.get(x))
# function for applying dictionary terms for subdivisions to column

df = df.sort('county', 'date', 'ISO3166_1', 'ISO3166_2')
df.show(2)

# Equivalent in PySpark for lines below?
# df['cases_since_prev_day'] = df.groupby(['county','state'])['cases'].diff().fillna(0).astype(int)
# df["Last_Update_Date"] = datetime.datetime.utcnow()
# df['Last_Reported_Flag'] = df['date'] == df['date'].max()
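
As an aside on the udf-based state lookup above: since create_map is already imported, the same dictionary lookup can be expressed as a native map column, which avoids sending every row through Python. A minimal sketch, assuming the subdivisions dict built above (mapping_expr is just an illustrative name):

from itertools import chain
from pyspark.sql import functions as F

# Build a literal map column from the Python dict, then index it with the state column;
# unmatched state names come back as NULL, the same as the udf version.
mapping_expr = F.create_map([F.lit(x) for x in chain(*subdivisions.items())])
df = df.withColumn('ISO3166_2', mapping_expr[F.col('state')])

Keeping the lookup as a column expression lets Catalyst optimize it instead of round-tripping every row through a Python UDF, which matters at roughly 950k rows.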

Current table (first two rows):

+----------+---------+--------------+-----+-----+------+---------+---------+
|      date|   county|         state| fips|cases|deaths|ISO3166_1|ISO3166_2|
+----------+---------+--------------+-----+-----+------+---------+---------+
|2020-03-19|Abbeville|South Carolina|45001|    1|     0|       US|       SC|
|2020-03-20|Abbeville|South Carolina|45001|    1|     0|       US|       SC|
+----------+---------+--------------+-----+-----+------+---------+---------+

Edit 2: Note that this is a time series; I expect the list of COVID-19 cases to keep growing, since every county/region in every state reports new cases each day. My table is currently close to 950,000 rows, and the pandas version is slow (it takes about 9 minutes to finish).

xienkqul1#

This should get you (almost) all the way there (I don't have your mapping function, so I can't produce the two-letter state abbreviations):

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# county-level windowing - like PARTITION BY ... ORDER BY in SQL
win = Window.partitionBy('county', 'state').orderBy('date')

# day-over-day differences: lag() returns the previous row's value, so subtract it from
# the current value and fill the first row of each group with 0 (like .diff().fillna(0))
df = df.withColumn('cases_since_prev_day',
                   F.coalesce(F.col('cases') - F.lag('cases').over(win), F.lit(0)).cast('int'))
df = df.withColumn('deaths_since_prev_day',
                   F.coalesce(F.col('deaths') - F.lag('deaths').over(win), F.lit(0)).cast('int'))

# final date per county/state: last() needs an unbounded frame,
# otherwise it only sees rows up to the current one
full_win = win.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = df.withColumn('Last_Update_Date', F.last('date').over(full_win))
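
For the other two pandas lines in the question, a minimal sketch continuing from the snippet above, assuming Last_Update_Date should be the load timestamp (as datetime.datetime.utcnow() was in the pandas version, rather than the window's last date) and Last_Reported_Flag should mark rows carrying the most recent date in the whole table:

from pyspark.sql import functions as F

# Last_Update_Date: Spark-side analogue of datetime.datetime.utcnow()
df = df.withColumn('Last_Update_Date', F.current_timestamp())

# Last_Reported_Flag: True only for rows with the latest date in the table
# (equivalent to df['date'] == df['date'].max() in pandas)
max_date = df.agg(F.max('date')).collect()[0][0]
df = df.withColumn('Last_Reported_Flag', F.col('date') == F.lit(max_date))

Collecting the single max value to the driver and comparing against it as a literal avoids running a second window over the full table.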
