I am trying to read data from S3 using PySpark. My PySpark session is created on top of Livy. Without Livy I can read from S3 just fine, but when I create the session through Livy I get the error below. I also do not want to start any Hadoop daemons.
2020-08-08 00:22:01,837 INFO utils.LineBufferedStream: Exception in thread "main" java.net.ConnectException: Call From mon-server/10.128.3.48 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
2020-08-08 00:22:01,838 INFO utils.LineBufferedStream: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2020-08-08 00:22:01,838 INFO utils.LineBufferedStream: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
2020-08-08 00:22:01,838 INFO utils.LineBufferedStream: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
2020-08-08 00:22:01,838 INFO utils.LineBufferedStream: at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
2020-08-08 00:22:01,838 INFO utils.LineBufferedStream: at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:831)
2020-08-08 00:22:01,838 INFO utils.LineBufferedStream: at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:755)
2020-08-08 00:22:01,838 INFO utils.LineBufferedStream: at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1515)
2020-08-08 00:22:01,838 INFO utils.LineBufferedStream: at org.apache.hadoop.ipc.Client.call(Client.java:1457)
2020-08-08 00:22:01,847 INFO utils.LineBufferedStream: at org.apache.hadoop.ipc.Client.call(Client.java:1367)
2020-08-08 00:22:01,847 INFO utils.LineBufferedStream: at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
2020-08-08 00:22:01,847 INFO utils.LineBufferedStream: at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
2020-08-08 00:22:01,847 INFO utils.LineBufferedStream: at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
2020-08-08 00:22:01,847 INFO utils.LineBufferedStream: at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:903)
My code is as follows:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# Local session; the same code works when the session is not created via Livy.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("AWS") \
    .config('spark.executor.memory', '2g') \
    .config('spark.executor.cores', '2') \
    .config('spark.cores.max', '2') \
    .config('spark.driver.memory', '2g') \
    .getOrCreate()

accessKeyId = ""
secretAccessKey = ""

# Configure the S3A filesystem on the already-running context.
hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", accessKeyId)
hadoopConf.set("fs.s3a.secret.key", secretAccessKey)
hadoopConf.set("fs.s3a.endpoint", "s3-ap-south-1.amazonaws.com")
hadoopConf.set("fs.s3a.multipart.size", "104857600")

df = spark.read.csv('s3a://xyz/abc.csv', header=True, inferSchema=True)
df.count()
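What I suspect is that with Livy the SparkContext is created on the Livy server before my snippet runs, so setting the Hadoop configuration afterwards comes too late and the context falls back to the default fs.defaultFS (an HDFS namenode on localhost:9000, which matches the connection-refused error above). One thing I am considering is passing the S3A settings in the session-creation request itself, via Spark's spark.hadoop.* prefix. A rough sketch of that request (the Livy host, port, and key values are placeholders, not my real setup):

import json
import requests

# Placeholder Livy endpoint; replace with the real host and port.
LIVY_URL = "http://localhost:8998"

# Pass the S3A settings at session-creation time, since with Livy the
# SparkContext is built on the server before any client code runs.
payload = {
    "kind": "pyspark",
    "conf": {
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.access.key": "<accessKeyId>",
        "spark.hadoop.fs.s3a.secret.key": "<secretAccessKey>",
        "spark.hadoop.fs.s3a.endpoint": "s3-ap-south-1.amazonaws.com",
    },
}

resp = requests.post(LIVY_URL + "/sessions",
                     data=json.dumps(payload),
                     headers={"Content-Type": "application/json"})
print(resp.json())  # the new session's id and state

Is this the right direction, or is there another way to stop the Livy-created session from trying to reach HDFS, without running any Hadoop daemons?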