我正在尝试在java的spark程序中使用sqlapi从azurecosmosdb读取数据。
我有以下代码:
public class CosmosDBTest {
public static void main(String[] args) {
// Logger.getLogger("org.apache").setLevel(Level.WARN);
SparkSession sc = SparkSession.builder().appName("TestCosmosDB").master("local[*]")
.config("spark.debug.maxToStringFields", 10000)
.getOrCreate();
Dataset<Row> ds = sc.read().format("com.microsoft.azure.cosmosdb.spark").options(getCosmosConfigMap("<CosmosEndpoint>",
"<MasterKey>",
"<Database>", "<Collection>", "<SQL Query>")).load();
ds.foreach(row -> {
System.out.println(row);
});
ds.printSchema();
System.out.println("Stopping spark session");
sc.stop();
System.out.println("Spark session stopped");
}
public static scala.collection.Map getCosmosConfigMap(String endpoint, String masterKey, String database,
String collection, String query) {
Map<String, String> configMap = new HashMap<>();
configMap.put("Endpoint", endpoint);
configMap.put("Masterkey", masterKey);
configMap.put("Database", database);
configMap.put("Collection", collection);
configMap.put("query_custom", query);
scala.collection.Map<String, String> scalaConfigMap = JavaConverters.mapAsScalaMapConverter(configMap).asScala().toMap(
Predef.<Tuple2<String, String>>conforms()
);
return scalaConfigMap;
}
}
它工作得非常好。但程序不会终止。我可以在日志中看到sparkcontext被终止,但是java程序永远运行。就好像某个插座或什么东西还开着。我很肯定这和《宇宙读本》有关,但我不知道确切的问题是什么。
任何见解。
谢谢您。
暂无答案!
目前还没有任何答案,快来回答吧!