我有一个用例,我们在一个Minio集群中有一些输入数据,我们需要读取和转换这些数据,然后将它们添加到另一个Minio集群中,我们必须使用Spark来完成。我们如何实现同样的目标?
1szpjjfi1#
如果你使用hadoop-aws,你可以简单地使用s3 a://协议读写Minio。你应该能够为每个单独的bucket设置不同的端点,凭证等,使用属性:
spark.hadoop.fs.s3a.bucket.<bucket>.endpoint spark.hadoop.fs.s3a.bucket.<bucket>.aws.credentials.provider spark.hadoop.fs.s3a.bucket.<bucket>.access.key spark.hadoop.fs.s3a.bucket.<bucket>.secret.key spark.hadoop.fs.s3a.bucket.<bucket>.path.style.access
所以,假设你有一个Minio服务器https://minio1.com和bucket dataIn,https://minio2.com和bucket dataOut,你可以设置以下配置(例如,在spark-defaults.conf中,使用spark-submit的--conf参数,或者直接在代码中的SparkConf对象上):
dataIn
dataOut
spark-defaults.conf
spark-submit
--conf
SparkConf
spark.hadoop.fs.s3a.bucket.dataIn.endpoint https://minio1.com spark.hadoop.fs.s3a.bucket.dataIn.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider spark.hadoop.fs.s3a.bucket.dataIn.access.key ACCESS_KEY_1 spark.hadoop.fs.s3a.bucket.dataIn.secret.key SECRET_KEY_1 spark.hadoop.fs.s3a.bucket.dataIn.path.style.access true spark.hadoop.fs.s3a.bucket.dataOut.endpoint https://minio2.com spark.hadoop.fs.s3a.bucket.dataOut.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider spark.hadoop.fs.s3a.bucket.dataOut.access.key ACCESS_KEY_2 spark.hadoop.fs.s3a.bucket.dataOut.secret.key SECRET_KEY_2 spark.hadoop.fs.s3a.bucket.dataOut.path.style.access true
然后,在您的应用程序中,简单地传输数据如下:
val documents = spark.read.parquet("s3a://dataIn/path/to/data") val transformed = documents.select(...) // do your transformations here transformed.write.parquet("s3a://dataOut/path/to/target")
1条答案
按热度按时间1szpjjfi1#
如果你使用hadoop-aws,你可以简单地使用s3 a://协议读写Minio。你应该能够为每个单独的bucket设置不同的端点,凭证等,使用属性:
所以,假设你有一个Minio服务器https://minio1.com和bucket
dataIn
,https://minio2.com和bucketdataOut
,你可以设置以下配置(例如,在spark-defaults.conf
中,使用spark-submit
的--conf
参数,或者直接在代码中的SparkConf
对象上):然后,在您的应用程序中,简单地传输数据如下: