I am trying to run a simple Spark Structured Streaming application that, for now, does nothing more than pull from a local Kafka cluster and write to the local file system. The code looks like this:
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

private static final String TARGET_PATH = "orchestration/target/myfolder/";
private static final String BOOTSTRAP_SERVER = "localhost:9092";
private static final String KAFKA_TOPIC = "twitter_aapl2";

public static void main(String[] args) throws TimeoutException, StreamingQueryException {
    SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .appName("spark app")
            .getOrCreate();

    // Read the local Kafka topic as a streaming source.
    Dataset<Row> df = spark.readStream().format("kafka")
            .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
            .option("subscribe", KAFKA_TOPIC)
            .load();

    // Continuously append the records as Parquet files on the local file system.
    StreamingQuery query = df.writeStream()
            .outputMode("append")
            .format("parquet")
            .option("path", TARGET_PATH + "data/")
            .option("checkpointLocation", TARGET_PATH + "checkpoints/")
            .start();

    query.awaitTermination();
}
But when I run it I get the output below, and my data never actually gets processed.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
(the "Resetting offset" / "Seeking to LATEST offset" pair keeps repeating like this indefinitely)
How can I fix this?
3 Answers
eeq64g8w1#
I had to work around this by adjusting the logging configuration:
While this is expected behavior, it seems unnecessary to log the "Seeking to LATEST offset" message every few milliseconds. It drowns out every other application log in the log file. The problem is even worse when consuming from a topic that is not always very active. It would be better if this were logged at DEBUG level rather than INFO.
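The actual configuration is not shown in this answer; one possible sketch, assuming a log4j 1.x properties file (the default logging setup in Spark versions before 3.3), is to raise the logger for the class that emits these messages above INFO:

```properties
# Silence the per-poll "Resetting offset" / "Seeking to LATEST offset"
# messages by raising the SubscriptionState logger to WARN.
log4j.logger.org.apache.kafka.clients.consumer.internals.SubscriptionState=WARN
```

This only hides the messages; it does not change the consumer's behavior.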
n3ipq98p2#
It turns out this seek-and-reset behavior is exactly what you want when you read a topic from the latest offset rather than from the beginning. The pipeline only reads new data sent to the Kafka topic while it is running; since no new data was being sent, it loops forever seeking (for new data) and resetting (to the latest offset).
Bottom line: either read the topic from the beginning or send new data to it, and the problem goes away.
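Concretely, reading from the beginning can be sketched by adding the startingOffsets option to the reader from the question (this is a standard option of the Spark Kafka source; "earliest" makes a fresh query start at the oldest available offsets instead of the default "latest"):

```java
// Same reader as in the question, but starting from the earliest offsets,
// so records already in the topic are processed as well.
Dataset<Row> df = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "earliest") // default is "latest"
        .load();
```

Note that startingOffsets only applies to a brand-new query; once a checkpoint exists under checkpointLocation, the query resumes from the checkpointed offsets and ignores this option.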
insrf1ej3#
Assuming this is just the verbose logging of the Kafka consumer's SubscriptionState:
If you are using Spark 3.3.x with the (default) log4j2 setup, the following may work for you.
Add the following snippet to $SPARK_HOME/conf/log4j2.properties
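The snippet itself is not included in the answer; a minimal sketch in log4j2 properties syntax (the logger key name kafkaSubscription is arbitrary; the class name is the one from the log output above) could look like:

```properties
# Raise the Kafka consumer's SubscriptionState logger from INFO to WARN
# so the repeated Resetting/Seeking messages are suppressed.
logger.kafkaSubscription.name = org.apache.kafka.clients.consumer.internals.SubscriptionState
logger.kafkaSubscription.level = warn
```

As with the first answer, this only quiets the log; the seek/reset loop itself continues until new data arrives or the query is started from earlier offsets.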