I'm trying to write a stream of objects to a table in Cassandra (as well as to a text file) from my Spark Streaming application. The relevant lines are:
val rmqReceiver = new RMQReceiver(queueIp, "vehicle-data")
val statusMessageStream = myStreamingContext.receiverStream[String](rmqReceiver)
val vsStream = statusMessageStream.map(jsonToVehicleStatus)
statusMessageStream.foreachRDD((vs: RDD[String]) => vs.saveAsTextFile("/var/log"))
vsStream.foreachRDD((vs: RDD[Vehicle_Status])=> vs.saveToCassandra("vehicle_data","vehicles",AllColumns) )
vsStream.foreachRDD((vs: RDD[Vehicle_Status])=> vs.saveToCassandra("vehicle_data","vehicle_locations",AllColumns) )
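(For completeness: Vehicle_Status and jsonToVehicleStatus are not shown above. A minimal sketch of what they might look like follows; the field names mirror the column list in the exception further down, while the types and the use of json4s are only assumptions for illustration.)

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Assumed row shape; field names follow the columns in the failing INSERT
// ("timestamp", "vehicle_id", "lon", "geobin", "lat"); the types are guesses.
case class Vehicle_Status(vehicle_id: String, timestamp: Long, lat: Double, lon: Double, geobin: String)

// Assumed JSON-to-case-class mapping used by the map() call above.
def jsonToVehicleStatus(json: String): Vehicle_Status = {
  implicit val formats: Formats = DefaultFormats
  parse(json).extract[Vehicle_Status]
}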
I have tried a number of variations, but the results are always the same:
The text file gets written (sometimes).
The first call to saveToCassandra saves the records successfully; the second call throws the exception listed below.
I feel like I'm missing something obvious, I just can't see what it is.
java.io.IOException: Failed to prepare statement INSERT INTO "vehicle_data"."vehicle_locations" ("timestamp", "vehicle_id", "lon", "geobin", "lat") VALUES (:"timestamp", :"vehicle_id", :"lon", :"geobin", :"lat")
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:1142)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
    at sun.reflect.GeneratedMethodAccessor57.invoke(Unknown Source)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy11.prepare(Unknown Source)
    at com.datastax.spark.connector.writer.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    ...
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
    at com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:103)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89)
    ... 24 more
In addition, this exception appears:
ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@4892f8c2
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /52.{MYIP}:9042 (com.datastax.driver.core.TransportException: [/52.{MYIP}:9042] Connection has been closed))
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
    at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:1142)
    at java.lang.Thread.run(Thread.java:745)
I have this connected to a properly configured cluster, and these errors appear as soon as I try to exceed 6 writes per second (3 per table).
1 Answer
If you are running this against Cassandra on your local machine, check the replication factor of your keyspace, set it to 1, and try again. That solved it for me.
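The suggested fix amounts to giving the keyspace a replication factor of 1 when running against a single-node (local) cluster. Below is a minimal sketch of how to apply it, assuming the keyspace is named vehicle_data as in the question; the same ALTER statement can be run directly in cqlsh (where DESCRIBE KEYSPACE vehicle_data shows the current replication settings), and the CassandraConnector-based variant is just one way to issue it from the Spark application.

import com.datastax.spark.connector.cql.CassandraConnector

// Reuse the streaming application's Spark configuration to reach the cluster.
val connector = CassandraConnector(myStreamingContext.sparkContext.getConf)

connector.withSessionDo { session =>
  // Set the replication factor to 1 for a local single-node cluster,
  // as suggested in the answer above.
  session.execute(
    "ALTER KEYSPACE vehicle_data WITH replication = " +
      "{'class': 'SimpleStrategy', 'replication_factor': 1}")
}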