pyspark:py4j.protocol.py4jjavaerror:调用o65.save时出错

r1zhe5dt  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(1311)

我已经编写了一个代码来将csv文件中的数据加载到postgresql表中。代码运行得更早,突然出现了错误 py4j.protocol.Py4JJavaError: An error occurred while calling o65.save. : java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated 在连接jdbc驱动程序时似乎出现了问题。任何建议都会有帮助,

import pyspark
from pyspark.sql import SparkSession, Row

sc = pyspark.SparkContext('local[*]')
SqlContext = pyspark.SQLContext(sc)
spark = SparkSession(sc)

productsFF = sc.textFile("C:\Hadoop\Data\ProductType.csv")
productsDF = productsFF.map(lambda p: Row(product_type_intrnl_id = int(p.split(",")[0]), 
product_type_code = p.split(",")[1], product_sub_type_code = p.split(",")[3])).toDF()

productsDF.createTempView("product_type_tmp")
product_type_tmp = SqlContext.sql("select * from product_type_tmp") #.show()

SqlContext.sql("show tables").show()

product_type_tmp.write.format('jdbc') \
    .option("url", "jdbc:postgresql://localhost:5432/xxxx") \
    .option("dbtable", "xxxx") \
    .option("user", "xxxx") \
    .option("password", "xxxx") \
    .option("driver", "org.postgresql.Driver").mode("append").save()

下面是详细的错误,

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "C:/Users/Macaulay/PycharmProjects/Spark/SparkSqlFile2Table.py", line 18, in <module>
    product_type_tmp.write.format('jdbc') \
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py", line 767, in save
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1285, in __call__
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 98, in deco
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o65.save.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated
    at java.util.ServiceLoader.fail(Unknown Source)
    at java.util.ServiceLoader.access$100(Unknown Source)
    at java.util.ServiceLoader$LazyIterator.nextService(Unknown Source)
    at java.util.ServiceLoader$LazyIterator.next(Unknown Source)
    at java.util.ServiceLoader$1.next(Unknown Source)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:728)
    at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:832)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.<init>(KafkaSourceProvider.scala:40)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at java.lang.Class.newInstance(Unknown Source)
    ... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging$class
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 37 more```
ddrv8njm

ddrv8njm1#

尝试只在程序中导入下面的包。

org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2

相关问题