PySpark with Glue connecting to LocalStack S3

igsr9ssn posted on 2023-04-19 in Spark

I am setting up Glue for local development, following the concept mentioned here. I am using LocalStack together with the Glue Spark Jupyter Lab image. As the screenshot in the original post showed, from my Spark Jupyter Lab I can connect to LocalStack S3 and list its buckets and their contents.
The following command starts Jupyter Lab:

docker run -it -v ${PWD}/local_path_to_workspace/:/home/glue_user/workspace/jupyter_workspace/ -e AWS_ACCESS_KEY_ID=dummyaccess -e AWS_SECRET_ACCESS_KEY=dummysecret -e AWS_REGION=eu-west-1 -e DISABLE_SSL=true -e ENDPOINT_URL="http://localstack-glue:4566" --rm -p 4040:4040 -p 18080:18080 -p 8998:8998 -p 8888:8888 --name glue_jupyter_lab amazon/aws-glue-libs:glue_libs_3.0.0_image_01 /home/glue_user/jupyter/jupyter_start.sh
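With the container running, a minimal boto3 sketch of that connectivity check (an illustration only, assuming boto3 is available inside the container; it uses the same dummy credentials and LocalStack endpoint as the docker run command above):

import boto3

# List the LocalStack buckets and the objects in the example bucket
s3 = boto3.client(
    "s3",
    endpoint_url="http://localstack-glue:4566",
    aws_access_key_id="dummyaccess",
    aws_secret_access_key="dummysecret",
    region_name="eu-west-1",
)

print([b["Name"] for b in s3.list_buckets()["Buckets"]])
for obj in s3.list_objects_v2(Bucket="glue-localstack-bucket-person").get("Contents", []):
    print(obj["Key"], obj["Size"])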
However, when I try to read from the LocalStack S3 bucket in Jupyter Lab with the following code:

from pyspark import SparkContext
from awsglue.context import GlueContext
glueContext = GlueContext(SparkContext.getOrCreate()) 
inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://glue-localstack-bucket-person/persons.json"]}, format = "json")

I get an access denied error:

An error occurred while calling o73.getDynamicFrame.
: java.nio.file.AccessDeniedException: s3://glue-localstack-bucket-person/persons.json: getFileStatus on s3://glue-localstack-bucket-person/persons.json: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: WGKYX9DAMXQF1P20; S3 Extended Request ID: Kvpk7Ri77fkrtmzeTCZv+VGoD1dfHUgbXdihVfXxTWlXLIb/vfU+y11jZPiFMGol2F+6sXqGUmw=; Proxy: null), S3 Extended Request ID: Kvpk7Ri77fkrtmzeTCZv+VGoD1dfHUgbXdihVfXxTWlXLIb/vfU+y11jZPiFMGol2F+6sXqGUmw=:403 Forbidden

So my question is: how do I change the session configuration so that it points at the LocalStack endpoint using the dummy access key and secret key?


qxsslcnc #1

The breakthrough moment here was discovering that the ENDPOINT_URL environment variable has no effect on the machinery inside the GlueContext class.
In other words, setting ENDPOINT_URL to http://localstack-glue:4566 does not stop the create_dynamic_frame_from_options method from trying to reach a bucket named glue-localstack-bucket-person on the public internet. I believe the 403 you are seeing is because you are hitting a real bucket called glue-localstack-bucket-person on the real S3, one you presumably have no access to. Even if you owned that bucket and could access it, the dummy values in AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY would lock you out.
The parameter that needs to change is fs.s3a.endpoint, buried deep in the Hadoop configuration. You can use a helper function like this one to build the right configuration for the local Glue setup:

from pyspark.sql import SparkSession


def create_pyspark_session_to_localstack():
    print('creating pyspark session')
    sparksession = (SparkSession.builder
                    .master('local[2]')
                    .appName('glue-connect-to-localstack')
                    .enableHiveSupport()
                    .getOrCreate())

    # Point the S3A filesystem at LocalStack: path-style access, no SSL,
    # dummy temporary credentials, and (crucially) the endpoint override.
    hadoop_conf = sparksession.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.path.style.access", "true")
    hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
    hadoop_conf.set("com.amazonaws.services.s3a.enableV4", "true")
    hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    hadoop_conf.set("fs.s3a.access.key", "mock")
    hadoop_conf.set("fs.s3a.secret.key", "mock")
    hadoop_conf.set("fs.s3a.session.token", "mock")

    # set fs.s3a.endpoint here!
    hadoop_conf.set("fs.s3a.endpoint", "http://localstack-glue:4566")

    return sparksession

This function replaces the call to SparkContext.getOrCreate(). You can use it like this:

from adaptors.localstack import create_pyspark_session_to_localstack
from awsglue.context import GlueContext

glueContext = GlueContext(create_pyspark_session_to_localstack())

inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://glue-localstack-bucket-person/persons.json"]}, format = "json")
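To confirm the endpoint override took effect, you can inspect the resulting DynamicFrame with its standard methods, for example:

# Print the inferred schema and a few rows of persons.json
inputDF.printSchema()
inputDF.toDF().show()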

That should do it.
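As a side note beyond the original answer: the same Hadoop settings can also be supplied when building the session via Spark's spark.hadoop.* configuration prefix, which Spark copies into the Hadoop Configuration. A minimal sketch under the same assumptions (dummy credentials, LocalStack reachable at http://localstack-glue:4566):

from pyspark.sql import SparkSession

# Equivalent S3A configuration passed through the builder instead of
# mutating the Hadoop Configuration after the session is created.
sparksession = (SparkSession.builder
                .master('local[2]')
                .appName('glue-connect-to-localstack')
                .config("spark.hadoop.fs.s3a.endpoint", "http://localstack-glue:4566")
                .config("spark.hadoop.fs.s3a.path.style.access", "true")
                .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
                .config("spark.hadoop.fs.s3a.access.key", "mock")
                .config("spark.hadoop.fs.s3a.secret.key", "mock")
                .getOrCreate())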
Full credit goes to Raj on LinkedIn: https://www.linkedin.com/pulse/development-testing-etl-pipelines-aws-locally-kamal-k/
