spark structured streaming将kafka值字符串截断为4095

wydwbb8l  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(417)

以下代码

builder = SparkSession.builder\
   .appName("PythonTest11")
spark = builder.getOrCreate()

# spark.conf.set("spark.sql.debug.maxToStringFields", 10000)

# Subscribe to 1 topic

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
  .option("subscribe", dataFlowTopic) \
  .load()

df = df \
    .selectExpr("LENGTH(value)")

# .selectExpr("CAST(value as string)") \

df.printSchema()

# Start running the query that prints the running counts to the console

query = df \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

query.awaitTermination()

印刷品

+-------------+
|length(value)|
+-------------+
|         4095|
+-------------+

对于任何大消息,也就是说,它会截断传入的字符串。
如何解决这个问题?

ocebsuys

ocebsuys1#

有点像控制台截短之类的。不是Kafka或Spark问题。
首先我在跑步


# kafka-console-producer.sh --topic dataflow --bootstrap-server localhost:9092

然后将消息粘贴到它的命令行并进行截断。
我跑的时候


# kafka-console-producer.sh --topic dataflow --bootstrap-server localhost:9092 < row01.json

里面有同样的数据 row01.json 而且它没有被截断。

相关问题