pyspark 错误:Spark驱动程序因内存问题意外停止

sqyvllje  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(131)

我有下面的代码,我需要重用前一天的标志。所以我在循环运行。我不能在这里使用偏移量,因为一旦我知道前一天的国旗,那么只有我可以在今天使用它。所以,这个循环运行了1000次,之后每当我尝试对'data_wt_flag1'进行一些操作时,它都会花费太多时间,过了一段时间,就会导致“spark driver stopped unexpectedly”错误。我认为这是由于记忆问题。有没有更好的方法来写这个逻辑?正如我提到的,我不能使用偏移。

DateList=data.select("Date").distinct().orderBy('AsOfDate').rdd.flatMap(lambda x: x).collect()
Flag_list=[]

data_wt_flag1=spark.createDataFrame(data = [],schema = StructType([]))

for daily_date in DateList:
  print(daily_date)  
  Temp_data_daily=data.filter(col("Date").isin(daily_date))
  Temp_data_daily=Temp_data_daily.withColumn('lag_1',when(col("identifier").isin(Flag_list),1).otherwise(0))
    
  Temp_data_daily=Temp_data_daily.withColumn("condition_1", when(((col('col_1')==1) & ((col('col_2')==1) | (col('lag_1')==1))),1).otherwise(0))
  Flag_list=Temp_data_daily.filter(col('condition_1')==1).select("identifier").distinct().rdd.flatMap(lambda x: x).collect() 
  data_wt_flag1=data_wt_flag1.unionByName(Temp_data_daily,allowMissingColumns=True)

Word中代码的逻辑:
如果(col_1==1且(col_2==1或昨天(condition_1)==1)),则今天(condition_1)=1,否则为0。
因此,对于数据中的第一个日期,yesterday(condition_1)对于所有标识符都将为0,因此我最初在循环中传递null flag_list,然后它将在每次迭代中保持变化,并将用于在下一次迭代中标记标识符,从而创建lag_condition_1
下面是示例数据。我只显示了需要的列。
| 标识符|日期|col_1| col_2|
| --|--|--|--|
| ABC| 2023-08- 20 2023-08-20 2023-08-20| 1 | 1 |
| GHI| 2023-08- 20 2023-08-20 2023-08-20| 0 | 0 |
| ABC| 2023-08-21| 1 | 0 |
| GHI| 2023-08-21| 1 | 0 |
| ABC| 2023-08-22| 1 | 0 |
| GHI| 2023-08-22| 1 | 0 |
| ABC| 2023-08-23 2023-08-23| 1 | 0 |
| GHI| 2023-08-23 2023-08-23| 0 | 0 |
下表显示了所需的输出。
| 标识符|日期|col_1| col_2|滞后条件1|条件_1|
| --|--|--|--|--|--|
| ABC| 2023-08- 20 2023-08-20 2023-08-20| 1 | 1 | 0 | 1 |
| GHI| 2023-08- 20 2023-08-20 2023-08-20| 0 | 0 | 0 | 0 |
| ABC| 2023-08-21| 1 | 0 | 1 | 1 |
| GHI| 2023-08-21| 1 | 0 | 0 | 0 |
| ABC| 2023-08-22| 1 | 0 | 1 | 1 |
| GHI| 2023-08-22| 1 | 0 | 0 | 0 |
| ABC| 2023-08-23 2023-08-23| 1 | 0 | 1 | 1 |
| GHI| 2023-08-23 2023-08-23| 0 | 0 | 0 | 0 |
这里,对于第一个日期,所有lag_1都是0,因为我传递的是一个空列表。然后对于第二个日期,我们将使ABC和MNO的lag_1=1,因为它在前一个日期中具有condition_1=1。

3pmvbmvn

3pmvbmvn1#

是的,窗口函数是你的首选:

import org.apache.spark.sql.{Row, functions => F}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DateType, StructType, StructField, StringType, IntegerType}

// Sample data
val data = Seq(
    ("ABC", "2023-08-20", 1),
    ("DEF", "2023-08-20", 0),
    ("GHI", "2023-08-20", 0),
    ("MNO", "2023-08-20", 1),
    ("XYZ", "2023-08-20", 0),
    ("ABC", "2023-08-21", 0),
    ("DEF", "2023-08-21", 1),
    ("GHI", "2023-08-21", 0),
    ("MNO", "2023-08-21", 0),
    ("XYZ", "2023-08-21", 0),
)

// Optional: define the schema
val schema = StructType(Seq(
  StructField("Identifier", StringType, nullable = false),
  StructField("Date", DateType, nullable = false),
  StructField("col_1", IntegerType, nullable = false)
))

// Create RDD of Rows
val rowsRDD = spark.sparkContext.parallelize(data.map { case (id, date, col1) => Row(id, java.sql.Date.valueOf(date), col1) })

// Create DataFrame
var df = spark.createDataFrame(rowsRDD, schema)

// Define operations on your data
val windowSpec = Window.partitionBy("Identifier").orderBy("Date")

df = df.withColumn("lag_1", F.lag("col_1", 1, 0).over(windowSpec))
df = df.withColumn(
  "condition_1",
  F.when(F.col("col_1") === 1 || F.col("lag_1") === 1, 1).otherwise(0)
)

df = df.orderBy("Date", "Identifier")

// Perform an action only once (at the end of your program) to collect results
val result = df.collect()  // Or do df.show() to print out the dataframe

我过于简化了,但作为一个经验法则,当你调用Spark DataFrame的方法时(例如:.orderBy())或使用Spark库中的内容(例如,窗口函数,如lag),您正在使用Spark作为分布式数据处理工具(因为它应该被使用)。在所有数据操作完成后,您可以使用collect()show()收集结果,并将最终结果发送给驱动程序。与在循环中调用collect()以将中间结果获取到数组中(在您的代码片段中为Flag_list),然后在驱动程序上迭代它以进行其他处理相反,这通常是一种不好的做法。

相关问题