如何添加到cassandra中的现有值?

qnakjoqk  于 2021-06-13  发布在  Cassandra
关注(0)|答案(1)|浏览(376)

我正在编写一个流应用程序,它要求我在cassandra表中将传入值添加或减去预先存在的值。我已经看到了cql的批处理文档,但是还没有找到解决问题的方法。下面是一个小例子,以了解更多insite:
预存表

table:{
word:{'hello',
neg:{-0.5},
neu:{0.3},
pos:{0.2},
comp:{0.7}
}
}

输入值:

word:{'hello',
neu:{0.4}
}

在这里,我需要添加0.4和0.3,然后在表中重新插入。

pvabu6sv

pvabu6sv1#

您可以通过使用 leftJoinWithCassandraTable ,然后用提取的数据添加输入数据,并将数据写回cassandra。
类似这样的内容(采用我自己的代码):

// this case class is matching to Cassandra table & input data...
case class Data(....)
val data = ...data_that_you_received_and_casted_to_Data_case_class...
val joined = data.leftJoinWithCassandraTable[Data]("ks", "table")
// perform update of existing values, and prepare new data
val summed = joined.map({ case (n: Data, c: Option[Data]) =>
 c match {
   // if there is no data in Cassandra, just return input data
   case None => Data(n)
   // there is data in Cassandra, do the sum
   case Some(s) =>
     Data(..., n.neu + s.neu)}
})
// and write updated/new values
summed.saveToCassandra("ks", "table")

p、 我建议使用spark cassandra connector 2.5.0直接支持的spark结构化流媒体,以及所谓的Dataframe直接连接。当使用Dataframe时,您的代码会更简单,并且可能会更优化。

相关问题