No results from groupBy on a joined stream

w41d8nur · posted 2021-05-19 · in Spark

Somehow I cannot get any results out of the grouped dataset. I join two streams (both from Kafka) and map them onto a new stream that contains only a combined string of the two inputs:

//Dataset<Row> orderKeyLineNumber = ... from Kafka
//Dataset<Row> orderKeyOrderStatus = ... from Kafka

Dataset<Row> joined = orderKeyLineNumber.join(orderKeyOrderStatus, "orderKey")
                .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.TIMESTAMP(), Encoders.STRING(), Encoders.TIMESTAMP()))
                .map(new Transformer.join(), Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
                .toDF("lineNumberOrderStatus", "timestamp")
                .withWatermark("timestamp", "30 seconds");
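
For context, the two source datasets are read from Kafka; the actual reader code is not shown above, so here is only a minimal sketch of how one of them could be constructed. The broker address, topic name, and JSON payload layout are placeholders, not the real setup:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

// Hypothetical reader for the first input stream; the second one
// (orderKeyOrderStatus) would look the same with its own topic and columns.
public static Dataset<Row> readOrderKeyLineNumber(SparkSession spark) {
    // Assumed JSON layout; column names match what the join above expects.
    StructType schema = new StructType()
            .add("orderKey", DataTypes.StringType)
            .add("l_lineNumber", DataTypes.StringType)
            .add("l_timestamp", DataTypes.TimestampType);

    return spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
            .option("subscribe", "lineitem")                     // assumed topic
            .load()
            .select(from_json(col("value").cast("string"), schema).as("v"))
            .select("v.orderKey", "v.l_lineNumber", "v.l_timestamp");
}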

I did verify that the output of the join is as expected. However, when I try to count lineNumberOrderStatus, there is no output at all (neither on the console nor in Kafka). I did wait "long enough" and I am sure all events have been processed (judging by the logs of the stream execution thread, numRowsTotal, etc.).
I tried counting like this:

Dataset<Row> counts = joined.groupBy(functions.window(joined.col("timestamp"), "30 seconds", "30 seconds"), joined.col("lineNumberOrderStatus"))
                .count()
                .selectExpr("lineNumberOrderStatus AS key", "count AS value");

and, as an alternative, like this:

Dataset<Row> counts = joined.groupBy(functions.window(joined.col("timestamp"), "30 seconds", "30 seconds"), joined.col("lineNumberOrderStatus"))
                .agg(functions.count("lineNumberOrderStatus"))
                .selectExpr("lineNumberOrderStatus AS key", "`count(lineNumberOrderStatus)` AS value");
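
The streaming query itself is started with writeStream, which is not shown in the question; the following is only a minimal sketch of what the console variant could look like. Output mode, trigger interval, and checkpoint path are placeholders, not the actual configuration:

import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

// Hypothetical sink: prints the windowed counts to the console.
// Output mode, trigger, and checkpoint location are assumptions.
StreamingQuery query = counts.writeStream()
        .outputMode("update")                                     // "append" is also possible with the watermark
        .format("console")
        .option("truncate", "false")
        .option("checkpointLocation", "/tmp/checkpoints/counts")  // assumed path
        .trigger(Trigger.ProcessingTime("30 seconds"))            // assumed trigger
        .start();
query.awaitTermination();

The Kafka variant mentioned above would use format("kafka") with the key/value columns selected earlier instead of the console format.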

Can anyone tell me what is going wrong?
EDIT
The map used in the join is:

// input: orderKey, l_lineNumber, l_timestamp, o_orderStatus, o_timestamp
public static class join implements MapFunction<Tuple5<String, String, Timestamp, String, Timestamp>, Tuple2<String, Timestamp>> {
        @Override
        public Tuple2<String, Timestamp> call(Tuple5<String, String, Timestamp, String, Timestamp> t) throws Exception {
            // event-time variant (not used): return new Tuple2<>("(l: " + t._2() + ", o: " + t._4() + ")", t._3());
            // currently the timestamp is taken from processing time, not from the joined event
            return new Tuple2<>("(l: " + t._2() + ", o: " + t._4() + ")", new Timestamp(System.currentTimeMillis()));
        }
    }
