Data queried from Cassandra cannot be filtered again on the same column (InvalidQueryException)

a11xaf1n  posted 2021-06-15 in Cassandra

I am trying to query a large chunk of data from Cassandra by time, and then use a Spark Dataset to take smaller chunks at a time for processing. However, the application fails with an InvalidQueryException:

WARN  2018-11-22 13:16:54 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 5, 192.168.1.212, executor 0): java.io.IOException: Exception during preparation of SELECT "userid", "event_time", "value" FROM "user_1234"."data" WHERE token("userid") > ? AND token("userid") <= ? AND "event_time" >= ? AND "event_time" >= ? AND "event_time" <= ?   ALLOW FILTERING: More than one restriction was found for the start bound on event_time
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:323)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:339)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:366)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:366)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: More than one restriction was found for the start bound on event_time
        at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:41)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:28)
        at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:108)
        at com.datastax.driver.dse.DefaultDseSession.prepare(DefaultDseSession.java:278)
        at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)

Here is the code I am trying to run:

import org.apache.spark.sql.Dataset

case class RawDataModel(userid: String, event_time: Long, value: Double)

// First query: pull the last hour of data from Cassandra.
var dtRangeEnd = System.currentTimeMillis()
var dtRangeStart = dtRangeEnd - (60 * 60 * 1000).toLong

val queryTimeRange = "SELECT * FROM user1234.datafile WHERE event_time >= " + dtRangeStart

val dataFrame = sparkSession.sql(queryTimeRange)

import sparkSession.implicits._
val dataSet: Dataset[RawDataModel] = dataFrame.as[RawDataModel]

dataSet.show(1)

// Second step: narrow the same Dataset down to the last 15 minutes.
dtRangeEnd = System.currentTimeMillis()
dtRangeStart = dtRangeEnd - (15 * 60 * 1000).toLong

val dtRangeData = dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))

dtRangeData.show(1)

Note: this is not a Dataset problem, as I have already tried swapping in DataFrames with no difference. At first I thought it was a lazy-evaluation problem, with the two different bounds being lazily applied at the same time, but the dataSet.show(1) call should trigger an early action and avoid the cascaded evaluation.

rjee0c15 1#

Spark merges sparkSession.sql(queryTimeRange) and dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd)) into a single command, which in CQL looks like this:
SELECT "sensorid", "event_time", "value" FROM "company5a819ee2522e572c8a16a43a"."data" WHERE token("sensorid") > ? AND token("sensorid") <= ? AND "event_time" >= ? AND "event_time" >= ? AND "event_time" <= ?
There are two identical restrictions on the same column: "event_time" >= ?.
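A rough way to see why both bounds survive: filters pushed down to Cassandra are collected independently and simply appended to the WHERE clause, without deduplicating or merging overlapping bounds on the same column. A minimal Python sketch of that behavior (not Spark's actual code; the function and names are illustrative):

```python
def push_down(filters):
    """Build a CQL WHERE clause the way a naive pushdown would:
    every collected filter is appended; nothing is merged."""
    # Token-range bounds the connector always adds for its scan.
    clauses = ['token("userid") > ?', 'token("userid") <= ?']
    clauses += ['"{}" {} ?'.format(col, op) for col, op, _ in filters]
    return "WHERE " + " AND ".join(clauses)

# The sql() range filter and the later .between() each restrict event_time:
filters = [
    ("event_time", ">=", "dtRangeStart from the outer query"),
    ("event_time", ">=", "dtRangeStart from .between"),
    ("event_time", "<=", "dtRangeEnd from .between"),
]
where = push_down(filters)
print(where)
# WHERE token("userid") > ? AND token("userid") <= ? AND "event_time" >= ?
#   AND "event_time" >= ? AND "event_time" <= ?
```

The duplicated `"event_time" >= ?` start bound is exactly what Cassandra rejects with "More than one restriction was found for the start bound on event_time".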
If you persist dataFrame before applying .filter, Spark will evaluate dataFrame separately from the .filter:

val dataFrame = sparkSession.sql(queryTimeRange)
dataFrame.persist()
val dtRangeData = dataFrame.filter(dataFrame("event_time").between(dtRangeStart, dtRangeEnd))

With persist(), the hour-wide result is cached in Spark, so the .between filter runs on the cached rows instead of being pushed down to Cassandra. Call dataFrame.unpersist() when you are done with it.
