Spark version:3.00
scala:2.12
Cassandra::3.11.4
spark-cassandra-connector_2.12-3.0.0-alpha2.jar
我没有使用dse。下面是将Dataframe写入cassandra数据库的测试代码。
spark = SparkSession \
.builder \
.config("spark.jars","spark-streaming-kafka-0-10_2.12-3.0.0.jar,spark-sql-kafka-0-10_2.12-3.0.0.jar,kafka-clients-2.5.0.jar,commons-pool2-2.8.0.jar,spark-token-provider-kafka-0-10_2.12-3.0.0.jar,**spark-cassandra-connector_2.12-3.0.0-alpha2.jar**") \
.config("spark.cassandra.connection.host", "127.0.0.1")\
.config('spark.cassandra.output.consistency.level', 'ONE')\
.appName("StructuredNetworkWordCount") \
.getOrCreate()
streamingInputDF = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.56.1:9092") \
.option("subscribe", "def") \
.load()
## Dataset operations
def write_to_cassandra(streaming_df,E):
streaming_df\
.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="a", keyspace="abc") \
.save()
q1 =sites_flat.writeStream \
.outputMode('update') \
.foreachBatch(write_to_cassandra) \
.start()
q1.awaitTermination()
我可以对dataframe做一些操作并将其打印到控制台,但我无法保存它,甚至无法从我的cassandra数据库中读取它。我得到的错误是:
File "C:\opt\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o70.load.
: java.lang.NoClassDefFoundError: com/datastax/spark/connector/TableRef
at org.apache.spark.sql.cassandra.DefaultSource$.TableRefAndOptions(DefaultSource.scala:142)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
我尝试过其他Cassandra连接器版本(2.5),但得到相同的错误,请帮助!!!
2条答案
按热度按时间c9qzyr3d1#
问题是你用的是
spark.jars
只在类路径中包含提供的jar的选项。但是TableRef
案例类在spark-cassandra-connector-driver
依赖于的包spark-cassandra-connector
. 要解决这个问题,最好先启动pyspark
或者spark-submit
与--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-alpha2
(kafka支持也是如此)-在这种情况下,spark将获取所有必要的依赖项并将它们放入类路径。p、 s.与
alpha2
在获取某些依赖项时可能会遇到问题,例如,ffi
,groovy
这是一个已知的bug(主要是在spark中):sparkc-599,它已经被修复了,我们希望能很快得到beta-drop。p、 另外,对于spark结构化流媒体写给cassandra的内容,不要使用
foreachbatch
,仅用作普通数据接收器:8fq7wneg2#
我遇到了同样的问题,试试看:
版本兼容性被认为是原因