发送原始时间戳而不是当前时间戳

qv7cva1a  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(294)

我正在使用spark结构化流媒体将记录发送到Kafka主题。Kafka主题是用配置创建的- message.timestamp.type=CreateTime 这样做是为了使目标Kafka主题记录与原始记录具有相同的时间戳。
我的Kafka流代码:

kafkaRecords.selectExpr("CAST(key AS STRING)", "CAST(value AS BINARY)","CAST(timestamp AS TIMESTAMP)")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers","IP Of kafka")
    .option("topic",targetTopic)
    .option("kafka.max.in.flight.requests.per.connection", "1")
    .option("checkpointLocation",checkPointLocation)
    .save()

但是,这并不保留原始时间戳2018/11/04,而是时间戳反映了最新日期2018/11/9。
另一方面,为了确认kafka config正在运行,当我显式地创建一个kafka生产者和生产者记录,并将其发送给其他人时,原始的时间戳将被保留。
我怎样才能在Kafka结构化流媒体中得到同样的行为。

hk8txs48

hk8txs481#

这个 CreateTime 一个主题的配置意味着当记录被创建时,就是您得到的时间。
不清楚您在哪里读取数据和看到时间戳,如果您运行的是生产者代码“today”,那么这就是他们得到的时间,而不是之前。
如果你想要过去的时间戳,你就需要真正的 ProducerRecord 通过使用包含timestamp参数的构造函数,spark包含该时间戳,但spark不公开它。
如果你只是把时间戳放在有效负载值中,就像你正在做的那样,那就是你想做分析的时间,可能不是一个时间 ConsumerRecord.timestamp() 如果你想准确地将数据从一个主题复制到另一个主题,Kafka就使用mirrormaker来实现这一点。那么您只需要配置文件,而不需要编写和部署spark代码

相关问题