我正在用python对spark2.4.6进行实验,以满足我们的一个需求。
Sample data
device1, timestamp, parameter1, value
device1, timestamp, parameter2, value
device1, timestamp, parameter3, value
device1, timestamp, parameter4, value
device2, timestamp, parameter1, value
device2, timestamp, parameter2, value
device2, timestamp, parameter3, value
device2, timestamp, parameter4, value
有了这些数据,我想对给定的时间戳使用来自一个设备的所有参数执行自定义聚合。所以我使用devicename和timestamp进行分组,然后使用udf进行聚合。我尝试了以下两种模式。
# Create spark session
spark = SparkSession.builder.appName("paramAggregation").getOrCreate()
# Defince schema and load data
param_schema = [StructField('deviceName', StringType()),
StructField('timeStamp', TimestampType()),
StructField('parameterName', StringType()),
StructField('paramValue', IntegerType()]
df_struct = StructType(fields=param_schema)
df = spark.read.csv("<path>/paramdata.csv", df_struct)
在第一种方法中,我使用分组聚合和pandas
# @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def pandaUDAFAggregation(group_input):
..
..
return output_frame
# Perform aggregation
start = time.time()
df.groupBy("deviceName", "timeStamp").applyInPandas(pandaUDAFAggregation, df.schema).show()
diff = time.time() - start
print("Time taken - ", diff)
在第二种方法中,我尝试了以下方法:
def dfUDFAggregation(paramNames, values):
..
..
return aggValue
dfUDFAggregation = udf(dfUDFAggregation, StringType())
# Perform aggregation
start = time.time()
df.groupBy("deviceName", "timeStamp").agg(dfUDFAggregation(collect_list("parameterName"), collect_list("paramValue"))).show()
diff = time.time() - start
print("Time taken - ", diff)
对于一个350条记录的数据集,第一种方法大约需要一分钟,第二种方法大约需要6分钟。请注意,它不是需要时间的聚合函数。即使该函数保持为空并带有一些硬编码的返回,也需要花费大量的时间。对聚合函数的后续调用之间的间隔需要很多时间。
对于相同的代码,如果我使用内置聚合函数,它的速度非常快而且没有问题。
对于用户定义的聚合函数,有什么需要做的不同吗?有什么建议吗?
暂无答案!
目前还没有任何答案,快来回答吧!