如何修复“No FileSystem for scheme:gs”在pyspark?

bqucvtff  于 2023-03-22  发布在  Spark
关注(0)|答案(2)|浏览(104)

我尝试将一个json文件从google bucket读入本地spark机器上的pyspark dataframe。代码如下:

import pandas as pd
import numpy as np

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf().setAll([('spark.executor.memory', '16g'),
                        ('spark.executor.cores','4'),
                         ('spark.cores.max','4')]).setMaster('local[*]')

spark = (SparkSession.
              builder.
              config(conf=conf).
              getOrCreate())

sc = spark.sparkContext

import glob
import bz2
import json
import pickle

bucket_path = "gs://<SOME_PATH>/"
client = storage.Client(project='<SOME_PROJECT>')
bucket = client.get_bucket ('<SOME_PATH>')
blobs = bucket.list_blobs()

theframes = []

for blob in blobs:
    print(blob.name)        
    testspark = spark.read.json(bucket_path + blob.name).cache()
    theframes.append(testspark)

它从bucket阅读文件很好(我可以从www.example.com看到打印输出blob.name),但然后像这样崩溃:

Traceback (most recent call last):
 File "test_code.py", line 66, in <module>
   testspark = spark.read.json(bucket_path + blob.name).cache()
 File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/pyspark/sql/readwriter.py", line 274, in json
return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
 File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
 File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
 File "/home/anaconda3/envs/py37base/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o51.json.
: java.io.IOException: No FileSystem for scheme: gs

我在stackoverflow上看到过这种类型的错误,但大多数解决方案似乎都是在Scala中,而我有pyspark,和/或涉及到与core-site.xml的混乱,我已经做了没有效果。
我用的是spark 2.4.1和python 3.6.7。
帮助将不胜感激!

5q4ezhmt

5q4ezhmt1#

需要一些配置参数才能将“gs”识别为分布式文件系统。
将此设置用于Google云存储连接器gcs-connector-hadoop2-latest.jar

spark = SparkSession \
        .builder \
        .config("spark.jars", "/path/to/gcs-connector-hadoop2-latest.jar") \
        .getOrCreate()

可以从pyspark设置的其他配置

spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
# This is required if you are using service account and set true, 
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', "/path/to/keyfile")
# Following are required if you are using oAuth
spark._jsc.hadoopConfiguration().set('fs.gs.auth.client.id', 'YOUR_OAUTH_CLIENT_ID')
spark._jsc.hadoopConfiguration().set('fs.gs.auth.client.secret', 'OAUTH_SECRET')

或者,您可以在core-site.xml或spark-defaults. conf中设置这些配置。

Hadoop命令行配置

您还可以使用spark.hadoop前缀配置属性来设置pyspark(或一般为spark-submit),例如

--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
tkclm6bt

tkclm6bt2#

除了Ranga Vure的回答,具体来说,对于spark和hadoop3,你可以这样做:

spark = SparkSession.builder \
    .appName('spark-run-with-gcp-bucket') \
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar") \
    .getOrCreate()

这会直接从google中提取所需的jar文件。你可以在google的hadoop页面here中找到其他版本,特别是在“下载连接器”下。你可以复制连接器的链接并将它们放在配置中,而不必在本地下载它们。
然后我设置了另一个配置:

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

我可以直接从我的GCP bucket中拉取CSV文件:

df = spark \
    .read \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .csv("gs://<BUCKET>/<FILE.csv>")

注意:我已经在我的机器上设置了以下内容,以便使用gcp进行身份验证:

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "<PATH TO CREDENTIALS WITH PERMISSION TO VIEW BUCKET OBJECT>"

相关问题