Which combination of pyspark and delta versions should I use?

Posted by 7cwmlq89 on 2023-10-15 in Spark

I'm using the jupyter/pyspark-notebook Docker image to develop a Spark script. My Dockerfile looks like this:

FROM jupyter/pyspark-notebook

USER root
COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt && rm requirements.txt

# this is a default user and the image is configured to use it
ARG NB_USER=jovyan
ARG NB_UID=1000
ARG NB_GID=100

ENV USER ${NB_USER}
ENV HOME /home/${NB_USER}
RUN groupadd -f ${USER} && \
    chown -R ${USER}:${USER} ${HOME}

USER ${NB_USER}

RUN export PACKAGES="io.delta:delta-core_2.12:1.0.0"
RUN export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"

My requirements.txt looks like this:

delta-spark==2.1.0
deltalake==0.10.1
jupyterlab==4.0.6
pandas==2.1.0
pyspark==3.3.3

I build and run the image with docker compose, then try to run this in a notebook:

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("LocalDelta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

and I get the following error:

AttributeError                            Traceback (most recent call last)
Cell In[2], line 2
      1 import pyspark
----> 2 from delta import *
      4 builder = pyspark.sql.SparkSession.builder.appName("LocalDelta") \
      5     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
      6     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      8 spark = configure_spark_with_delta_pip(builder).getOrCreate()

File /opt/conda/lib/python3.11/site-packages/delta/__init__.py:17
      1 #
      2 # Copyright (2021) The Delta Lake Project Authors.
      3 #
   (...)
     14 # limitations under the License.
     15 #
---> 17 from delta.tables import DeltaTable
     18 from delta.pip_utils import configure_spark_with_delta_pip
     20 __all__ = ['DeltaTable', 'configure_spark_with_delta_pip']

File /opt/conda/lib/python3.11/site-packages/delta/tables.py:21
      1 #
      2 # Copyright (2021) The Delta Lake Project Authors.
      3 #
   (...)
     14 # limitations under the License.
     15 #
     17 from typing import (
     18     TYPE_CHECKING, cast, overload, Any, Iterable, Optional, Union, NoReturn, List, Tuple
     19 )
---> 21 import delta.exceptions  # noqa: F401; pylint: disable=unused-variable
     22 from delta._typing import (
     23     ColumnMapping, OptionalColumnMapping, ExpressionOrColumn, OptionalExpressionOrColumn
     24 )
     26 from pyspark import since

File /opt/conda/lib/python3.11/site-packages/delta/exceptions.py:166
    162     utils.convert_exception = convert_delta_exception
    165 if not _delta_exception_patched:
--> 166     _patch_convert_exception()
    167     _delta_exception_patched = True

File /opt/conda/lib/python3.11/site-packages/delta/exceptions.py:154, in _patch_convert_exception()
    149 def _patch_convert_exception() -> None:
    150     """
    151     Patch PySpark's exception convert method to convert Delta's Scala concurrent exceptions to the
    152     corresponding Python exceptions.
    153     """
--> 154     original_convert_sql_exception = utils.convert_exception
    156     def convert_delta_exception(e: "JavaObject") -> CapturedException:
    157         delta_exception = _convert_delta_exception(e)

AttributeError: module 'pyspark.sql.utils' has no attribute 'convert_exception'

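There seems to be an incompatibility between the pyspark and delta versions, but I haven't found anything on Stack Overflow or anywhere else to point me in the right direction. My code is based on this example: https://github.com/handreassa/delta-docker/tree/main
Any help would be much appreciated.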

epfja78i #1

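As @boyangeor mentioned, the exported packages need to match the Spark version. In addition, the Docker image was recently upgraded to Spark 3.5, which Delta does not support yet (according to https://docs.delta.io/latest/releases.html). Because of that change, I use this tag to get the right image: jupyter/pyspark-notebook:spark-3.4.1.
I am the owner of the https://github.com/handreassa/delta-docker repo, so I have already made all the changes needed to get it working again (https://github.com/handreassa/delta-docker/commit/f0ef9c387a20565ea75d6a846ca354b2052709f6).
The code tested in the image, for reference: (screenshot not preserved)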
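
To make the "match Delta to Spark" rule concrete, here is a minimal sketch of a compatibility check. The table is an assumption: it was copied by hand from the compatibility matrix on the Delta releases page linked above and only covers a few release lines, so verify it against that page before relying on it. The names `DELTA_TO_SPARK`, `minor_line` and `is_compatible` are made up for this example and are not part of pyspark or delta-spark.

```python
# Spark minor line that each Delta Lake release line targets.
# Assumption: transcribed by hand from the Delta releases page; incomplete on purpose.
DELTA_TO_SPARK = {
    "1.0": "3.1",
    "2.0": "3.2",
    "2.1": "3.3",
    "2.2": "3.3",
    "2.3": "3.3",
    "2.4": "3.4",
}


def minor_line(version: str) -> str:
    """Return 'major.minor' from a version string such as '3.3.3'."""
    return ".".join(version.split(".")[:2])


def is_compatible(delta_version: str, spark_version: str) -> bool:
    """True only if the table lists this Delta line for this Spark line."""
    expected = DELTA_TO_SPARK.get(minor_line(delta_version))
    return expected is not None and expected == minor_line(spark_version)


print(is_compatible("2.1.0", "3.3.3"))  # True: versions pinned in requirements.txt
print(is_compatible("2.1.0", "3.5.0"))  # False: delta-spark 2.1.0 on a Spark 3.5 image
print(is_compatible("2.4.0", "3.4.1"))  # True: matches the spark-3.4.1 image tag
```

In a notebook you would pass `pyspark.__version__` as the second argument to check the Spark version that is actually imported, which may differ from the one pinned in requirements.txt.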

ukqbszuj #2

Your delta-spark==2.1.0 version must match the version of the jar you add via --packages. So set: RUN export PACKAGES="io.delta:delta-core_2.12:2.1.0"
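
One way to keep the two versions from drifting apart is to build the --packages coordinate from a single version string. The sketch below does that from Python and sets PYSPARK_SUBMIT_ARGS inside the notebook process. Note that a variable exported in a Dockerfile `RUN` line only exists for that one build step, so it will not be visible in the running container; setting it from the notebook (or with an `ENV` instruction) is one way around that. The helper names `delta_core_coordinate` and `submit_args` are hypothetical, and the `delta-core_2.12` artifact name is taken from the Dockerfile in the question.

```python
import os


def delta_core_coordinate(delta_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate for the delta-core jar (hypothetical helper)."""
    return f"io.delta:delta-core_{scala_version}:{delta_version}"


def submit_args(delta_version: str) -> str:
    """Build a PYSPARK_SUBMIT_ARGS value that pulls in the matching jar."""
    return f"--packages {delta_core_coordinate(delta_version)} pyspark-shell"


# Use the same version that is pinned as delta-spark in requirements.txt.
# This must run before the first SparkSession is created in the process.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args("2.1.0")

print(os.environ["PYSPARK_SUBMIT_ARGS"])
# --packages io.delta:delta-core_2.12:2.1.0 pyspark-shell
```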
