如何更改或完全禁止这个批处理元数据,只显示我的东西?
-------------------------------------------
Batch: 62
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value| topic|partition|offset| timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [32]|transaction| 0|335793|2020-07-27 15:10:...| 0|
+----+-----+-----------+---------+------+--------------------+-------------+
-------------------------------------------
Batch: 63
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value| topic|partition|offset| timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [33]|transaction| 0|335794|2020-07-27 15:10:...| 0|
+----+-----+-----------+---------+------+--------------------+-------------+
-------------------------------------------
spark\u job.py代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import time
KAFKA_TOPIC_NAME_CONS = "transaction"
KAFKA_BOOTSTRAP_SERVERS_CONS = '127.0.0.1:9092'
project_path = 'C:/Users/Admin/Desktop/kafka_project'
if __name__ == "__main__":
print("PySpark Structured Streaming with Kafka Demo Application Started ...")
spark = SparkSession \
.builder \
.appName("PySpark Structured Streaming with Kafka Demo") \
.master("local[*]") \
.config("spark.jars", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
.config("spark.executor.extraClassPath", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
.config("spark.executor.extraLibrary", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
.config("spark.driver.extraClassPath", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
incoming_read = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option("subscribe", KAFKA_TOPIC_NAME_CONS)\
.option("startingOffsets", "earliest")\
.load()
query1 = incoming_read.writeStream.format("console").start()
time.sleep(4)
query1.awaitTermination()
incoming_read.awaitTermination()
print("PySpark demo app finished.")
producer.py永久发送数字0到7,间隔4秒:
# coding=utf8
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
value_serializer=lambda x:
dumps(x).encode('utf-8')
)
topic = 'transaction'
while True:
print('restarting the loop...')
for i in range(7):
print('producing for this topic %s this blob: %s ' % (topic,i))
producer.send(topic, value=i)
sleep(1)
另外,如何实际看到最后一行“pyspark演示应用程序完成”?
当spark.py超时时,我是否需要停止制作并等待?使用spark2.4.6,python3.7
1条答案
按热度按时间goucqfw61#
查看类的代码
ConsoleWrite
使用控制台输出时,无法抑制或更改“批处理”输出,因为它似乎是硬编码的:除非有办法压制
println
声明。另外,如何实际看到最后一行“pyspark演示应用程序完成”?
您需要通过发送sigterm信号或终止您提交的应用程序来终止您的应用程序。但不确定是否能在最后一行代码中看到print语句。制片人与此无关。