spark sql在spark流中失败(kafkastream)

hlswsv35  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(343)

我在spark流作业中使用spark sql在配置单元表中搜索。Kafka流媒体工作正常,没有问题。如果我跑了 hiveContext.runSqlHive(sqlQuery); 外面 directKafkaStream.foreachRDD 它工作正常,没有问题。但我需要在流式处理作业中查找配置单元表。jdbc的使用( jdbc:hive2:// )可以,但我想使用sparksql。
我的源代码的重要位置如下所示:

// set context
SparkConf sparkConf = new SparkConf().setAppName(appName).set("spark.driver.allowMultipleContexts", "true");
SparkContext sparkSqlContext = new SparkContext(sparkConf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
HiveContext hiveContext = new HiveContext(sparkSqlContext);

// Initialize Direct Spark Kafka Stream. Starts from top
JavaPairInputDStream<String, String> directKafkaStream =
                KafkaUtils.createDirectStream(streamingContext,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet);

// work on stream                   
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
    rdd.foreachPartition(tuple2Iterator -> {
        // get message
        Tuple2<String, String> item = tuple2Iterator.next();

        // lookup
        String sqlQuery = "SELECT something FROM somewhere";
        Seq<String> resultSequence = hiveContext.runSqlHive(sqlQuery);
        List<String> result = scala.collection.JavaConversions.seqAsJavaList(resultSequence);

        });
    return null;
});

// Start the computation
streamingContext.start();
streamingContext.awaitTermination();

我没有得到任何有意义的错误,即使我周围的尝试捕捉。
我希望有人能帮忙-谢谢。
//编辑:解决方案如下所示:

// work on stream                   
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
    // driver
    Map<String, String> lookupMap = getResult(hiveContext); //something with hiveContext.runSqlHive(sqlQuery);
    rdd.foreachPartition(tuple2Iterator -> {
        // worker
        while (tuple2Iterator != null && tuple2Iterator.hasNext()) {
            // get message
            Tuple2<String, String> item = tuple2Iterator.next();
            // lookup
            String result = lookupMap.get(item._2());
        }
    });
    return null;
});
ijxebb2r

ijxebb2r1#

仅仅因为你想使用sparksql,这是不可能的。spark的第一条规则是没有嵌套操作、转换或分布式数据结构。
如果您可以将查询表示为join,那么可以使用push将其提升到更高的级别 foreachRDD 在这里,使用spark sql几乎耗尽了您的选择:

directKafkaStream.foreachRDD(rdd -> 
   hiveContext.runSqlHive(sqlQuery)
   rdd.foreachPartition(...)
)

否则,直接jdbc连接可能是一个有效的选项。

相关问题