Spark“Run SQL on files directly”失败

7nbnzgx9  于 2023-06-24  发布在  Apache
关注(0)|答案(2)|浏览(133)

Spark文档建议我可以直接在PySpark中的文件上运行SQL,语法如下:

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

我有一些Delta格式的数据放在AWS S3存储桶中(好吧,目前LocalStack中是一个假的S3存储桶)。我正在尝试使用这种方法查询它。bucket看起来像这样:

$ awslocal s3 ls s3://data-lake/data/
                       PRE _delta_log/
2023-06-07 16:55:33        903 part-00000-923a2eea-e2fe-468e-b2b9-a85206858ddb-c000.snappy.parquet
2023-06-07 16:55:33        914 part-00001-a6ceac4f-d7ea-44e9-bce9-d7fe0c103e35-c000.snappy.parquet

虽然下面的方法很好:

df = spark.read.format("delta").load("s3://data-lake/data")

我尝试使用SQL查询它的语法如下失败:

df = spark.sql("select * from delta.`s3://data-lake/data`")

(我也尝试过直接指向文件的路径,而不仅仅是包含目录,没有区别)。
数据是通过以下简单的Scala Spark代码编写的:

val ds = Seq(
  Foo("a", Bar(1, 2)),
  // etc
).toDS()

ds
  .write
  .format("delta")
  .mode(SaveMode.Overwrite)
  .save("s3://data-lake/data")

当我尝试查询这个时,我得到的错误如下,为了简洁起见编辑了一点:

Py4JError                                 Traceback (most recent call last)
/tmp/ipykernel_178/100492152.py in <module>

---> 11 df = spark.sql("select * from delta.`s3://data-lake/data`")

<snip>

py4j.Py4JException: Method sql([class java.lang.String, class java.util.HashMap]) does not exist

感觉像是管道中的一个奇怪的事情,而不是我的语法错误(尤其是因为如果我在Scala中尝试相同的方法,它可以工作)。

  • 简体中文
  • PySpark 3.4.0
  • 从Jupyter notebook对基于emr-serverless/spark/emr-6.10.0:latest镜像的LocalStack容器运行。
mnemlml8

mnemlml81#

对我来说,这看起来像是PySpark和底层Spark版本之间的不匹配。错误是py4j无法调用SparkSession::sql()方法,该方法接受参数化查询和参数名与其值的Map:

py4j.Py4JException: Method sql([class java.lang.String, class java.util.HashMap]) does not exist
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

这个特殊的重载是在Spark 3.4.0中添加的,PySpark 3.4.0总是使用它,无论查询是否参数化:

...
try:
   litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
   return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
...

确保在notebook环境中正确配置了Spark的主目录,并且PySpark没有使用旧的Spark版本。
事实上,检查emr-6.10.0中包含的the software版本证明了我的理论是正确的:

将PySpark降级到3.3.1版本。

alen0pnh

alen0pnh2#

看起来你弄错了文件。
它说你可以直接从文件读取。
DeltaLake是一个开源文件协议,它将数据存储在Parquet文件中
我建议尝试:

df = spark.sql("select * from parquet.`s3://data-lake/data/part-00000-923a2eea-e2fe-468e-b2b9-a85206858ddb-c000.snappy.parquet`")

但是由于delta文件夹不是文件格式,我不明白为什么select * from delta会工作。
df = spark.read.format("delta").load("s3://data-lake/data")之所以可以工作,是因为你可以从文件夹中加载

相关问题