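For some reason I can't get any results from a grouped Dataset. I join two streams (from Kafka) and map them to a new stream that contains only a combined string of the two inputs: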
//Dataset<Row> orderKeyLineNumber = ... from Kafka
//Dataset<Row> orderKeyOrderStatus = ... from Kafka
Dataset<Row> joined = orderKeyLineNumber.join(orderKeyOrderStatus, "orderKey")
        .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.TIMESTAMP(), Encoders.STRING(), Encoders.TIMESTAMP()))
        .map(new Transformer.join(), Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
        .toDF("lineNumberOrderStatus", "timestamp")
        .withWatermark("timestamp", "30 seconds");
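As a rough mental model of what `withWatermark("timestamp", "30 seconds")` tracks, here is a simplified plain-Java sketch. It is not Spark's implementation: the class `WatermarkTracker` is hypothetical, and it updates the watermark per row, whereas Spark computes it per micro-batch and applies it in the following batch.

```java
// Hypothetical, simplified model (not Spark's code) of an event-time watermark
// such as withWatermark("timestamp", "30 seconds").
class WatermarkTracker {
    private final long delayMs;                   // allowed lateness, e.g. 30_000 ms
    private long maxEventTimeMs = Long.MIN_VALUE; // largest event time seen so far

    WatermarkTracker(long delayMs) {
        this.delayMs = delayMs;
    }

    // Record the event time of one incoming row.
    void observe(long eventTimeMs) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
    }

    // Watermark = max event time seen minus the allowed delay.
    // It only moves forward when rows with newer event times arrive.
    long watermark() {
        if (maxEventTimeMs == Long.MIN_VALUE) {
            return 0L; // no data seen yet
        }
        return maxEventTimeMs - delayMs;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(30_000L);
        tracker.observe(100_000L);
        tracker.observe(90_000L); // older row: does not move the watermark back
        System.out.println(tracker.watermark()); // 70000
    }
}
```

In this model, if no new rows arrive, the watermark stays where it is, no matter how much wall-clock time passes.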
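I did verify that the output of the join is as expected. However, if I try to count `lineNumberOrderStatus`, there is no output at all (neither on the console nor in Kafka). I did wait "long enough", and I'm sure all events have been processed (judging by the logs of the stream execution thread, e.g. `numRowsTotal` etc.).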
I tried counting with:
Dataset<Row> counts = joined.groupBy(functions.window(joined.col("timestamp"), "30 seconds", "30 seconds"), joined.col("lineNumberOrderStatus"))
        .count()
        .selectExpr("lineNumberOrderStatus AS key", "count AS value");
and
Dataset<Row> counts = joined.groupBy(functions.window(joined.col("timestamp"), "30 seconds", "30 seconds"), joined.col("lineNumberOrderStatus"))
        .agg(functions.count("lineNumberOrderStatus"))
        .selectExpr("lineNumberOrderStatus AS key", "`count(lineNumberOrderStatus)` AS value");
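For reference, here is a simplified plain-Java model of the 30-second tumbling window used above (`functions.window(col, "30 seconds", "30 seconds")`). It assumes (not taken from the post) that, in append output mode, a window's count is emitted only once the watermark has reached the window's end. `TumblingWindowModel` is a hypothetical name, and this is a sketch of the semantics, not Spark's code.

```java
// Hypothetical, simplified model (not Spark's code) of a 30-second tumbling
// window in append output mode.
class TumblingWindowModel {
    static final long WINDOW_MS = 30_000L;

    // Start of the tumbling window [start, start + WINDOW_MS) that contains the event.
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS);
    }

    // Assumption: a window is final (its count can be emitted in append mode)
    // only once the watermark has reached the window's end.
    static boolean isFinal(long windowStartMs, long watermarkMs) {
        return windowStartMs + WINDOW_MS <= watermarkMs;
    }

    public static void main(String[] args) {
        long start = windowStart(95_000L);            // window [90000, 120000)
        System.out.println(start);                    // 90000
        System.out.println(isFinal(start, 100_000L)); // false: window still open
        System.out.println(isFinal(start, 120_000L)); // true: count can be emitted
    }
}
```

Under these assumptions, with a 30-second watermark delay the window `[90000, 120000)` would only be emitted after a row with an event time of at least 150000 ms has been seen.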
Can anyone tell me what is going wrong?
EDIT
The map function applied to the join result is:
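import java.sql.Timestamp;
import org.apache.spark.api.java.function.MapFunction;
import scala.Tuple2;
import scala.Tuple5;

// input: orderKey, l_lineNumber, l_timestamp, o_orderStatus, o_timestamp
// output: combined "(l: ..., o: ...)" string and a timestamp
public static class join implements MapFunction<Tuple5<String, String, Timestamp, String, Timestamp>, Tuple2<String, Timestamp>> {
    @Override
    public Tuple2<String, Timestamp> call(Tuple5<String, String, Timestamp, String, Timestamp> t) throws Exception {
        // commented-out variant: keep the lineitem timestamp (l_timestamp)
        //return new Tuple2<>("(l: " + t._2() + ", o: " + t._4() + ")", t._3());
        // current version: uses the current processing time as the timestamp column
        return new Tuple2<>("(l: " + t._2() + ", o: " + t._4() + ")", new Timestamp(System.currentTimeMillis()));
    }
}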