我想分析flink对rocksdb进行的每个api调用所花费的时间。但是,我找不到这些函数。
我曾尝试在ide中设置flink的完整源代码,将流示例集成到源代码中,启动调试器并参与许多调用,但都是徒劳的。
以下是示例:
package org.apache.flink.streaming.examples.spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
/**
* Normal code.
*/
public class FraudDetectionAvi {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
env.enableCheckpointing(60000);
env.setStateBackend(new RocksDBStateBackend("file:///home/avsrivas/dev/flink/checkpoints", true));
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetectorAvi())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
点击这里获取完整的源代码。
我尝试进入execute,但是我无法推断调用rocksdb来保存状态的函数。
1条答案
按热度按时间wgeznvg71#
当rocksdb用作flink应用程序的状态后端时,任何密钥分区状态的工作副本都存储在每个任务管理器的本地嵌入式rocksdb示例中。计时器也可以放在那里,也可以堆在一起。rocksdb将其状态保存在本地磁盘上;非键控状态总是在堆上。
拍摄快照时(即,在检查点期间或拍摄保存点时),rocksdb中存储的状态(异步)复制到快照存储(应该是分布式文件系统)。
在应用程序中,当您调用
flagState.update(true)
,例如,在rocksdbvaluestate.java中结束,它使用以下代码写入rocksdb:稍后在快照过程中会发生什么,取决于您是使用增量检查点还是完全检查点,但是您可以在中找到rocksdb特定的代码https://github.com/kebab-mai-haddi/flink/tree/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot.
请注意,快照不存储在rocksdb中。增量快照是通过镜像sst文件来获取的,完整快照涉及到在状态后端中迭代所有状态并写出结果。
有关flink如何使用rocksdb的更多信息,请搜索stefan richter的博客文章和flink forward talks。