我使用flink(1.9.2)和mongodb。我想定制一个接收器,将一些消息输出到mongodb中进行测试。但在我完成并运行它之后,我的工作不能占用savepoint吗?
我的Flume:
public class MongoDBSink extends RichSinkFunction<GeneralizedMessage> implements CheckpointedFunction, CheckpointListener {
private static final Logger logger = LoggerFactory.getLogger(MongoDBSink.class);
private MongoCollection sinkCollection;
private Random random = new Random();
// init mongo connection.
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MongoConfigurationCenter mongoCenter = MongoConfigurationCenter.getInstance();
MongoClient mongoClient = mongoCenter.getMongoClient();
MongoDatabase database = mongoClient.getDatabase("xxxx");
sinkCollection = database.getCollection(sinkCollectionName);
}
/**Writes the given value to the sink. This function is called for every record.*/
@Override
public void invoke(GeneralizedMessage value, Context context) throws Exception {
// some logic
sinkCollection.insertOne(doc);
// wait for some times.
Thread.sleep(200 + random.nextInt(1000));
}
/**checkpoint methods */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
logger.info("notifyCheckpointComplete");
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
logger.info("snapshotState");
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
logger.info("initializeState");
}
}
单击savepoint时:无法确认sink操作符。
日志我在jobmanager和taskmanager中找不到错误日志,看起来很好,但保存点失败。
暂无答案!
目前还没有任何答案,快来回答吧!