Spark: how to configure writetime when saving to Cassandra

uurity8g · published 2021-06-09 · in Cassandra

I have an entity that mirrors a Cassandra table, and I am using Spark to save/update data in Cassandra. The entity is the Offer case class:

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, product_type: String, writeTime: util.Date)

val offerDataset: Dataset[Offer] = ....

I save this data as follows:

offerDataset.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> cassandraKeyspace, "table" -> tableName))
      .mode(SaveMode.Append)
      .save()

The schema of the Cassandra table is:

OFFER(offer_id, metadata_last_modified_source_time, product_type)

The problem is how to use the writeTime field of the Offer entity as the write timestamp when saving/updating the Cassandra table. This is mentioned in the connector reference - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md - where it is configured like this:

writetime=columnName


What I don't understand is what the syntax should look like.
Any help would be appreciated.


sqxo8psd1#

This documentation is for the alpha version of the Spark Cassandra Connector, so expect some things not to work. As pointed out in the documentation, this is a table option, so you can pass it via options. You just need to change the writeTime field from util.Date to a Timestamp or Long type - Spark SQL doesn't support encoding of the Date type.
With the following definitions everything works fine:

import java.time.Instant
import java.sql.Timestamp

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, 
  product_type: String, writeTime: Long)

val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", 1243124234L),
  Offer("456", Timestamp.from(Instant.now()), "test", 12431242366L)).toDF
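Judging from the cqlsh output at the end of this answer, a Long writeTime column is taken verbatim as microseconds since the Unix epoch (the unit Cassandra uses for write timestamps). A small helper for producing such a value - not part of the connector API, just an illustrative sketch - could look like:

```scala
import java.time.Instant

// Convert an Instant to epoch microseconds, the unit Cassandra
// uses for write timestamps. Illustrative helper, not connector API.
def toEpochMicros(i: Instant): Long =
  i.getEpochSecond * 1000000L + i.getNano / 1000L

val micros = toEpochMicros(Instant.parse("2020-04-16T07:28:38.905Z"))
// micros == 1587022118905000L
```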

or with Timestamp:

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, 
   product_type: String, writeTime: Timestamp)

val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", new Timestamp(1243124234L)),
  Offer("456", Timestamp.from(Instant.now()), "test", new Timestamp(12431242366L))).toDF

If we use the following table structure:

create table test.wrt_test (
  offer_id text,
  metadata_last_modified_source_time timestamp,
  product_type text,
  primary key(offer_id, metadata_last_modified_source_time));

then the data can be saved as follows (only in 3.0-alpha!):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
offerDataset.write.cassandraFormat("wrt_test", "test")
    .option("writetime", "writeTime") // here you specify name of the column with time!
    .mode(SaveMode.Append).save()

But if you use the RDD API, it also works fine in current versions:

import com.datastax.spark.connector.writer._
offerDataset.rdd.saveToCassandra("test", "wrt_test", 
   writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))

In both cases you get the following result:

cqlsh> select offer_id, metadata_last_modified_source_time, product_type, writetime(product_type) from test.wrt_test;
offer_id | metadata_last_modified_source_time | product_type | writetime(product_type)
----------+------------------------------------+--------------+-------------------------
      123 |    2020-04-16 07:28:38.905000+0000 |         test |              1243124234
      456 |    2020-04-16 07:28:38.905000+0000 |         test |             12431242366
(2 rows)
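To sanity-check such values, the microseconds reported by writetime() can be decoded back into an Instant - again a plain JVM sketch, independent of the connector:

```scala
import java.time.Instant

// writetime() reports microseconds since the Unix epoch;
// decode one of the values from the query above.
def writetimeToInstant(micros: Long): Instant =
  Instant.ofEpochSecond(micros / 1000000L, (micros % 1000000L) * 1000L)

println(writetimeToInstant(12431242366L)) // 1970-01-01T03:27:11.242366Z
```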
