我有这个密码:
import os
from pyspark import SparkContext,SparkFiles,SQLContext,SparkFiles
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import col
secure_bundle_file=os.getcwd()+'\\secure-connect-dbtest.zip'
sparkSession =SparkSession.builder.appName('SparkCassandraApp')\
.config('spark.cassandra.connection.config.cloud.path',secure_bundle_file)\
.config('spark.cassandra.auth.username', 'test')\
.config('spark.cassandra.auth.password','testquart')\
.config('spark.dse.continuousPagingEnabled',False)\
.master('local[*]').getOrCreate()
data = sparkSession.read.format("org.apache.spark.sql.cassandra")\
.options(table="tbthesis", keyspace="test").load()
data.count()
我尝试做的是连接到我的数据库并检索我的数据。代码很好地连接到数据库,但一旦到达读取行,它就会说:
Exception has occurred: Py4JJavaError
An error occurred while calling o48.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra.
Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:674)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:728)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
有人能帮我吗?
另外,我还想添加一些有关此代码的详细信息:
我想做的是测试spark从我的数据库中读取200万条记录需要多长时间,普通的python cassandra驱动程序在大约1小时内读取了200万条记录(使用simplestatement),所以在这里我想知道使用spark读取这200万条记录需要多长时间。
谢谢
2条答案
按热度按时间jhdbpxl91#
在类路径中没有spark cassandra连接器包,因此它找不到相应的类。
你得开始工作了(
spark-submit
或者pyspark
)与--packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1
.如果您真的只想从python代码中执行,那么您可以尝试添加
.config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.11:2.5.1")
创建时SparkSession
,但如果类路径已经示例化,它可能并不总是工作的。p、 斯帕克的表现通常会优于其他公司
SimpleStatement
,即使是在本地模式下,尽管spark在分布式模式下确实非常出色。你真的不应该用SimpleStatement
对于重复执行只在参数上不同的查询,应该使用prepared语句。请阅读《使用datastax驱动程序开发应用程序指南》。datastax还赠送了第三版的cassandra。权威指南-刚出版不久-我建议你读一读。a2mppw5e2#
我的问题解决了。
问题不是java、hadoop或spark,而是连接器的下载过程,但是我不能下载任何东西,因为我的jars缓存文件夹上有东西。
my spark下载外部jars的文件夹是c:\users\ulysesrico.ivy2\jars它的缓存是c:\users\ulysesrico.ivy2\cache
我只是删除了缓存和jar的折叠,然后我做了:
pyspark——打包com.datasax。spark:spark-cassandra-connector_2.11:2.5.1和á, 我下载了所有的jar和缓存信息。
问题终于解决了。