I am using the jupyter/pyspark-notebook Docker image to develop a Spark script. My Dockerfile looks like this:
FROM jupyter/pyspark-notebook
USER root
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt && rm requirements.txt
# this is a default user and the image is configured to use it
ARG NB_USER=jovyan
ARG NB_UID=1000
ARG NB_GID=100
ENV USER ${NB_USER}
ENV HOME /home/${NB_USER}
RUN groupadd -f ${USER} && \
chown -R ${USER}:${USER} ${HOME}
USER ${NB_USER}
RUN export PACKAGES="io.delta:delta-core_2.12:1.0.0"
RUN export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"
My requirements.txt looks like this:
delta-spark==2.1.0
deltalake==0.10.1
jupyterlab==4.0.6
pandas==2.1.0
pyspark==3.3.3
I build and run the image with docker compose, then try to run the following in a notebook:
import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("LocalDelta") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
and I get the following error:
AttributeError Traceback (most recent call last)
Cell In[2], line 2
1 import pyspark
----> 2 from delta import *
4 builder = pyspark.sql.SparkSession.builder.appName("LocalDelta") \
5 .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
6 .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
8 spark = configure_spark_with_delta_pip(builder).getOrCreate()
File /opt/conda/lib/python3.11/site-packages/delta/__init__.py:17
1 #
2 # Copyright (2021) The Delta Lake Project Authors.
3 #
(...)
14 # limitations under the License.
15 #
---> 17 from delta.tables import DeltaTable
18 from delta.pip_utils import configure_spark_with_delta_pip
20 __all__ = ['DeltaTable', 'configure_spark_with_delta_pip']
File /opt/conda/lib/python3.11/site-packages/delta/tables.py:21
1 #
2 # Copyright (2021) The Delta Lake Project Authors.
3 #
(...)
14 # limitations under the License.
15 #
17 from typing import (
18 TYPE_CHECKING, cast, overload, Any, Iterable, Optional, Union, NoReturn, List, Tuple
19 )
---> 21 import delta.exceptions # noqa: F401; pylint: disable=unused-variable
22 from delta._typing import (
23 ColumnMapping, OptionalColumnMapping, ExpressionOrColumn, OptionalExpressionOrColumn
24 )
26 from pyspark import since
File /opt/conda/lib/python3.11/site-packages/delta/exceptions.py:166
162 utils.convert_exception = convert_delta_exception
165 if not _delta_exception_patched:
--> 166 _patch_convert_exception()
167 _delta_exception_patched = True
File /opt/conda/lib/python3.11/site-packages/delta/exceptions.py:154, in _patch_convert_exception()
149 def _patch_convert_exception() -> None:
150 """
151 Patch PySpark's exception convert method to convert Delta's Scala concurrent exceptions to the
152 corresponding Python exceptions.
153 """
--> 154 original_convert_sql_exception = utils.convert_exception
156 def convert_delta_exception(e: "JavaObject") -> CapturedException:
157 delta_exception = _convert_delta_exception(e)
AttributeError: module 'pyspark.sql.utils' has no attribute 'convert_exception'
There seems to be an incompatibility between the pyspark and delta versions, but I have not found anything on Stack Overflow or anywhere else to point me in the right direction. My code is based on this example: https://github.com/handreassa/delta-docker/tree/main.
Any help would be greatly appreciated.
2 Answers
Answer 1
As @boyangeor mentioned, the exported packages need to match the Spark version. In addition, the Docker image was recently upgraded to Spark 3.5, which Delta does not support yet (per https://docs.delta.io/latest/releases.html). Because of that change, I use this tag to get the right one (jupyter/pyspark-notebook:spark-3.4.1).
I am the owner of the https://github.com/handreassa/delta-docker repo, so I have already made all the changes needed to get it working again (https://github.com/handreassa/delta-docker/commit/f0ef9c387a20565ea75d6a846ca354b2052709f6).
The tested code is shown, for reference, as a screenshot in the original answer.
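Since the screenshot is not reproduced here, below is a hypothetical Dockerfile sketch of the approach this answer describes: the base image pinned to the spark-3.4.1 tag, with Delta versions from the 2.4.x line, which targets Spark 3.4.x according to the Delta releases page. The exact versions used in the linked commit may differ.

# Hypothetical sketch, not the exact contents of the linked commit.
# Pin the base image to a Spark 3.4.x tag, since Delta did not yet support Spark 3.5.
FROM jupyter/pyspark-notebook:spark-3.4.1

USER root
# requirements.txt would pin matching versions here, e.g. pyspark==3.4.1 and delta-spark==2.4.0.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt && rm requirements.txt

# Switch back to the image's default notebook user (jovyan).
USER ${NB_USER}

# The jar version must match the delta-spark pip package; ENV (unlike a RUN export)
# persists into the running container.
ENV PYSPARK_SUBMIT_ARGS="--packages io.delta:delta-core_2.12:2.4.0 pyspark-shell"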
Answer 2
Your delta-spark==2.1.0 version must match the version of the jar added via --packages. Therefore set:
RUN export PACKAGES="io.delta:delta-core_2.12:2.1.0"
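For context, the notebook code in the question already calls configure_spark_with_delta_pip, which adds the io.delta Maven coordinate matching the installed delta-spark pip package to spark.jars.packages. A minimal sketch of that path with aligned versions, assuming delta-spark==2.1.0 is kept and pyspark is a 3.3.x release (the Spark line that Delta 2.1.x targets):

# Sketch assuming delta-spark==2.1.0 and a matching pyspark 3.3.x are installed.
import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("LocalDelta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# For delta-spark 2.1.0 this pulls io.delta:delta-core_2.12:2.1.0 at session start,
# so the jar version always matches the pip package version.
spark = configure_spark_with_delta_pip(builder).getOrCreate()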