flink—rocksdb返回null,甚至是以前更新过的

7fyelxc5  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(236)

我在flink应用程序中使用rocksdb进行状态操作。
请看一下这个代码:

public class Process extends KeyedProcessFunction<Tuple, Record, Result>{
    private transient ValueState<Integer> state;

    @Override
    public void open(Configuration parameters)
    {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .cleanupInBackground()
                .build();

        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
                "state",
                TypeInformation.of(Integer.class));

        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(Record value, Context ctx, Collector<Result> out) throws Exception
    {
        updateStates(value);

        Integer stateValue = state.value();

        logger.info("processElement -- subTaskName: {}, stateValue: {} , ", getRuntimeContext().getTaskNameWithSubtasks() ,stateValue);

        if (stateValue % 2 == 0)
        {
            out.collect(..)
        }
    }

    private void updateStates(Record value) throws Exception
    {

        if (state.value() == null)
        {
            state.update(1);
        }

        Integer stateValue = state.value();

        logger.info("updateStates -- subTaskName: {}, stateValue: {} ", getRuntimeContext().getTaskNameWithSubtasks() ,stateValue);

        if (something in record true)
        {
            stateValue ++;
            state.update(stateValue);
        }
    }
}

在这个过程函数中,我得到 NullPointerException ,因为 stateValue 进程中的函数为空。
里面还有一些木头 updateState() :

2020-12-04 ... INFO  ...  - updateStates -- subTaskName: Process (9/40), stateValue: null

为什么我从状态获取空值,即使我以前更新了状态。
是rocksdb吗 update() 以及 value() 通过异步方式完成的操作?
根据@davidanderson请求,我正在添加使用keyedprocessfunction的代码:

public class Record{
    private String customerName; 

    // getters and setter for customerName and other fields
}

private DataStream<Result> applyProcess(DataStream<Record> otherStream)
    {
        return otherStream
                .filter(record -> filter if record.method() returns true,-there is no exception in here- )
                .name("filtered")
                .keyBy('customerName')
                .process(new Process())
                .name("ProcessFunction");
 }

update:这看起来很傻,但是当我更新了如下代码时,应用程序就可以工作了:(我刚刚删除了updatestates方法并将其放入processelement函数)

@Override
    public void processElement(Record value, Context ctx, Collector<Result> out) throws Exception
    {
       /* 
        This case is not working...
        if (state.value() == null) {
            stateValue = 1;
        }
        */

         Integer stateValue = state.value();

        if (stateValue == null){
            stateValue = 1;
        }

        if (something in record true)
        {
            stateValue ++;
            state.update(stateValue);
        }

        if (stateValue % 2 == 0)
        {
            out.collect(..)
        }
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题