I am trying to use a custom accumulator in Palantir Foundry to aggregate data inside a user-defined function that is applied to every row of a DataFrame via df.withColumn(...).
From the resulting DataFrame I can see that the accumulator value is incremented as expected. However, the value of the accumulator variable itself in the script never changes during execution. I also notice that the Python id of the accumulator variable in the script differs from the Python id of the accumulator inside the user-defined function, but that may well be expected...
How can I access, from within the calling script and after execution, the accumulator value whose increments I can observe in the resulting DataFrame column? That is the information I am looking for.
from transforms.api import transform_df, Input, Output
import numpy as np
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, struct

global accum


@transform_df(
    Output("ri.foundry.main.dataset.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
)
def compute(ctx):
    from pyspark.sql.types import StructType, StringType, IntegerType, StructField

    data2 = [("James", "", "Smith", "36636", "M", 3000),
             ("Michael", "Rose", "", "40288", "M", 4000),
             ("Robert", "", "Williams", "42114", "M", 4000),
             ("Maria", "Anne", "Jones", "39192", "F", 4000),
             ("Jen", "Mary", "Brown", "", "F", -1)
             ]

    schema = StructType([
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True),
        StructField("id", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("salary", IntegerType(), True)
    ])

    df = ctx.spark_session.createDataFrame(data=data2, schema=schema)

    ####################################

    class AccumulatorNumpyArray(AccumulatorParam):
        def zero(self, zero: np.ndarray):
            return zero

        def addInPlace(self, v1, v2):
            return v1 + v2

    # from pyspark.context import SparkContext
    # sc = SparkContext.getOrCreate()
    sc = ctx.spark_session.sparkContext

    shape = 3

    global accum
    accum = sc.accumulator(
        np.zeros(shape, dtype=np.int64),
        AccumulatorNumpyArray(),
    )

    def func(row):
        global accum
        accum += np.ones(shape)
        return str(accum) + '_' + str(id(accum))

    user_defined_function = udf(func, StringType())

    new = df.withColumn("processed", user_defined_function(struct([df[col] for col in df.columns])))

    new.show(2)

    print(accum)

    return df
This results in
+---------+----------+--------+-----+------+------+--------------------+
|firstname|middlename|lastname| id|gender|salary| processed|
+---------+----------+--------+-----+------+------+--------------------+
| James| | Smith|36636| M| 3000|[1. 1. 1.]_140388...|
| Michael| Rose| |40288| M| 4000|[2. 2. 2.]_140388...|
+---------+----------+--------+-----+------+------+--------------------+
only showing top 2 rows
as well as
> accum
Accumulator<id=0, value=[0 0 0]>
> id(accum)
140574405092256
If the Foundry boilerplate is stripped away, the code becomes
import numpy as np
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

spark = (
    SparkSession.builder.appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# ctx = spark.sparkContext.getOrCreate()

data2 = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

schema = StructType(
    [
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True),
        StructField("id", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("salary", IntegerType(), True),
    ]
)

# df = ctx.spark_session.createDataFrame(data=data2, schema=schema)
df = spark.createDataFrame(data=data2, schema=schema)

####################################


class AccumulatorNumpyArray(AccumulatorParam):
    def zero(self, zero: np.ndarray):
        return zero

    def addInPlace(self, v1, v2):
        return v1 + v2


sc = SparkContext.getOrCreate()

shape = 3

global accum
accum = sc.accumulator(
    np.zeros(shape, dtype=np.int64),
    AccumulatorNumpyArray(),
)


def func(row):
    global accum
    accum += np.ones(shape)
    return str(accum) + "_" + str(id(accum))


user_defined_function = udf(func, StringType())

new = df.withColumn(
    "processed", user_defined_function(struct([df[col] for col in df.columns]))
)

new.show(2, False)

print(id(accum))
print(accum)
then the output obtained in a regular Python environment on Ubuntu with pyspark 3.3.1 is as expected:
+---------+----------+--------+-----+------+------+--------------------------+
|firstname|middlename|lastname|id |gender|salary|processed |
+---------+----------+--------+-----+------+------+--------------------------+
|James | |Smith |36636|M |3000 |[1. 1. 1.]_139642682452576|
|Michael |Rose | |40288|M |4000 |[1. 1. 1.]_139642682450224|
+---------+----------+--------+-----+------+------+--------------------------+
only showing top 2 rows
140166944013424
[3. 3. 3.]
1 Answer
Code that sits outside a transform runs in a different environment from the code inside the transform. When you commit, the checks run: they execute the code outside the transform to generate the jobspec, which is technically your executable transform. You can find it under the "Details" of the dataset once the checks have passed. The logic inside the transform is then separated out and run on its own at every build; the global accum defined outside the transform is never executed there and does not exist while the code inside compute runs.
In the second code example, your print happens after df has been processed, because new.show(2, False) forces Spark to compute it. In the first example, your print happens before df is processed, because the computation only happens after your return df. If you want to print after df has been computed, you can use @transform(... instead of @transform_df(... and print after you have written out the DataFrame contents, as sketched below.
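A minimal sketch of that suggestion, assuming the same dummy data and accumulator pattern as in the question (the dataset RID stays a placeholder and the sample rows are shortened for brevity); the point is that out.write_dataframe(new) is the action that actually runs the UDF, so the accumulator is only read afterwards:

from transforms.api import transform, Output
import numpy as np
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType


class AccumulatorNumpyArray(AccumulatorParam):
    def zero(self, zero: np.ndarray):
        return zero

    def addInPlace(self, v1, v2):
        return v1 + v2


@transform(
    out=Output("ri.foundry.main.dataset.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
)
def compute(ctx, out):
    # Same kind of dummy DataFrame as in the question, built from a DDL schema string.
    df = ctx.spark_session.createDataFrame(
        [("James", "", "Smith", "36636", "M", 3000),
         ("Michael", "Rose", "", "40288", "M", 4000)],
        "firstname string, middlename string, lastname string, id string, gender string, salary int",
    )

    sc = ctx.spark_session.sparkContext
    shape = 3
    accum = sc.accumulator(np.zeros(shape, dtype=np.int64), AccumulatorNumpyArray())

    def func(row):
        # The UDF only increments the accumulator on the executors.
        accum.add(np.ones(shape, dtype=np.int64))
        return row.firstname

    user_defined_function = udf(func, StringType())
    new = df.withColumn(
        "processed", user_defined_function(struct([df[col] for col in df.columns]))
    )

    # Writing the output is the action that runs the UDF; only after this call
    # have the per-row increments been merged back into the driver-side accumulator.
    out.write_dataframe(new)

    # Now the value reflects one increment per processed row (visible in the build's driver logs).
    print(accum.value)

Because @transform hands you the Output object explicitly, there is no return df; the write itself becomes the natural point after which the driver-side accumulator value is meaningful.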