Pyspark Dataframe 中的异常行为

bxfogqkk  于 2023-03-03  发布在  Apache
关注(0)|答案(1)|浏览(151)

我有以下pyspark Dataframe ,其中包含ID和QUARTER两个字段:

pandas_df = pd.DataFrame({"ID":[1, 2, 3,4, 5, 3,5,6,3,7,2,6,8,9,1,7,5,1,10],"QUARTER":[1, 1, 1, 1, 1,2,2,2,3,3,3,3,3,4,4,5,5,5,5]})
spark_df = spark.createDataFrame(pandas_df)
spark_df.createOrReplaceTempView('spark_df')

我有下面的列表,其中包含我想要的5个季度中每个季度的条目数

numbers=[2,1,3,1,2]

我希望每次从每个季度中选择的行数等于列表“numbers”中指示的数字。我应该尊重ID在末尾应该是唯一的。这意味着如果我在某个季度中选择了一个ID,我不应该在其他季度中再次重新选择它。
为此,我使用了下面的pyspark代码:

quart=1 # the first quarter
liste_unique=[] # an empty list that will contains the unique Id values to compare with
for i in range(0,len(numbers)):
  tmp=spark_df.where(spark_df.QUARTER==quart)# select only rows with the chosed quarter
  tmp=tmp.where(tmp.ID.isin(liste_unique)==False)# the selected id were not selected before
  w = Window().partitionBy(lit('col_count0')).orderBy(lit('col_count0'))#dummy column
  df_final=tmp.withColumn("row_num", row_number().over(w)).filter(col("row_num").between(1,numbers[i])) # number of rows needed from the 'numbers list'
  df_final=df_final.drop(col("row_num")) # drop the row num column
  liste_tempo=df_final.select(['ID']).rdd.map(lambda x : x[0]).collect() # transform the selected  id into list 

 liste_unique.extend(liste_tempo) # extend the list of unique id each time we select new rows from a quarter
  
  df0=df0.union(df_final) # union the empty list each time with the selected data in each quarter
  
  quart=quart+1 #increment the quarter

df0在开始时只是一个空列表。它将在末尾包含所有数据,它可以声明如下

spark = SparkSession.builder.appName('Empty_Dataframe').getOrCreate()
 
# Create an empty schema

columns = StructType([StructField('ID',
                                  StringType(), True),
                    StructField('QUARTER',
                                StringType(), True)
                      ])

df0 = spark.createDataFrame(data = [],
                           schema = columns)

代码工作正常,没有错误,除了我可以找到重复的ID在不同的季度,这是不正确的。此外,一个奇怪的行为是当我试图在df0 Dataframe 计数唯一ID的数量(在一个新的不同的单元格)

print(df0.select('ID').distinct().count())

它在每次执行时给出不同的值,即使 Dataframe 没有与任何其他进程接触(在较大的数据集上比示例更清楚)。我无法理解这种行为,我尝试使用unpersist(True)删除该高速缓存或临时变量,但没有任何变化。我怀疑Union函数使用错误,但我在pyspark中没有找到任何替代方法。

8xiog9wr

8xiog9wr1#

我试着清理这个逻辑和python代码,因为它让我感到困惑。下面是我的版本,其中包含语句背后的逻辑和推理。在第1节中,我创建了一个 Dataframe 和临时视图。请注意,不需要使用panda。此代码在Azure Databricks上进行了测试。

#
# 1 - create data frame + temporary view
#

# two lists
data1 = [1, 2, 3, 4, 5, 3, 5, 6, 3, 7, 2, 6, 8, 9, 1, 7, 5, 1, 10]
data2 = [1, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 5, 5, 5, 5]
  
# columns
columns1 = ['id', 'quarter']
  
# creating df
df1 = spark.createDataFrame(zip(data1, data2), columns1)

# create view
df1.createOrReplaceTempView('sample_data')

# show data
display(df1)

上图显示了所有19条记录。我感兴趣的是,您有5个季度。如果计算财政季度,则只有4个季度。实际上,旧代码会根据数字数组的长度不断递增。
业务逻辑规定,从第1季度选择n(0)个id。然后从第2季度选择n(1)个id并进行替换,以此类推。因此,最终结果将等于或小于不同id的总数。n[]再次是input picks数组。

如果我们对临时视图执行一个简单的查询,我们会看到列出了ID 1到10。
为了简化工作,我将业务逻辑打包到用户定义的函数中。

#
# 2 - write function for business logic
#

def pick_logic(num, qtr, ary):
  
  # which qtr to select?
  stmt = "select id from sample_data where quarter = '{}'".format(qtr)
  
  # which ids to not repick
  if (len(ary) > 0):
    stmt = stmt + " and id not in ({})".format(','.join([str(i) for i in ary]))

  # how many to limit
  stmt = stmt + " order by rand() limit {};".format(num)
   
  # debugging
  print(stmt)
  
  # create df
  df = spark.sql(stmt)
  
  # convert df -> lst + return
  return list(df.select('id').toPandas()['id'])

该逻辑选择给定季度的所有数据,所选数字的输入数组用于确保我们选择的是没有替换的新数字,我使用兰德()函数在限制结果之前对数据进行排序。

#
# 3 - call functions with tuples
#

# picks per qtr
picks_per_qtr = [(2,1), (1,2), (3,3), (1,4), (2,5), (3,1)]

# list of ids
i = []

# for each tuple, call business logic
for items in picks_per_qtr:
  
  # choose n values from quarter q
  n, q = items
  
  # get an array of new ids
  r = pick_logic(n, q, i)
  
  # show picking
  print(r)
  
  # append to id list
  i = i + r

print("final result is {}".format(i))

我冒昧地将numbers数组转换为元组数组(picks,quarter),并保留了调试代码,以便您可以看到正在执行的动态查询和返回的结果。
与我第一次使用Spark SQL时没有兰德()命令不同,每次执行代码时都会得到不同的结果,但是,最后的数组只有x个元素,等于或小于不同id的数量。

上图显示数字9还没有被选中,而且,我没有被告知是否需要记录选中数字的季度,如果是这样,修改代码以返回元组列表(id,qtr)。
总之,逻辑是健全的,我希望它有助于解决您的业务问题。

相关问题