pyspark 在python中安装delta模块的正确方法是什么?

k2arahey  于 2023-08-03  发布在  Spark
关注(0)|答案(6)|浏览(160)

在python中安装delta模块的正确方法是什么??
example中,他们导入模块
第一个月
但我没有找到正确的方法来安装模块在我的虚拟环境
目前我正在使用这个spark参数-
"spark.jars.packages": "io.delta:delta-core_2.11:0.5.0"

xlpyo6sf

xlpyo6sf1#

由于正确答案隐藏在已接受解决方案的注解中,因此我想在这里添加它。
您需要使用一些额外的设置创建您的spark上下文,然后您可以导入delta:

spark_session = SparkSession.builder \
    .master("local") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

from delta.tables import *

字符串
恼人的是,你的IDE当然会对你大喊大叫,因为这个包没有安装,你也将在没有自动完成和类型提示的情况下操作。我相信有一个工作,我会更新,如果我来across它。
软件包本身在他们的github here上,自述文件建议你可以pip install,但这不起作用。从理论上讲,你可以克隆它并手动安装。

093gszye

093gszye2#

由于Delta的Python代码存储在jar中并由Spark加载,因此在创建SparkSession/SparkContext之前无法导入delta模块。

yb3bgrhw

yb3bgrhw3#

要使用PySpark在本地运行Delta,您需要遵循官方文档。
这对我来说是有效的,但只在直接执行脚本(python)时<script_file>有效,而不是使用pytestunittest
要解决这个问题,需要添加这个环境变量:

PYSPARK_SUBMIT_ARGS='--packages io.delta:delta-core_2.12:1.0.0 pyspark-shell'

字符串
使用符合您情况的Scala和Delta版本。有了这个环境变量,我可以通过cli运行pytest或unittest,没有任何问题

from unittest import TestCase

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

class TestClass(TestCase):
    
    builder = SparkSession.builder.appName("MyApp") \
        .master("local[*]")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    def test_create_delta_table(self):
            self.spark.sql("""CREATE IF NOT EXISTS TABLE <tableName> (
                              <field1> <type1>)
                              USING DELTA""")


函数 configure_spark_with_delta_pip 在builder对象中附加一个配置选项

.config("io.delta:delta-core_<scala_version>:<delta_version>")

hlswsv35

hlswsv354#

以下是如何使用conda安装Delta Lake和PySpark。

  • 确保你已经安装了Java(我使用SDKMAN来管理多个Java版本)
  • 安装Miniconda
  • 选择compatible的Delta Lake和PySpark版本。例如,Delta Lake 1.2与PySpark 3.2兼容。
  • 创建一个带有所需依赖项的YAML文件,这里是来自我创建的delta-examples repoan example
  • 使用类似conda env create envs/mr-delta.yml的命令创建环境
  • 使用conda activate mr-delta激活conda环境
  • an example notebook请注意,它以以下代码开始:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

字符串

a7qyws3x

a7qyws3x5#

如果您在使用Jupyter notebook时遇到问题,请添加以下环境变量

from pyspark.sql import SparkSession
import os
from delta import *

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages  org.apache.spark:spark-avro_2.12:3.4.1,io.delta:delta-core_2.12:2.4.0 pyspark-shell'
# RUN spark-shell --packages org.apache.spark:spark-avro_2.12:3.4.1
# RUN spark-shell --packages io.delta:delta-core_2.12:2.4.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

builder = SparkSession.builder.appName("SampleSpark") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = builder.getOrCreate()

字符串

nszi6y05

nszi6y056#

在我的情况下,问题是我有一个集群运行在一个Databricks运行时低于6.1
https://docs.databricks.com/delta/delta-update.html
Python API在Databricks Runtime 6.1及更高版本中可用。
将Databricks Runtime更改为6.4后,问题消失了。
要做到这一点:点击clusters -> Pick the one you are using -> Edit -> Pick Databricks Runtime 6.1 and above

相关问题