Apache Spark 我们如何通过指定开始和结束时间戳来使用kinesis或Kafka中的数据块读取历史数据?

ztyzrc3y  于 2023-05-07  发布在  Apache
关注(0)|答案(1)|浏览(226)

让我们说我想读取在2023年3月8日至2023年3月14日期间到达的数据
有没有一种方法可以定义结束位置沿着下面的initialPosition。
spark.readStream.format("kinesis").option("streamName", kinesisStreamName).option("region", kinesisRegion).option("initialPosition", '{"at_timestamp": "03/08/2023 00:00:00 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}').option("awsAccessKey", awsAccessKeyId).option("awsSecretKey", awsSecretKey).load()

6ie5vjzr

6ie5vjzr1#

我认为你正在寻找的是一个批处理不是一个流处理,因为你想回填工作的愿望。
不幸的是,你不能像endPosition config一样设置到Spark Streaming应用程序来读取Kafka或Kinesis。
一些建议:
1-如果您有机会将Kinesis更改为Kafka,则可以使用spark.read(“kafka”)方法代替spark.readStream(“kafka”)。你可以使用下面的参数。

.option("startingOffsets", start_offset) \
    .option("endingOffsets", end_offset) \

2-如果需要使用Kinesis,则可以使用此Kinesis流馈送s3路径。然后,您可以通过设置start-end where条件来使用Spark的数据文件。(我建议AWS-Glue pushdown_predicate特性不要读取所有数据)。
谢谢

相关问题