我是Kafka和PySpark+Structured Streaming的新手,我们需要从Kafka主题中流式传输数据,并在数据进行多次转换时将数据摄取到另一个表中。我理解检查点的概念,它的用途和优点。但有一件事让我感到困惑,那就是在哪里使用它?ReadStream还是WriteStream?
例如,我是这样阅读的:
def read_from_kafka(spark: SparkSession, kafka_config: dict, topic_name: str, column_schema: str, checkpoint_location: str):
stream_df = spark.readStream.format('kafka') \
.option('kafka.bootstrap.servers', kafka_config['broker']) \
.option('subscribe', topic_name) \
.option('kafka.security.protocol', kafka_config['security_protocol']) \
.option('kafka.sasl.mechanism', kafka_config['sasl_mechanism']) \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', kafka_config['sasl_login_callback_handler_class']) \
.option('startingOffsets', 'earliest') \
.option("maxOffsetsPerTrigger", kafka_config['max_offsets_per_trigger']) \
.option('checkpointLocation', checkpoint_location) \
.load() \
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
return stream_df
stream_df = read_from_kafka(spark, config, topic_name, schema, checkpoint_location)
并将相同的df写入如下所示的某个位置。
def write_data(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str):
kafka_df.writeStream \
.format('kafka') \
.foreachBatch(some_func) \
.option('checkpointLocation', checkpoint_location) \
.start()
.awaitTermination()
def some_fun(kafka_df: DataFrame, batch_id: int):
kafka_df.write.parquet(s'some_path_{batch_id}')
我的困惑是我需要在两个读和写流检查点吗?我看到一些文档,检查点只在WriteStream()
使用如果我只在WriteStream()
中使用它,而不在ReadStream()
中使用它,Kafka+SparkStream如何知道应该从主题的哪个偏移量继续读取?如果我在ReadStream()
和WriteStream()
中都使用它,会有任何影响吗?
谁能告诉我在从Kafka主题阅读和写入数据时,在什么地方使用检查点是正确的?
1条答案
按热度按时间eimct9ow1#
在消费时,Spark可以使用Kafka本身存储的消费者组偏移量,因此不需要检查点。
检查点存储您执行的操作的状态,而不仅仅是Kafka偏移量。
如果它没有对您的应用程序产生负面影响,那么检查这两个。