使用Kafka进行Spark Structured Streaming时,我们什么时候需要检查点?在ReadStream还是Write Stream中?

p1iqtdky  于 2023-03-28  发布在  Apache
关注(0)|答案(1)|浏览(146)

我是Kafka和PySpark+Structured Streaming的新手,我们需要从Kafka主题中流式传输数据,并在数据进行多次转换时将数据摄取到另一个表中。我理解检查点的概念,它的用途和优点。但有一件事让我感到困惑,那就是在哪里使用它?ReadStream还是WriteStream?
例如,我是这样阅读的:

def read_from_kafka(spark: SparkSession, kafka_config: dict, topic_name: str, column_schema: str, checkpoint_location: str):
    stream_df = spark.readStream.format('kafka') \
        .option('kafka.bootstrap.servers', kafka_config['broker']) \
        .option('subscribe', topic_name) \
        .option('kafka.security.protocol', kafka_config['security_protocol']) \
        .option('kafka.sasl.mechanism', kafka_config['sasl_mechanism']) \
        .option('kafka.sasl.jaas.config', jass_config) \
        .option('kafka.sasl.login.callback.handler.class', kafka_config['sasl_login_callback_handler_class']) \
        .option('startingOffsets', 'earliest') \
        .option("maxOffsetsPerTrigger", kafka_config['max_offsets_per_trigger']) \
        .option('checkpointLocation', checkpoint_location) \
        .load() \
        .select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
    return stream_df

stream_df = read_from_kafka(spark, config, topic_name, schema, checkpoint_location)

并将相同的df写入如下所示的某个位置。

def write_data(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str):
    kafka_df.writeStream \
        .format('kafka') \
        .foreachBatch(some_func) \
        .option('checkpointLocation', checkpoint_location) \
        .start()
        .awaitTermination()

def some_fun(kafka_df: DataFrame, batch_id: int):
    kafka_df.write.parquet(s'some_path_{batch_id}')

我的困惑是我需要在两个读和写流检查点吗?我看到一些文档,检查点只在WriteStream()使用如果我只在WriteStream()中使用它,而不在ReadStream()中使用它,Kafka+SparkStream如何知道应该从主题的哪个偏移量继续读取?如果我在ReadStream()WriteStream()中都使用它,会有任何影响吗?
谁能告诉我在从Kafka主题阅读和写入数据时,在什么地方使用检查点是正确的?

eimct9ow

eimct9ow1#

在消费时,Spark可以使用Kafka本身存储的消费者组偏移量,因此不需要检查点。
检查点存储您执行的操作的状态,而不仅仅是Kafka偏移量。
如果它没有对您的应用程序产生负面影响,那么检查这两个。

相关问题