我有一个类似于Cassandra表的实体。我正在使用spark将数据保存/更新到cassandra。这里的实体是offer case类
case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, product_type: String, writeTime: util.Date)
val offerDataset: Dataset[Offer] = ....
我将此数据保存如下
offerDataset.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> cassandraKeyspace, "table" -> tableName))
.mode(SaveMode.Append)
.save()
cassandra表的模式是
OFFER(offer_id, metadata_last_modified_source_time, product_type)
问题是在保存/更新cassandra表时,将offer实体的writetime字段配置为write timestamp。这是在这里提到的税收-https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md 像这样配置
writetime=columnName
我不明白的是语法应该是什么样子。
任何帮助都将不胜感激
1条答案
按热度按时间sqxo8psd1#
本文档是为阿尔法版本的SparkCassandra连接器,所以请期待一些不工作。正如文档中指出的-这是一个表选项,因此您可以通过
options
. 你只需要从util.Date
至Timestamp
或者Long
类型-spark sql不支持从Date
类型。根据以下定义,一切正常:
或与
Timestamp
:如果我们使用下表结构:
然后可以将数据保存为以下格式(仅在3.0-alpha中!):
但如果您使用rdd api,它在当前版本中也可以正常工作:
在这两种情况下,你都会得到以下结果: