Apache Spark: accessing an accumulator's value after it is used in a user-defined function inside df.withColumn in Palantir Foundry

xqnpmsa8 asked on 2023-02-16 in Apache

I am trying to use a custom accumulator in Palantir Foundry to aggregate data inside a user-defined function that is applied to every row of a DataFrame via df.withColumn(...).
In the resulting DataFrame I can see that the accumulator is incremented as expected. However, the value of the accumulator variable itself in the script does not change during execution. I also see that the Python id of the accumulator variable in the script differs from the Python id of the accumulator inside the user-defined function, but that may well be expected...
How can I access, from within the calling script after execution, the accumulator value whose increments I can observe in the resulting DataFrame column? That is the information I am looking for.

from transforms.api import transform_df, Input, Output
import numpy as np
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, struct

global accum

@transform_df(
    Output("ri.foundry.main.dataset.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
)
def compute(ctx):

    from pyspark.sql.types import StructType, StringType, IntegerType,  StructField

    data2 = [("James","","Smith","36636","M",3000),
        ("Michael","Rose","","40288","M",4000),
        ("Robert","","Williams","42114","M",4000),
        ("Maria","Anne","Jones","39192","F",4000),
        ("Jen","Mary","Brown","","F",-1)
    ]

    schema = StructType([
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True),
        StructField("id", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("salary", IntegerType(), True),
    ])

    df = ctx.spark_session.createDataFrame(data=data2, schema=schema)

    ####################################

    class AccumulatorNumpyArray(AccumulatorParam):
        def zero(self, zero: np.ndarray):
            return zero

        def addInPlace(self, v1, v2):
            return v1 + v2

    # from pyspark.context import SparkContext
    # sc = SparkContext.getOrCreate()
    sc = ctx.spark_session.sparkContext

    shape = 3

    global accum
    accum = sc.accumulator(
            np.zeros(shape, dtype=np.int64),
            AccumulatorNumpyArray(),
            )

    def func(row):
        global accum
        accum += np.ones(shape)
        return str(accum) + '_' + str(id(accum))

    user_defined_function = udf(func, StringType())

    new = df.withColumn("processed", user_defined_function(struct([df[col] for col in df.columns])))
    new.show(2)

    print(accum)

    return df

which results in

+---------+----------+--------+-----+------+------+--------------------+
|firstname|middlename|lastname|   id|gender|salary|           processed|
+---------+----------+--------+-----+------+------+--------------------+
|    James|          |   Smith|36636|     M|  3000|[1. 1. 1.]_140388...|
|  Michael|      Rose|        |40288|     M|  4000|[2. 2. 2.]_140388...|
+---------+----------+--------+-----+------+------+--------------------+
only showing top 2 rows

以及

> accum
 Accumulator<id=0, value=[0 0 0]>
> id(accum)
 140574405092256

If the Foundry boilerplate is removed, leaving

import numpy as np
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

spark = (
    SparkSession.builder.appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
# ctx = spark.sparkContext.getOrCreate()

data2 = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

schema = StructType(
    [
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True),
        StructField("id", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("salary", IntegerType(), True),
    ]
)

# df = ctx.spark_session.createDataFrame(data=data2, schema=schema)
df = spark.createDataFrame(data=data2, schema=schema)

####################################

class AccumulatorNumpyArray(AccumulatorParam):
    def zero(self, zero: np.ndarray):
        return zero

    def addInPlace(self, v1, v2):
        return v1 + v2

sc = SparkContext.getOrCreate()

shape = 3

global accum
accum = sc.accumulator(
    np.zeros(shape, dtype=np.int64),
    AccumulatorNumpyArray(),
)

def func(row):
    global accum
    accum += np.ones(shape)
    return str(accum) + "_" + str(id(accum))

user_defined_function = udf(func, StringType())

new = df.withColumn(
    "processed", user_defined_function(struct([df[col] for col in df.columns]))
)
new.show(2, False)

print(id(accum))
print(accum)

then the output obtained in a regular Python environment with PySpark 3.3.1 on Ubuntu is as expected:

+---------+----------+--------+-----+------+------+--------------------------+
|firstname|middlename|lastname|id   |gender|salary|processed                 |
+---------+----------+--------+-----+------+------+--------------------------+
|James    |          |Smith   |36636|M     |3000  |[1. 1. 1.]_139642682452576|
|Michael  |Rose      |        |40288|M     |4000  |[1. 1. 1.]_139642682450224|
+---------+----------+--------+-----+------+------+--------------------------+
only showing top 2 rows

140166944013424
[3. 3. 3.]

vtwuwzda1#

Code outside the transform runs in a different environment from the code inside the transform. When you commit, your checks run: they execute the code outside the transform to generate the jobspec, which is, technically, your executable transform. You can find it in the dataset's "Details" once the checks have passed.
The logic inside the transform is then split off and run on its own on every build. The global accum defined outside the transform never runs at build time, and it does not exist when the code inside compute executes.

global accum <-- runs in checks

@transform_df(
    Output("ri.foundry.main.dataset.c0d4fc0c-bb1d-4c7b-86ce-a13ec6666490"),
)
def compute(ctx):
    bla bla some logic <-- runs during build
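
Concretely, a rough, runnable version of that sketch (reusing the placeholder dataset RID from the question) would be:

from transforms.api import transform_df, Output

# Module-level code: executed while checks generate the jobspec,
# not on every build.
print("module level: runs in checks")

@transform_df(
    Output("ri.foundry.main.dataset.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
)
def compute(ctx):
    # Function body: executed on every build, in a separate environment,
    # so anything the transform needs has to be created in here.
    print("function body: runs during the build")
    return ctx.spark_session.range(1)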

In your second code example, the print happens after the DataFrame has been processed, because you ask Spark to compute it with new.show(2, False); in the first example, the print happens before the DataFrame is processed, because the computation only happens after your return df.
If you want to print after the DataFrame has been computed, you can use @transform(... instead of @transform_df(... and print after writing out the DataFrame contents, as in the sketch below.

@transform(
    output=Output("ri.foundry.main.dataset.c0d4fc0c-bb1d-4c7b-86ce-a13ec6666490"),
)
def compute(ctx, output):
    df = ... some logic ...

    output.write_dataframe(df) # please check the function name I think it was write_dataframe, but may be wrong
    print(accum)
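
Putting that together with your accumulator, a minimal sketch could look like the following (assuming output.write_dataframe is indeed the correct call, as above, and reusing the dataset RID plus a cut-down version of your sample data):

from transforms.api import transform, Output
import numpy as np
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

@transform(
    output=Output("ri.foundry.main.dataset.c0d4fc0c-bb1d-4c7b-86ce-a13ec6666490"),
)
def compute(ctx, output):

    class AccumulatorNumpyArray(AccumulatorParam):
        def zero(self, zero: np.ndarray):
            return zero

        def addInPlace(self, v1, v2):
            return v1 + v2

    sc = ctx.spark_session.sparkContext
    shape = 3
    accum = sc.accumulator(np.zeros(shape, dtype=np.int64), AccumulatorNumpyArray())

    def func(row):
        # The UDF closes over accum; no module-level global is needed.
        accum.add(np.ones(shape, dtype=np.int64))
        return str(row)

    user_defined_function = udf(func, StringType())

    df = ctx.spark_session.createDataFrame(
        [("James", 3000), ("Michael", 4000)], ["firstname", "salary"]
    )
    new = df.withColumn(
        "processed", user_defined_function(struct([df[col] for col in df.columns]))
    )

    # Writing the output forces Spark to evaluate every row ...
    output.write_dataframe(new)

    # ... so by this point the executors have sent their accumulator
    # updates back to the driver and the merged value is visible here.
    print(accum.value)

The key difference from the original code is that output.write_dataframe(new), rather than return df, triggers the computation before the print runs.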
