Flink: how do I create a sink that supports savepoints?

Posted by fslejnso on 2021-06-21 in Flink

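I am using Flink (1.9.2) with MongoDB. I want to write a custom sink that outputs some messages to MongoDB for testing. But after I finished it and ran it, my job cannot take a savepoint.
My sink: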

public class MongoDBSink extends RichSinkFunction<GeneralizedMessage> implements CheckpointedFunction, CheckpointListener {
    private static final Logger logger = LoggerFactory.getLogger(MongoDBSink.class);

    // Created in open(), so it is excluded from the serialized function.
    private transient MongoCollection sinkCollection;
    private Random random = new Random();

    // init mongo connection.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MongoConfigurationCenter mongoCenter = MongoConfigurationCenter.getInstance();
        MongoClient mongoClient = mongoCenter.getMongoClient();
        MongoDatabase database = mongoClient.getDatabase("xxxx");
        sinkCollection = database.getCollection(sinkCollectionName);
    }

    /**Writes the given value to the sink. This function is called for every record.*/
    @Override
    public void invoke(GeneralizedMessage value, Context context) throws Exception {
        // some logic
        sinkCollection.insertOne(doc);
        // Wait for a random interval (200-1199 ms) after each write.
        Thread.sleep(200 + random.nextInt(1000));
    }

    /**checkpoint methods */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        logger.info("notifyCheckpointComplete");
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        logger.info("snapshotState");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        logger.info("initializeState");
    }
}

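When I trigger a savepoint, it fails: the sink operator is never acknowledged.

I cannot find any error logs in the JobManager or TaskManager. Everything looks fine, but the savepoint fails.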
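
One possible explanation, offered as a guess rather than a confirmed diagnosis: in Flink 1.9 savepoint barriers travel in-band with the records, so a barrier reaches the sink only after every record queued ahead of it has passed through invoke(). With the Thread.sleep in invoke(), each record costs roughly 700 ms on average, so a modest backlog could keep the sink from acknowledging before the checkpoint timeout, without any error being logged. The sketch below only does that arithmetic. The class and method names are hypothetical, and the 10-minute timeout is an assumption (it is Flink's default checkpoint timeout; the question does not say what the job is configured with).

```java
public class BarrierDelayEstimate {

    // Mean of 200 + random.nextInt(1000); nextInt(1000) is uniform over 0..999.
    public static double meanSleepMillis() {
        return 200 + (0 + 999) / 2.0;
    }

    // Expected time before a barrier reaches one sink subtask when
    // `queuedRecords` records are buffered ahead of it.
    public static double expectedBarrierDelayMillis(long queuedRecords) {
        return queuedRecords * meanSleepMillis();
    }

    // Smallest backlog whose expected delay exceeds the given timeout.
    public static long recordsToExceed(long timeoutMillis) {
        return (long) Math.floor(timeoutMillis / meanSleepMillis()) + 1;
    }

    public static void main(String[] args) {
        // Assumption: the job uses the default checkpoint timeout of 10 minutes.
        long timeoutMillis = 10 * 60 * 1000L;
        System.out.println("Mean time per record (ms): " + meanSleepMillis());
        System.out.println("Backlog that exceeds the timeout: "
                + recordsToExceed(timeoutMillis) + " records");
    }
}
```

Under these assumptions, a backlog of a little under nine hundred records in front of a single sink subtask would already be enough to delay the barrier past the timeout. If that is what is happening, removing the sleep or raising the checkpoint timeout would be the first things to try.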
