我有这个密码:
import org.apache.spark.sql.SparkSession
object TopicIngester {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]") // remove this later
.appName("Ingester")
.getOrCreate()
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "sandbox-hdp.hortonworks.com:6667" /*my.cluster.com:9092 in case of cloudera*/)
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
.write
.parquet("/user/maria_dev/test")
spark.stop()
}
}
当我在hortonworks沙盒中运行它时,一切正常。所有可用数据都从 test
主题并保存到 /user/maria_dev/test
文件夹。
我的cloudera集群上也有一个同名的主题,出于某种原因,它被困在 .parquet("/path/to/folder")
而且永远不会结束,就好像它永远在等待更多的数据。
有什么问题吗?
暂无答案!
目前还没有任何答案,快来回答吧!