我有以下pyspark Dataframe ,其中包含ID和QUARTER两个字段:
pandas_df = pd.DataFrame({"ID":[1, 2, 3,4, 5, 3,5,6,3,7,2,6,8,9,1,7,5,1,10],"QUARTER":[1, 1, 1, 1, 1,2,2,2,3,3,3,3,3,4,4,5,5,5,5]})
spark_df = spark.createDataFrame(pandas_df)
spark_df.createOrReplaceTempView('spark_df')
我有下面的列表,其中包含我想要的5个季度中每个季度的条目数
numbers=[2,1,3,1,2]
我希望每次从每个季度中选择的行数等于列表“numbers”中指示的数字。我应该尊重ID
在末尾应该是唯一的。这意味着如果我在某个季度中选择了一个ID,我不应该在其他季度中再次重新选择它。
为此,我使用了下面的pyspark代码:
quart=1 # the first quarter
liste_unique=[] # an empty list that will contains the unique Id values to compare with
for i in range(0,len(numbers)):
tmp=spark_df.where(spark_df.QUARTER==quart)# select only rows with the chosed quarter
tmp=tmp.where(tmp.ID.isin(liste_unique)==False)# the selected id were not selected before
w = Window().partitionBy(lit('col_count0')).orderBy(lit('col_count0'))#dummy column
df_final=tmp.withColumn("row_num", row_number().over(w)).filter(col("row_num").between(1,numbers[i])) # number of rows needed from the 'numbers list'
df_final=df_final.drop(col("row_num")) # drop the row num column
liste_tempo=df_final.select(['ID']).rdd.map(lambda x : x[0]).collect() # transform the selected id into list
liste_unique.extend(liste_tempo) # extend the list of unique id each time we select new rows from a quarter
df0=df0.union(df_final) # union the empty list each time with the selected data in each quarter
quart=quart+1 #increment the quarter
df0在开始时只是一个空列表。它将在末尾包含所有数据,它可以声明如下
spark = SparkSession.builder.appName('Empty_Dataframe').getOrCreate()
# Create an empty schema
columns = StructType([StructField('ID',
StringType(), True),
StructField('QUARTER',
StringType(), True)
])
df0 = spark.createDataFrame(data = [],
schema = columns)
代码工作正常,没有错误,除了我可以找到重复的ID在不同的季度,这是不正确的。此外,一个奇怪的行为是当我试图在df0 Dataframe 计数唯一ID的数量(在一个新的不同的单元格)
print(df0.select('ID').distinct().count())
它在每次执行时给出不同的值,即使 Dataframe 没有与任何其他进程接触(在较大的数据集上比示例更清楚)。我无法理解这种行为,我尝试使用unpersist(True)
删除该高速缓存或临时变量,但没有任何变化。我怀疑Union
函数使用错误,但我在pyspark中没有找到任何替代方法。
1条答案
按热度按时间8xiog9wr1#
我试着清理这个逻辑和python代码,因为它让我感到困惑。下面是我的版本,其中包含语句背后的逻辑和推理。在第1节中,我创建了一个 Dataframe 和临时视图。请注意,不需要使用panda。此代码在Azure Databricks上进行了测试。
上图显示了所有19条记录。我感兴趣的是,您有5个季度。如果计算财政季度,则只有4个季度。实际上,旧代码会根据数字数组的长度不断递增。
业务逻辑规定,从第1季度选择n(0)个id。然后从第2季度选择n(1)个id并进行替换,以此类推。因此,最终结果将等于或小于不同id的总数。n[]再次是input picks数组。
如果我们对临时视图执行一个简单的查询,我们会看到列出了ID 1到10。
为了简化工作,我将业务逻辑打包到用户定义的函数中。
该逻辑选择给定季度的所有数据,所选数字的输入数组用于确保我们选择的是没有替换的新数字,我使用兰德()函数在限制结果之前对数据进行排序。
我冒昧地将numbers数组转换为元组数组(picks,quarter),并保留了调试代码,以便您可以看到正在执行的动态查询和返回的结果。
与我第一次使用Spark SQL时没有兰德()命令不同,每次执行代码时都会得到不同的结果,但是,最后的数组只有x个元素,等于或小于不同id的数量。
上图显示数字9还没有被选中,而且,我没有被告知是否需要记录选中数字的季度,如果是这样,修改代码以返回元组列表(id,qtr)。
总之,逻辑是健全的,我希望它有助于解决您的业务问题。