How to suppress the stdout 'Batch' output when streaming with Spark?

wj8zmpe1 · posted 2021-05-27 in Spark

How can I change, or suppress entirely, this per-batch metadata so that only my own output is shown?

-------------------------------------------
Batch: 62
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value|      topic|partition|offset|           timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [32]|transaction|        0|335793|2020-07-27 15:10:...|            0|
+----+-----+-----------+---------+------+--------------------+-------------+

-------------------------------------------
Batch: 63
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value|      topic|partition|offset|           timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [33]|transaction|        0|335794|2020-07-27 15:10:...|            0|
+----+-----+-----------+---------+------+--------------------+-------------+

-------------------------------------------

The spark_job.py code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import time
KAFKA_TOPIC_NAME_CONS = "transaction"
KAFKA_BOOTSTRAP_SERVERS_CONS = '127.0.0.1:9092'
project_path = 'C:/Users/Admin/Desktop/kafka_project'

if __name__ == "__main__":
    print("PySpark Structured Streaming with Kafka Demo Application Started ...")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars",                    "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraClassPath", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraLibrary",   "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.driver.extraClassPath",   "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    incoming_read = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
        .option("subscribe", KAFKA_TOPIC_NAME_CONS)\
        .option("startingOffsets", "earliest")\
        .load()

    query1 = incoming_read.writeStream.format("console").start()
    time.sleep(4)

    # Block until the query is stopped externally (e.g. Ctrl+C or SIGTERM).
    # awaitTermination() belongs to the StreamingQuery returned by start(),
    # not to the streaming DataFrame itself.
    query1.awaitTermination()

    print("PySpark demo app finished.")

producer.py sends the numbers 0 through 6 in an endless loop, one message per second:


# coding=utf8

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:9092'],
    # serialize each value as UTF-8 encoded JSON
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

topic = 'transaction'

while True:
    print('restarting the loop...')
    for i in range(7):
        print('producing for this topic %s this blob: %s ' % (topic,i))

        producer.send(topic, value=i)
        sleep(1)

Also, how do I actually get to see the last line, "PySpark demo app finished."?
Do I have to stop the producer and wait until spark_job.py times out? Using Spark 2.4.6, Python 3.7.


goucqfw6 #1

Looking at the code of the ConsoleWriter class, the "Batch" output cannot be suppressed or changed when using the console sink, because it appears to be hard-coded:

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // We have to print a "Batch" label for the epoch for compatibility 
    // with the pre-data source V2 behavior.
    printRows(messages, schema, s"Batch: $epochId")
  }

  protected def printRows(
      commitMessages: Array[WriterCommitMessage],
      schema: StructType,
      printMessage: String): Unit = {
[...]
    // scalastyle:off println
    println("-------------------------------------------")
    println(printMessage)
    println("-------------------------------------------")
    // scalastyle:on println
[...]
  }

Unless there is a way to suppress those println statements.
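If the goal is simply to print the rows without the "Batch: N" banner, one workaround (a minimal sketch, assuming console output is only needed for debugging) is to skip the console sink and print each micro-batch yourself with foreachBatch, available since Spark 2.4. The helper name show_batch is hypothetical:

# foreachBatch hands the function each micro-batch as a regular
# DataFrame together with the epoch id.
def show_batch(batch_df, epoch_id):
    batch_df.show(truncate=False)   # prints the rows, no "Batch: N" header

query1 = incoming_read.writeStream.foreachBatch(show_batch).start()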
Also, how do I actually get to see the last line, "PySpark demo app finished."?

You need to terminate the application yourself, e.g. by sending it a SIGTERM or by killing the submitted application. I am not sure, though, whether you would then still see the print statement on the last line of the code. The producer has nothing to do with it.
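If you want the script to reach that final print on its own, one option (a sketch, not from the original post) is to give awaitTermination a timeout and stop the query yourself:

# With a timeout (in seconds), awaitTermination returns a boolean
# indicating whether the query terminated within that time.
if not query1.awaitTermination(60):
    query1.stop()   # stop the streaming query so execution continues
print("PySpark demo app finished.")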
