In the following code, taken from the documentation:
package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}
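1) Does the .keyBy(Transaction::getAccountId) call applied to the transactions data stream partition the incoming data by account ID and run process(new FraudDetector()) on each partition, as shown in the figure below?
2) If so, how do I write a mapper that processes the data flowing from each partition produced by .keyBy(Transaction::getAccountId)?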
1 answer
n1bvdmb6 #1
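keyBy is applied to the transactions data stream. After keyBy is applied, records from transactions that have the same account ID end up in the same partition, and you can apply the functions available on KeyedStream, such as process, window, reduce, min/max/sum, and so on. Note that process is only partly deprecated: the overload taking a plain ProcessFunction is marked as deprecated, while the overload taking a KeyedProcessFunction is not. There are many examples of using keyBy, for example: this link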
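To make the partitioning semantics concrete, here is a minimal plain-Java sketch with no Flink dependency. It only imitates the idea behind keyBy: records with the same key are always routed to the same partition, and a mapper running on a partition can keep state per key. The names KeyBySketch, Tx, partitionFor, keyBy, and mapPartition are hypothetical, and the modulo routing is a simplification (Flink assigns keys to key groups internally rather than using this formula).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KeyBySketch {

    /** Hypothetical stand-in for the walkthrough's Transaction entity. */
    static final class Tx {
        final long accountId;
        final double amount;

        Tx(long accountId, double amount) {
            this.accountId = accountId;
            this.amount = amount;
        }
    }

    /** Simplified routing rule (an assumption, not Flink's real algorithm). */
    static int partitionFor(long accountId, int parallelism) {
        return Math.floorMod(Long.hashCode(accountId), parallelism);
    }

    /** Splits the input so that equal account IDs always share a partition. */
    static List<List<Tx>> keyBy(List<Tx> input, int parallelism) {
        List<List<Tx>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Tx tx : input) {
            partitions.get(partitionFor(tx.accountId, parallelism)).add(tx);
        }
        return partitions;
    }

    /** A mapper over one partition that keeps a running total per account ID. */
    static List<String> mapPartition(List<Tx> partition) {
        Map<Long, Double> totalPerAccount = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Tx tx : partition) {
            double total = totalPerAccount.merge(tx.accountId, tx.amount, Double::sum);
            out.add("account " + tx.accountId + " running total " + total);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tx> input = List.of(
            new Tx(1L, 10.0), new Tx(2L, 5.0), new Tx(1L, 2.5), new Tx(3L, 7.0));
        List<List<Tx>> partitions = keyBy(input, 2);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("partition " + i + ": " + mapPartition(partitions.get(i)));
        }
    }
}
```

In Flink itself, KeyedStream extends DataStream, so a mapper can be chained directly after keyBy with map(...). If the mapper needs per-key state like the running total above, a RichMapFunction or a KeyedProcessFunction with keyed state is the usual choice.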