我正在使用 spark.readStream
从Kafka读取数据并对结果Dataframe执行分解。我试图保存在一个配置单元表爆炸的结果,我无法找到任何解决方案。我尝试了下面的方法,但它不起作用(它运行,但我没有看到任何新的分区创建)
val query = tradelines.writeStream.outputMode("append")
.format("memory")
.option("truncate", "false")
.option("checkpointLocation", checkpointLocation)
.queryName("tl")
.start()
sc.sql("set hive.exec.dynamic.partition.mode=nonstrict;")
sc.sql("INSERT INTO TABLE default.tradelines PARTITION (dt) SELECT * FROM tl")
1条答案
按热度按时间tvz2xvvm1#
检查hdfs的
dt
文件系统上的分区你得跑了
MSCK REPAIR TABLE
在配置单元表上查看新分区。如果您没有对spark做任何特殊的操作,那么值得指出的是kafka connect hdfs能够直接从kafka注册hive分区。