Spark Scala Cassandra save/update

Posted by 8wtpewkr on 2021-06-09 in Cassandra

I have a Spark Dataset of entities that should be saved or updated in a Cassandra table named 'offer'.

case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String)
val offerDataset: Dataset[Offer] = ....

I want to save or update the offerDataset above to Cassandra, with the write timestamp of each row determined by the metadata_last_modified_source_time field of the Offer entity.

offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("metadata_last_modified_source_time")))

When writing to Cassandra, I get the following exception. Can someone help me understand the problem? The same error occurs when metadata_last_modified_source_time is of type java.util.Date or Long.

com.datastax.driver.core.exceptions.InvalidTypeException: Value metadata_last_modified_source_time is of type bigint, not timestamp
at com.datastax.driver.core.AbstractGettableByIndexData.checkType(AbstractGettableByIndexData.java:83)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:529)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:536)
at com.datastax.driver.core.BoundStatement.set(BoundStatement.java:870)
at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnNull(BoundStatementBuilder.scala:59)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$5.apply(BoundStatementBuilder.scala:83)

nle07wnf1#

I found a solution after reading this document: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
I introduced a new field, writeTime, in the Offer case class. It is used as the write timestamp of the row in the Cassandra table.

case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String, writeTime: java.sql.Date)

When building offerDataset, I set the value of the writeTime field as follows:

val offerDataset: Dataset[Offer] = {....
   ....
    val writeTime = new java.sql.Date(metadata_last_modified_source_time.getTime())
   ....
   ....
}

offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))
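
One thing worth checking with this approach: CQL write timestamps (USING TIMESTAMP) are conventionally expressed in microseconds since the epoch, while java.sql.Date and java.sql.Timestamp expose milliseconds through getTime. The following is a minimal sketch, not the connector's own API, of deriving a microsecond value from the source timestamp. It assumes writeTime is declared as a Long instead of a java.sql.Date; the names OfferRow, toEpochMicros and fromSource are hypothetical and do not appear in the original post.

```scala
import java.sql.Timestamp

// Row shape from the answer, except that writeTime is a Long here
// (an assumption of this sketch, not the original poster's code).
final case class OfferRow(
  offer_id: String,
  metadata_last_modified_source_time: Timestamp,
  product_type: String,
  writeTime: Long // epoch microseconds; not a column of the Cassandra table
)

object OfferRow {
  // Converts a java.sql.Timestamp to microseconds since the Unix epoch.
  // getTime already contains the millisecond part, so take the whole
  // seconds first and then add the sub-second part from getNanos.
  def toEpochMicros(ts: Timestamp): Long = {
    val wholeSeconds = Math.floorDiv(ts.getTime, 1000L)
    wholeSeconds * 1000000L + ts.getNanos / 1000L
  }

  // Hypothetical helper: builds a row whose write time follows the
  // source modification time.
  def fromSource(offerId: String, lastModified: Timestamp, productType: String): OfferRow =
    OfferRow(offerId, lastModified, productType, toEpochMicros(lastModified))
}
```

Rows built this way could then be saved with the same TimestampOption.perRow("writeTime") call shown above. Whether the millisecond value of a java.sql.Date field is adequate depends on how other writers to the same table set their timestamps, so it is worth verifying against the actual data.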
