Here is the code:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    // Point S3A at the local MinIO instance (path-style access, no SSL)
    SparkConf sparkConf = new SparkConf()
            .setAppName("test")
            .set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
            .set("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
            .set("spark.hadoop.fs.s3a.access.key", "minio")
            .set("spark.hadoop.fs.s3a.secret.key", "minio123")
            .set("spark.hadoop.fs.s3a.path.style.access", "true")
            .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
            .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

    SparkSession sparkSession = SparkSession.builder()
            .master("local")
            .config(sparkConf)
            .getOrCreate();

    System.out.println("********** Reading from minio **********");
    // .csv(path) already implies the CSV format, so no separate .format("csv") is needed
    Dataset<Row> csv = sparkSession
            .read()
            .option("header", "true")
            .csv("s3a://spark-test/test.csv");

    System.out.println("********** Writing to minio **********");
    csv.write().mode("overwrite").format("delta").save("s3a://spark-test/deltafile");
}
Here is the output, up to the point where it hangs:
********** Reading from minio **********
23/05/16 20:07:00 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/05/16 20:07:00 INFO SharedState: Warehouse path is 'file:/C:/<...>/spark-warehouse'.
23/05/16 20:07:02 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/05/16 20:07:02 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
23/05/16 20:07:02 INFO MetricsSystemImpl: s3a-file-system metrics system started
23/05/16 20:07:11 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
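One thing that can make an unreachable endpoint look like a hang is that S3A retries for a long time by default before surfacing an error. Lowering the timeout and retry settings makes a connectivity failure fail fast instead; a sketch using standard S3A options (the values are illustrative, not recommendations):

// Add to the SparkConf above; fail fast instead of retrying for minutes
sparkConf
        .set("spark.hadoop.fs.s3a.connection.establish.timeout", "5000") // TCP connect timeout (ms)
        .set("spark.hadoop.fs.s3a.connection.timeout", "5000")           // socket timeout (ms)
        .set("spark.hadoop.fs.s3a.attempts.maximum", "1")                // no AWS SDK-level retries
        .set("spark.hadoop.fs.s3a.retry.limit", "1");                    // no S3A-level retries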
If I wait about five minutes and then stop the program, I get the following output, with no errors:
23/05/16 20:11:53 INFO SparkContext: Invoking stop() from shutdown hook
23/05/16 20:11:53 INFO SparkUI: Stopped Spark web UI at http://BAH5CD010DT15.resource.ds.bah.com:4041
23/05/16 20:11:53 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/05/16 20:11:53 INFO MemoryStore: MemoryStore cleared
23/05/16 20:11:53 INFO BlockManager: BlockManager stopped
23/05/16 20:11:53 INFO BlockManagerMaster: BlockManagerMaster stopped
23/05/16 20:11:53 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/05/16 20:11:53 INFO SparkContext: Successfully stopped SparkContext
23/05/16 20:11:53 INFO ShutdownHookManager: Shutdown hook called
23/05/16 20:11:53 INFO ShutdownHookManager: Deleting directory C:\Users\611528\AppData\Local\Temp\1\spark-9f08dab0-3643-4cec-9d40-443b5db388be
23/05/16 20:11:53 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
23/05/16 20:11:53 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
23/05/16 20:11:53 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.
However, if I stop the program after only about a minute at the hang point, I get these additional messages:
23/05/16 20:13:26 WARN AWSCredentialProviderList: Credentials requested after provider list was closed
23/05/16 20:13:26 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3a://spark-test/test.csv.
java.nio.file.AccessDeniedException: s3a://spark-test/test.csv: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: Credentials requested after provider list was closed
I've spent an entire day researching this problem and haven't found a solution yet. It never even gets to the write() part of the code; for some reason it hangs on read(). Any help would be greatly appreciated.
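The same endpoint settings can also be exercised without Spark at all, via Hadoop's FileSystem API; if this minimal listing hangs the same way, the problem is reaching the endpoint itself rather than anything in Spark or Delta. A sketch reusing the bucket and credentials from the code above (the class name is arbitrary):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3ACheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.endpoint", "http://localhost:9000");
        conf.set("fs.s3a.access.key", "minio");
        conf.set("fs.s3a.secret.key", "minio123");
        conf.set("fs.s3a.path.style.access", "true");
        conf.set("fs.s3a.connection.ssl.enabled", "false");

        // List the bucket directly; a hang here means the endpoint is unreachable
        try (FileSystem fs = FileSystem.get(new URI("s3a://spark-test/"), conf)) {
            for (FileStatus status : fs.listStatus(new Path("s3a://spark-test/"))) {
                System.out.println(status.getPath());
            }
        }
    }
}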
1 Answer

dsekswqp1:
Finally figured it out. Because MinIO was running in a container, I had to set up port forwarding so that my test could reach MinIO.
Example:
kubectl port-forward -n gossamer-windows deployment/minio-deployment 9001:9000
Once I had the port forwarding in place, I was able to read from and write to MinIO in Delta format.
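Note that kubectl port-forward takes LOCAL_PORT:REMOTE_PORT, so the command above exposes the pod's port 9000 on local port 9001. The S3A endpoint then has to point at the local end of the tunnel; assuming the forward shown above, the setting would look like:

// Endpoint must match the local side of the port-forward (9001 with the command above)
.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:9001")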