scala | spark | azure cosmos db | java程序不终止

ru9i0ody  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(265)

我正在尝试在java的spark程序中使用sqlapi从azurecosmosdb读取数据。
我有以下代码:

public class CosmosDBTest {
     public static void main(String[] args) {
//        Logger.getLogger("org.apache").setLevel(Level.WARN);

        SparkSession sc = SparkSession.builder().appName("TestCosmosDB").master("local[*]")
                .config("spark.debug.maxToStringFields", 10000)
                .getOrCreate();
        Dataset<Row> ds = sc.read().format("com.microsoft.azure.cosmosdb.spark").options(getCosmosConfigMap("<CosmosEndpoint>",
                "<MasterKey>",
                "<Database>", "<Collection>", "<SQL Query>")).load();
        ds.foreach(row -> {
            System.out.println(row);
        });
        ds.printSchema();
        System.out.println("Stopping spark session");
        sc.stop();
        System.out.println("Spark session stopped");
    }

    public static scala.collection.Map getCosmosConfigMap(String endpoint, String masterKey, String database,
                                                          String collection, String query) {
        Map<String, String> configMap = new HashMap<>();
        configMap.put("Endpoint", endpoint);
        configMap.put("Masterkey", masterKey);
        configMap.put("Database", database);
        configMap.put("Collection", collection);
        configMap.put("query_custom", query);
        scala.collection.Map<String, String> scalaConfigMap = JavaConverters.mapAsScalaMapConverter(configMap).asScala().toMap(
                Predef.<Tuple2<String, String>>conforms()
        );
        return scalaConfigMap;
    }
}

它工作得非常好。但程序不会终止。我可以在日志中看到sparkcontext被终止,但是java程序永远运行。就好像某个插座或什么东西还开着。我很肯定这和《宇宙读本》有关,但我不知道确切的问题是什么。
任何见解。
谢谢您。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题