Inconsistent scipy find_peaks results from pandas_udf with PySpark 3.0

8wtpewkr posted on 2021-05-27 in Spark

I am trying to use scipy's find_peaks inside a pandas_udf in PySpark. A bare-bones example:

from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import DoubleType 

import pandas as pd
import numpy as np

from scipy.signal import find_peaks

spark = SparkSession.builder.master("yarn") \
.appName("UDF_debug") \
.config("spark.yarn.dist.archives", "hdfs://PATH/TO/MY/USERFOLDER/envs/my_env.zip#MYENV")\
.config("spark.submit.deployMode", "client")\
.config("spark.yarn.queue", "root.dev")\
.enableHiveSupport()\
.getOrCreate()

# Create a sample dataframe and a corresponding pandas data frame, for cross-checking

df = spark.createDataFrame(
    [Row(id=1, c=3),
    Row(id=2,  c=6),
    Row(id=3,  c=2),
    Row(id=4,  c=9),
    Row(id=5,  c=7)])

dfp = df.toPandas()

def peak_finder(C: pd.Series) -> pd.Series:
    # Find peaks (maxima)
    pos_peaks, pos_properties = find_peaks(C)

    # Create an empty series of appropriate length
    r = pd.Series(np.full(len(C), np.nan))

    # Wherever a peak was found ...
    for idx in pos_peaks:
        # ... mark it by noting its height
        r[idx] = C[idx]

    return r

# Peak finding using pyspark's pandas_udf

peak_finder_udf = pandas_udf(peak_finder, returnType=DoubleType())
df = df.withColumn('peak', peak_finder_udf(df.c))
df.show()

# Peak finding directly on a pandas df

dfp["peaks_pandas"] = peak_finder(dfp["c"])
print(dfp)

The two print statements produce the following output. First, peak finding with the UDF:

+---+---+----+
| id|  c|peak|
+---+---+----+
|  1|  3|null|
|  2|  6|null|
|  3|  2|null|
|  4|  9| 9.0|
|  5|  7|null|
+---+---+----+

Second, using only stock pandas and numpy on the edge node:

   id  c  peaks_pandas
0   1  3           NaN
1   2  6           6.0
2   3  2           NaN
3   4  9           9.0
4   5  7           NaN

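The two results disagree for the row with id=2.
This can be understood from the PySpark documentation, which states:
"Internally, PySpark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together."
Splitting such a small column seems odd, but maybe that is what happens.
Question 1: Is this inconsistent behavior expected? Can I avoid it?
Edit: Answer: Yes, it is due to the partitioning. See my comment below.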
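The effect of batching can be reproduced without Spark. The following is a minimal sketch, assuming one hypothetical split of the five values into two chunks; the partition boundaries Spark actually chose cannot be read from the output above. `peaks_per_chunk` is an illustrative helper, not part of any library. Because `find_peaks` never reports the first or last sample of its input as a peak, a value that lands on a chunk boundary can be missed.

```python
import numpy as np
from scipy.signal import find_peaks

values = np.array([3, 6, 2, 9, 7])

def peaks_per_chunk(x, boundaries):
    """Run find_peaks on each chunk separately and return global peak indices.

    `boundaries` lists the (hypothetical) positions at which the column is split.
    """
    found = []
    start = 0
    for chunk in np.split(x, boundaries):
        local_idx, _ = find_peaks(chunk)
        # Shift chunk-local indices back to positions in the full column.
        found.extend(int(i) + start for i in local_idx)
        start += len(chunk)
    return found

whole, _ = find_peaks(values)
print(whole.tolist())                # [1, 3]: whole column seen at once
print(peaks_per_chunk(values, [2]))  # [3]: split into [3, 6] and [2, 9, 7]
```

With this assumed split the value 6 is the last element of its chunk, so only the peak at 9 survives, which matches the UDF output shown earlier.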
Another odd behavior may point toward a solution (although for me it raises more questions). Continuing the code above:

fname = "debug.parquet"
df.dropna().write.parquet(fname)
dfnew = spark.read.parquet(fname)
dfnew.show()

This produces

+---+---+----+
| id|  c|peak|
+---+---+----+
|  4|  9|null|
+---+---+----+

The peak column is no longer 9 as it should be, but null.
Question 2: Can anyone explain the data loss during saving?
Relevant packages in the conda environment:


# Name                    Version                   Build  Channel

_libgcc_mutex             0.1                        main    defaults
arrow-cpp                 0.15.1           py38h7cd5009_5    defaults
attrs                     19.3.0                     py_0    defaults
backcall                  0.2.0                      py_0    defaults
blas                      1.0                         mkl    defaults
bleach                    3.1.5                      py_0    defaults
boost-cpp                 1.71.0               h7b6447c_0    defaults
brotli                    1.0.7                he6710b0_0    defaults
brotlipy                  0.7.0           py38h7b6447c_1000    defaults
bzip2                     1.0.8                h7b6447c_0    defaults
c-ares                    1.15.0            h7b6447c_1001    defaults
ca-certificates           2020.6.24                     0    defaults
certifi                   2020.6.20                py38_0    defaults
cffi                      1.14.0           py38he30daa8_1    defaults
chardet                   3.0.4                 py38_1003    defaults
cryptography              2.9.2            py38h1ba5d50_0    defaults
dbus                      1.13.16              hb2f20db_0    defaults
decorator                 4.4.2                      py_0    defaults
defusedxml                0.6.0                      py_0    defaults
double-conversion         3.1.5                he6710b0_1    defaults
entrypoints               0.3                      py38_0    defaults
expat                     2.2.9                he6710b0_2    defaults
fontconfig                2.13.0               h9420a91_0    defaults
freetype                  2.10.2               h5ab3b9f_0    defaults
gflags                    2.2.2                he6710b0_0    defaults
glib                      2.65.0               h3eb4bd4_0    defaults
glog                      0.4.0                he6710b0_0    defaults
grpc-cpp                  1.26.0               hf8bcb03_0    defaults
gst-plugins-base          1.14.0               hbbd80ab_1    defaults
gstreamer                 1.14.0               hb31296c_0    defaults
icu                       58.2                 he6710b0_3    defaults
idna                      2.10                       py_0    defaults
importlib-metadata        1.7.0                    py38_0    defaults
importlib_metadata        1.7.0                         0    defaults
intel-openmp              2020.1                      217    defaults
ipykernel                 5.3.0            py38h5ca1d4c_0    defaults
ipython                   7.16.1           py38h5ca1d4c_0    defaults
ipython_genutils          0.2.0                    py38_0    defaults
ipywidgets                7.5.1                      py_0    defaults
jedi                      0.17.1                   py38_0    defaults
jinja2                    2.11.2                     py_0    defaults
jpeg                      9b                   h024ee3a_2    defaults
json5                     0.9.5                      py_0    defaults
jsonschema                3.2.0                    py38_0    defaults
jupyter                   1.0.0                    py38_7    defaults
jupyter_client            6.1.3                      py_0    defaults
jupyter_console           6.1.0                      py_0    defaults
jupyter_core              4.6.3                    py38_0    defaults
jupyterlab                2.1.5                      py_0    defaults
jupyterlab_server         1.1.5                      py_0    defaults
ld_impl_linux-64          2.33.1               h53a641e_7    defaults
libboost                  1.71.0               h97c9712_0    defaults
libedit                   3.1.20191231         h7b6447c_0    defaults
libevent                  2.1.8                h1ba5d50_0    defaults
libffi                    3.3                  he6710b0_2    defaults
libgcc-ng                 9.1.0                hdf63c60_0    defaults
libgfortran-ng            7.3.0                hdf63c60_0    defaults
libpng                    1.6.37               hbc83047_0    defaults
libprotobuf               3.11.4               hd408876_0    defaults
libsodium                 1.0.18               h7b6447c_0    defaults
libstdcxx-ng              9.1.0                hdf63c60_0    defaults
libuuid                   1.0.3                h1bed415_2    defaults
libxcb                    1.14                 h7b6447c_0    defaults
libxml2                   2.9.10               he19cac6_1    defaults
lz4-c                     1.8.1.2              h14c3975_0    defaults
markupsafe                1.1.1            py38h7b6447c_0    defaults
mistune                   0.8.4           py38h7b6447c_1000    defaults
mkl                       2020.1                      217    defaults
mkl-service               2.3.0            py38he904b0f_0    defaults
mkl_fft                   1.1.0            py38h23d657b_0    defaults
mkl_random                1.1.1            py38h0573a6f_0    defaults
nbconvert                 5.6.1                    py38_0    defaults
nbformat                  5.0.7                      py_0    defaults
ncurses                   6.2                  he6710b0_1    defaults
notebook                  6.0.3                    py38_0    defaults
numpy                     1.18.5           py38ha1c710e_0    defaults
numpy-base                1.18.5           py38hde5b4d6_0    defaults
openssl                   1.1.1g               h7b6447c_0    defaults
packaging                 20.4                       py_0    defaults
pandas                    1.0.5            py38h0573a6f_0    defaults
pandoc                    2.9.2.1                       0    defaults
pandocfilters             1.4.2                    py38_1    defaults
parso                     0.7.0                      py_0    defaults
pcre                      8.44                 he6710b0_0    defaults
pexpect                   4.8.0                    py38_0    defaults
pickleshare               0.7.5                 py38_1000    defaults
pip                       20.1.1                   py38_1    defaults
prometheus_client         0.8.0                      py_0    defaults
prompt-toolkit            3.0.5                      py_0    defaults
prompt_toolkit            3.0.5                         0    defaults
ptyprocess                0.6.0                    py38_0    defaults
py4j                      0.10.9                     py_0    defaults
pyarrow                   0.15.1           py38h0573a6f_0    defaults
pycparser                 2.20                       py_0    defaults
pygments                  2.6.1                      py_0    defaults
pyopenssl                 19.1.0                   py38_0    defaults
pyparsing                 2.4.7                      py_0    defaults
pyqt                      5.9.2            py38h05f1152_4    defaults
pyrsistent                0.16.0           py38h7b6447c_0    defaults
pysocks                   1.7.1                    py38_0    defaults
pyspark                   3.0.0                      py_0    defaults
python                    3.8.3                hcff3b4d_2    defaults
python-dateutil           2.8.1                      py_0    defaults
pytz                      2020.1                     py_0    defaults
pyzmq                     19.0.1           py38he6710b0_1    defaults
qt                        5.9.7                h5867ecd_1    defaults
qtconsole                 4.7.5                      py_0    defaults
qtpy                      1.9.0                      py_0    defaults
re2                       2019.08.01           he6710b0_0    defaults
readline                  8.0                  h7b6447c_0    defaults
requests                  2.24.0                     py_0    defaults
scipy                     1.5.0            py38h0b6359f_0    defaults
send2trash                1.5.0                    py38_0    defaults
setuptools                47.3.1                   py38_0    defaults
sip                       4.19.13          py38he6710b0_0    defaults
six                       1.15.0                     py_0    defaults
snappy                    1.1.8                he6710b0_0    defaults
sqlite                    3.32.3               h62c20be_0    defaults
terminado                 0.8.3                    py38_0    defaults
testpath                  0.4.4                      py_0    defaults
thrift-cpp                0.11.0               h02b749d_3    defaults
tk                        8.6.10               hbc83047_0    defaults
tornado                   6.0.4            py38h7b6447c_1    defaults
traitlets                 4.3.3                    py38_0    defaults
uriparser                 0.9.3                he6710b0_1    defaults
urllib3                   1.25.9                     py_0    defaults
wcwidth                   0.2.5                      py_0    defaults
webencodings              0.5.1                    py38_1    defaults
wheel                     0.34.2                   py38_0    defaults
widgetsnbextension        3.5.1                    py38_0    defaults
xz                        5.2.5                h7b6447c_0    defaults
zeromq                    4.3.2                he6710b0_2    defaults
zipp                      3.1.0                      py_0    defaults
zlib                      1.2.11               h7b6447c_3    defaults
zstd                      1.3.7                h0b5b093_0    defaults

I also tried PySpark 2.4.5 (combined with pyarrow 0.8), with exactly the same result.

Answer #1 (6ie5vjzr):

Question 1: The inconsistent behavior is indeed caused by the splitting into batches.
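One possible way to sidestep the batch effect, which is not taken from this answer, is to hand each complete signal to pandas in a single call instead of relying on a Series-to-Series UDF. The sketch below shows only the pandas side and is checked locally; `peaks_for_group` is a hypothetical name, and the function sorts by `id` itself on the assumption that row order inside a group is not guaranteed.

```python
import numpy as np
import pandas as pd
from scipy.signal import find_peaks

def peaks_for_group(pdf: pd.DataFrame) -> pd.DataFrame:
    """Mark the peaks of column `c` for one complete group of rows."""
    # Restore the intended order explicitly before looking for peaks.
    pdf = pdf.sort_values("id").reset_index(drop=True)
    c = pdf["c"].to_numpy()
    peak_idx, _ = find_peaks(c)
    # NaN everywhere, except the peak height at each detected peak.
    peak = np.full(len(pdf), np.nan)
    peak[peak_idx] = c[peak_idx]
    return pdf.assign(peak=peak)

# Local check with plain pandas, rows deliberately shuffled.
sample = pd.DataFrame({"id": [4, 1, 5, 2, 3], "c": [9, 3, 7, 6, 2]})
result = peaks_for_group(sample)
print(result)
```

In Spark 3.0 a function of this shape could be passed to `GroupedData.applyInPandas`, for example with the schema string `"id long, c long, peak double"`, after grouping by a column that identifies one complete signal. The example data has no such column, so one would have to be added, and all rows of a group must then fit into the memory of a single executor.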
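Question 2: I found a workaround: converting to an RDD first and then immediately back to a DataFrame solves the problem (that is, appending .rdd.toDF()). I do not know why; presumably something happens in the background that I do not understand.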
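The cause of the null in Question 2 is not established in this thread. One observation that may be relevant, offered purely as an assumption: if the UDF were evaluated again on the rows that remain after `dropna()`, it would receive a single value, and `find_peaks` cannot report a peak for a one-element input. The sketch below reuses the question's `peak_finder` locally, without Spark, and shows only that behavior.

```python
import numpy as np
import pandas as pd
from scipy.signal import find_peaks

def peak_finder(C: pd.Series) -> pd.Series:
    # Same logic as in the question: NaN everywhere except at detected peaks.
    pos_peaks, _ = find_peaks(C)
    r = pd.Series(np.full(len(C), np.nan))
    for idx in pos_peaks:
        r[idx] = C[idx]
    return r

# Assumption: the batch holds only the row that survived dropna() (c = 9).
single_row = pd.Series([9])
out = peak_finder(single_row)
print(out.isna().all())  # True: no peak can be found in a single sample
```

Whether Spark actually re-runs the UDF in this way is not confirmed here; the sketch only shows that such a re-evaluation would be consistent with the null seen after reading the Parquet file back.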
