以下代码
builder = SparkSession.builder\
.appName("PythonTest11")
spark = builder.getOrCreate()
# spark.conf.set("spark.sql.debug.maxToStringFields", 10000)
# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", dataFlowTopic) \
.load()
df = df \
.selectExpr("LENGTH(value)")
# .selectExpr("CAST(value as string)") \
df.printSchema()
# Start running the query that prints the running counts to the console
query = df \
.writeStream \
.outputMode('append') \
.format('console') \
.start()
query.awaitTermination()
印刷品
+-------------+
|length(value)|
+-------------+
| 4095|
+-------------+
对于任何大消息,也就是说,它会截断传入的字符串。
如何解决这个问题?
1条答案
按热度按时间ocebsuys1#
有点像控制台截短之类的。不是Kafka或Spark问题。
首先我在跑步
然后将消息粘贴到它的命令行并进行截断。
我跑的时候
里面有同样的数据
row01.json
而且它没有被截断。