为什么从Kafka消费不完成在cloudera,但完成在hortonworks?

4ktjp1zp  于 2021-05-31  发布在  Hadoop
关注(0)|答案(0)|浏览(184)

我有这个密码:

import org.apache.spark.sql.SparkSession

object TopicIngester {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // remove this later
      .appName("Ingester")
      .getOrCreate()

    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "sandbox-hdp.hortonworks.com:6667" /*my.cluster.com:9092 in case of cloudera*/)
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .write
      .parquet("/user/maria_dev/test")

    spark.stop()
  }
}

当我在hortonworks沙盒中运行它时,一切正常。所有可用数据都从 test 主题并保存到 /user/maria_dev/test 文件夹。
我的cloudera集群上也有一个同名的主题,出于某种原因,它被困在 .parquet("/path/to/folder") 而且永远不会结束,就好像它永远在等待更多的数据。
有什么问题吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题