Spark文档建议我可以直接在PySpark中的文件上运行SQL,语法如下:
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
我有一些Delta格式的数据放在AWS S3存储桶中(好吧,目前LocalStack中是一个假的S3存储桶)。我正在尝试使用这种方法查询它。bucket看起来像这样:
$ awslocal s3 ls s3://data-lake/data/
PRE _delta_log/
2023-06-07 16:55:33 903 part-00000-923a2eea-e2fe-468e-b2b9-a85206858ddb-c000.snappy.parquet
2023-06-07 16:55:33 914 part-00001-a6ceac4f-d7ea-44e9-bce9-d7fe0c103e35-c000.snappy.parquet
虽然下面的方法很好:
df = spark.read.format("delta").load("s3://data-lake/data")
我尝试使用SQL查询它的语法如下失败:
df = spark.sql("select * from delta.`s3://data-lake/data`")
(我也尝试过直接指向文件的路径,而不仅仅是包含目录,没有区别)。
数据是通过以下简单的Scala Spark代码编写的:
val ds = Seq(
Foo("a", Bar(1, 2)),
// etc
).toDS()
ds
.write
.format("delta")
.mode(SaveMode.Overwrite)
.save("s3://data-lake/data")
当我尝试查询这个时,我得到的错误如下,为了简洁起见编辑了一点:
Py4JError Traceback (most recent call last)
/tmp/ipykernel_178/100492152.py in <module>
---> 11 df = spark.sql("select * from delta.`s3://data-lake/data`")
<snip>
py4j.Py4JException: Method sql([class java.lang.String, class java.util.HashMap]) does not exist
感觉像是管道中的一个奇怪的事情,而不是我的语法错误(尤其是因为如果我在Scala中尝试相同的方法,它可以工作)。
- 简体中文
- PySpark 3.4.0
- 从Jupyter notebook对基于
emr-serverless/spark/emr-6.10.0:latest
镜像的LocalStack容器运行。
2条答案
按热度按时间mnemlml81#
对我来说,这看起来像是PySpark和底层Spark版本之间的不匹配。错误是py4j无法调用
SparkSession::sql()
方法,该方法接受参数化查询和参数名与其值的Map:这个特殊的重载是在Spark 3.4.0中添加的,PySpark 3.4.0总是使用它,无论查询是否参数化:
确保在notebook环境中正确配置了Spark的主目录,并且PySpark没有使用旧的Spark版本。
事实上,检查
emr-6.10.0
中包含的the software版本证明了我的理论是正确的:将PySpark降级到3.3.1版本。
alen0pnh2#
看起来你弄错了文件。
它说你可以直接从文件读取。
DeltaLake是一个开源文件协议,它将数据存储在Parquet文件中
我建议尝试:
但是由于delta文件夹不是文件格式,我不明白为什么
select * from delta
会工作。df = spark.read.format("delta").load("s3://data-lake/data")
之所以可以工作,是因为你可以从文件夹中加载。