我正在使用spark结构化流媒体将记录发送到Kafka主题。Kafka主题是用配置创建的- message.timestamp.type=CreateTime
这样做是为了使目标Kafka主题记录与原始记录具有相同的时间戳。
我的Kafka流代码:
kafkaRecords.selectExpr("CAST(key AS STRING)", "CAST(value AS BINARY)","CAST(timestamp AS TIMESTAMP)")
.write
.format("kafka")
.option("kafka.bootstrap.servers","IP Of kafka")
.option("topic",targetTopic)
.option("kafka.max.in.flight.requests.per.connection", "1")
.option("checkpointLocation",checkPointLocation)
.save()
但是,这并不保留原始时间戳2018/11/04,而是时间戳反映了最新日期2018/11/9。
另一方面,为了确认kafka config正在运行,当我显式地创建一个kafka生产者和生产者记录,并将其发送给其他人时,原始的时间戳将被保留。
我怎样才能在Kafka结构化流媒体中得到同样的行为。
1条答案
按热度按时间hk8txs481#
这个
CreateTime
一个主题的配置意味着当记录被创建时,就是您得到的时间。不清楚您在哪里读取数据和看到时间戳,如果您运行的是生产者代码“today”,那么这就是他们得到的时间,而不是之前。
如果你想要过去的时间戳,你就需要真正的
ProducerRecord
通过使用包含timestamp参数的构造函数,spark包含该时间戳,但spark不公开它。如果你只是把时间戳放在有效负载值中,就像你正在做的那样,那就是你想做分析的时间,可能不是一个时间
ConsumerRecord.timestamp()
如果你想准确地将数据从一个主题复制到另一个主题,Kafka就使用mirrormaker来实现这一点。那么您只需要配置文件,而不需要编写和部署spark代码