I have a question. I have a Spark DataFrame that I'm cleaning up. In pandas I would normally use:
df['cases_since_prev_day'] = df.groupby(['county','state'])['cases'].diff().fillna(0).astype(int)
Is there an equivalent statement I can use in PySpark with an aggregation?
Edit: the original dataset comes from this table on GitHub - https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv
A quick preview:
date,county,state,fips,cases,deaths
2020-01-21,Snohomish,Washington,53061,1,0
2020-01-22,Snohomish,Washington,53061,1,0
2020-01-23,Snohomish,Washington,53061,1,0
2020-01-24,Cook,Illinois,17031,1,0
2020-01-24,Snohomish,Washington,53061,1,0
2020-01-25,Orange,California,06059,1,0
2020-01-25,Cook,Illinois,17031,1,0
2020-01-25,Snohomish,Washington,53061,1,0
I previously cleaned the dataset with pandas and produced output in the following form:
date,county,state,fips,cases,deaths,ISO3166_1,ISO3166_2,cases_since_prev_day,deaths_since_prev_day,Last_Update_Date,Last_Reported_Flag
2020-03-19,Abbeville,South Carolina,45001,1,0,US,SC,0,0,2020-10-21 22:34:14.644190,False
I want to do the same thing with PySpark. The code and output I have so far:
# Data from The New York Times, based on reports from state and local health agencies
import pandas as pd
import datetime
import pycountry
import numpy as np
import sys
sys.path.append('../utilities')
from utility_setup import create_spark_session, read_s3_to_dataframes
from pyspark.sql.functions import col, create_map, lit
from pyspark.sql.functions import udf
from write_to_s3 import _write_dataframe_to_csv
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Dataframe - read CSV
bucket = 'covid19datalakesafford'
key = 'us-counties.csv'
spark = create_spark_session('COVID-19 NYT - county cases')
df = read_s3_to_dataframes(spark, bucket, key)
df = df.withColumn("fips",df["fips"].cast('string')).withColumn("ISO3166_1",lit("US"))
# df.state.show(2)
subdivisions = {k.name: k.code.replace("US-", "") for k in pycountry.subdivisions.get(country_code="US")}
mapping_func = lambda x: subdivisions.get(x)
df = df.withColumn('ISO3166_2', udf(mapping_func)("state"))
# df_2["ISO3166_2"] = df_2["state"].apply(lambda x: subdivisions.get(x))
# Old way using Python alone
# df = pd.read_csv("https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv", \
# dtype={'fips': str})
# df = df.withColumnRenamed("ISO3166_1","US") \
# .withColumnRenamed("ISO3166_2","state")
# df.show(2)
# subdivision_udf = udf(lambda x: subdivisions.get(x))
# function for applying dictionary terms for subdivisions to column
df = df.sort('county', 'date', 'ISO3166_1', 'ISO3166_2')
df.show(2)
# Equivalent in PySpark for lines below?
# df['cases_since_prev_day'] = df.groupby(['county','state'])['cases'].diff().fillna(0).astype(int)
# df["Last_Update_Date"] = datetime.datetime.utcnow()
# df['Last_Reported_Flag'] = df['date'] == df['date'].max()
Current table (first two rows):
+----------+---------+--------------+-----+-----+------+---------+---------+
| date| county| state| fips|cases|deaths|ISO3166_1|ISO3166_2|
+----------+---------+--------------+-----+-----+------+---------+---------+
|2020-03-19|Abbeville|South Carolina|45001| 1| 0| US| SC|
|2020-03-20|Abbeville|South Carolina|45001| 1| 0| US| SC|
+----------+---------+--------------+-----+-----+------+---------+---------+
Edit 2: Note that this is a time series; I expect the list of COVID-19 cases to grow every day, since every county/region in every state reports cases daily. My table is currently close to 950,000 rows, and it is slow with pandas (it takes 9 minutes to complete).
1 Answer
xienkqul1
This should get you (almost) all the way there (I don't have your mapping function, so I can't produce the two-letter state abbreviations):
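The answer's code is not reproduced above, so what follows is only a minimal sketch of the window/lag approach it refers to. It assumes the column layout from the preview (date, county, state, fips, cases, deaths) and reads a local copy of us-counties.csv instead of the question's S3 helpers; the file path and the show(5) call at the end are illustrative only.

# Sketch only: Spark analogue of groupby(['county','state'])['cases'].diff().fillna(0).astype(int)
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("COVID-19 NYT - county cases").getOrCreate()

# Assumes a local copy of us-counties.csv; the question reads the same file from S3 instead.
df = (spark.read.csv("us-counties.csv", header=True, inferSchema=True)
      .withColumn("date", F.to_date("date")))

# One window per (county, state), ordered by date, so lag() returns the previous day's row.
w = Window.partitionBy("county", "state").orderBy("date")

df = (df
      # diff() equivalent: current value minus the lagged value;
      # coalesce(..., 0) mimics fillna(0) for each group's first row.
      .withColumn("cases_since_prev_day",
                  F.coalesce(F.col("cases") - F.lag("cases").over(w), F.lit(0)).cast("int"))
      .withColumn("deaths_since_prev_day",
                  F.coalesce(F.col("deaths") - F.lag("deaths").over(w), F.lit(0)).cast("int"))
      # Analogue of datetime.datetime.utcnow() applied to every row
      # (current_timestamp() uses the Spark session time zone).
      .withColumn("Last_Update_Date", F.current_timestamp()))

# Equivalent of df['date'] == df['date'].max(): flag rows carrying the latest date in the data.
max_date = df.agg(F.max("date").alias("max_date"))
df = (df.crossJoin(max_date)
        .withColumn("Last_Reported_Flag", F.col("date") == F.col("max_date"))
        .drop("max_date"))

df.orderBy("county", "state", "date").show(5)

partitionBy("county", "state") with orderBy("date") mirrors the pandas groupby ordering, lag() plays the role of diff(), and coalesce(..., 0) reproduces fillna(0) for each county's first reported day. The crossJoin against the aggregated maximum date avoids pulling the whole DataFrame into a single partition, which an unpartitioned window would do.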