cassandraio不适用于保存时间戳

r7xajy2e  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(314)

这是我的简单代码,它从pubsub订阅中读取消息,并用当前时间戳将消息体保存到cassandra表中。
消息是从订阅中使用的,但没有向表插入记录,也没有错误消息。
但是,如果我在类testtable中将日期类型“timestamp”更改为long,那么这段代码可以工作并将记录插入表中。
这里是创建表的脚本。

DROP TABLE IF EXISTS test_table;

CREATE TABLE IF NOT EXISTS test_table(
    post_index int,
    ingestion_time TIMESTAMP,
    body text,
    PRIMARY KEY ((post_index))
);
@Table(keyspace = "{keyspace_name}", name = "{table_name}",
        readConsistency = "LOCAL_QUORUM",
        writeConsistency = "LOCAL_QUORUM",
        caseSensitiveKeyspace = false,
        caseSensitiveTable = false)
class TestTable implements Serializable {
  @PartitionKey
  @Column(name="post_index")
  Integer postIndex;
  @Column(name="ingestion_time")
  Timestamp ingestionTime;
  @Column(name = "body")
  String body;

    public Integer getPostIndex() {
        return postIndex;
    }

    public void setPostIndex(Integer postIndex) {
        this.postIndex = postIndex;
    }

    public Timestamp getIngestionTime() {
        return ingestionTime;
    }

    public void setIngestionTime(Timestamp ingestionTime) {
        this.ingestionTime = ingestionTime;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    public TestTable(Integer postIndex, Timestamp ingestionTime, String body) {
      this.body = body;
      this.ingestionTime = ingestionTime;
      this.postIndex = postIndex;
  }
  public TestTable() {
        this.body = "";
        this.ingestionTime = Timestamp.from(Instant.now());
        this.postIndex = 0;
  }
}

public class TestCassandraJobJava {

    public static void main(String[] args) {

        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

       PCollection<String> data = pipeline.apply("ReadStrinsFromPubsub",
                PubsubIO.readStrings().fromSubscription("projects/{project_id}/subscriptions/{subscription_name}"))
                .apply("window", Window.into(FixedWindows.of(Duration.standardSeconds(5))))
        .apply("CreateMutation", ParDo.of(new DoFn<String, TestTable>() {
            @ProcessElement
            public void processElement(@Element String word, OutputReceiver<TestTable> out) {
                TestTable t = new TestTable(new Random().nextInt(), java.sql.Timestamp.from(Instant.now()), word);
                out.output(t);
            }
        })).apply(CassandraIO.<TestTable>write()
                        .withHosts(Arrays.asList("127.0.0.1"))
                        .withPort(9042)
                        .withKeyspace("{keyspace}")
                        .withLocalDc("Cassandra")
                        .withEntity(TestTable.class)
                );
        pipeline.run().waitUntilFinish();
    }
}
2ul0zpep

2ul0zpep1#

为了让它工作,你需要有一个编码解码器之间的Cassandra的 timestamp 以及 java.sql.Timestamp . 默认情况下,在java驱动程序3.x中 timestamp 转换为 java.util.Date (请参阅Map),尽管您也可以通过额外的编解码器使用joda time或java8.x time api。在java驱动程序4.x中 Instant 用于表示时间戳。
没有用于的内置编解码器 java.sql.Timestamp ,但实现自己的代码应该不是很难-文档详细描述了自定义编解码器的创建和使用过程。

相关问题