pyspark 使用Delta Lake表运行Pytest时出错

0pizxfdo  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(156)

我在一家公司的VDI工作,出于安全原因,他们使用自己的工件。目前我正在编写单元测试,以执行从增量表中删除条目的函数的测试。当我开始时,我收到了一个未解析依赖项的错误,因为我的Spark会话被配置为从Maven加载jar文件。现在我的代码看起来像这样:

class TestTransformation(unittest.TestCase):
    @classmethod
    def test_ksu_deletion(self):
        self.spark = SparkSession.builder\
                        .appName('SPARK_DELETION')\
                        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
                        .config("spark.jars", "/opt/spark/jars/delta-core_2.12-0.7.0.jar, /opt/spark/jars/hadoop-aws-3.2.0.jar")\
                        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
                        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
                        .getOrCreate()
        os.environ["KSU_DELETION_OBJECT"]="UNITTEST/"
        deltatable = DeltaTable.forPath(self.spark, "/projects/some/path/snappy.parquet")
        deltatable.delete(col("DATE") < get_current()

但是,我收到错误消息:

E     py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
E     : java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V

你知道这是什么原因造成的吗?我假设这与我配置spark.sql.extions和/或spark.sql.catalog的方式有关,但老实说,我是Spark的新手。我非常感谢任何提示。
提前多谢了!
编辑:我们使用的是Spark 3.0.2(Scala 2.12.10)。根据https://docs.delta.io/latest/releases.html,这应该是兼容的。除了SparkSession之外,我将后续代码精简为

df.spark.read.parquet(Path/to/file.snappy.parquet)

现在我收到错误消息

java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class

正如我所说,我是相当新的(Py)Spark,所以请不要犹豫,提到你认为完全显而易见的事情。
编辑2:在运行代码之前,我检查了Shell中导出的Python路径,我可以看到以下内容:x1c 0d1x这会导致任何问题吗?我不明白为什么我在pipenv中运行代码时没有得到这个错误(使用spark-submit)

hi3rlvi2

hi3rlvi21#

看起来您使用的是Delta lake库的不兼容版本。0.7.0是Spark 3.0的版本,但您使用的是另一个版本--或者更低,或者更高。请查阅Delta releases page以找到Delta版本和所需Spark版本之间的Map。
如果你使用的是Spark 3.1或3.2,可以考虑使用delta-spark Python包,它会安装所有必要的依赖项,所以你只需要导入DeltaTable类。
更新:是的,发生这种情况是因为版本冲突--你需要删除delta-sparkpyspark Python包,并显式安装pyspark==3.0.2
另外,看看pytest-spark包,它可以简化所有测试的配置规范。你可以找到它的例子+ Delta here

相关问题