I am using RocksDB for state operations in my Flink application.
Please take a look at this code:
public class Process extends KeyedProcessFunction<Tuple, Record, Result>
{
    private transient ValueState<Integer> state;

    @Override
    public void open(Configuration parameters)
    {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(30))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
            .cleanupInBackground()
            .build();
        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
            "state",
            TypeInformation.of(Integer.class));
        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(Record value, Context ctx, Collector<Result> out) throws Exception
    {
        updateStates(value);
        Integer stateValue = state.value();
        logger.info("processElement -- subTaskName: {}, stateValue: {} , ", getRuntimeContext().getTaskNameWithSubtasks(), stateValue);
        if (stateValue % 2 == 0)
        {
            out.collect(..)
        }
    }

    private void updateStates(Record value) throws Exception
    {
        if (state.value() == null)
        {
            state.update(1);
        }
        Integer stateValue = state.value();
        logger.info("updateStates -- subTaskName: {}, stateValue: {} ", getRuntimeContext().getTaskNameWithSubtasks(), stateValue);
        if (something in record true)
        {
            stateValue ++;
            state.update(stateValue);
        }
    }
}
In this process function I get a NullPointerException because stateValue is null inside the processElement function.
There are also some log lines from inside updateStates():
2020-12-04 ... INFO ... - updateStates -- subTaskName: Process (9/40), stateValue: null
Why am I getting a null value from the state even though I updated it beforehand?
Are RocksDB's update() and value() operations performed asynchronously?
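To make the failure mode concrete: the NullPointerException itself does not come from Flink or RocksDB but from Java auto-unboxing a null Integer in an expression such as `stateValue % 2`. The following is a minimal plain-Java sketch of that mechanism; the class and method names (`NullUnboxingDemo`, `isEvenUnsafe`, `isEvenSafe`) are hypothetical and exist only for illustration. It does not explain why the state returned null in the first place.

```java
class NullUnboxingDemo {
    // Mirrors `stateValue % 2 == 0` from processElement: the Integer is
    // auto-unboxed to an int before the modulo is evaluated.
    static boolean isEvenUnsafe(Integer stateValue) {
        return stateValue % 2 == 0; // throws NullPointerException if stateValue is null
    }

    // Null-safe variant. Treating a missing value as "not even" is an
    // assumption made for this illustration only.
    static boolean isEvenSafe(Integer stateValue) {
        return stateValue != null && stateValue % 2 == 0;
    }

    public static void main(String[] args) {
        System.out.println("safe(null) = " + isEvenSafe(null));
        System.out.println("safe(2)    = " + isEvenSafe(2));
        try {
            isEvenUnsafe(null);
        } catch (NullPointerException e) {
            System.out.println("unsafe(null) threw NullPointerException");
        }
    }
}
```

So any code path in which `state.value()` can return null needs a null check before the value is used arithmetically.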
At @davidanderson's request, I am adding the code that uses the KeyedProcessFunction:
public class Record
{
    private String customerName;
    // getters and setter for customerName and other fields
}

private DataStream<Result> applyProcess(DataStream<Record> otherStream)
{
    return otherStream
        .filter(record -> filter if record.method() returns true,-there is no exception in here- )
        .name("filtered")
        .keyBy("customerName")
        .process(new Process())
        .name("ProcessFunction");
}
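Update: this looks silly, but the application works after I changed the code as follows (I just removed the updateStates method and moved its logic into the processElement function):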
@Override
public void processElement(Record value, Context ctx, Collector<Result> out) throws Exception
{
    /*
    This case is not working...
    if (state.value() == null) {
        stateValue = 1;
    }
    */
    Integer stateValue = state.value();
    if (stateValue == null){
        stateValue = 1;
    }
    if (something in record true)
    {
        stateValue ++;
        state.update(stateValue);
    }
    if (stateValue % 2 == 0)
    {
        out.collect(..)
    }
}
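The working version boils down to a "read once, default locally" pattern: call `value()` a single time, replace a null with the default in a local variable, and never rely on a second read returning what was just written. Below is a minimal plain-Java sketch of that pattern. `InMemoryState` is a hypothetical in-memory stand-in for Flink's `ValueState<Integer>` (it is not a Flink class and has no TTL or RocksDB behaviour), and the boolean `matches` stands in for the elided record condition.

```java
class ReadOnceSketch {
    // Hypothetical stand-in for ValueState<Integer>; NOT a Flink class.
    static class InMemoryState {
        private Integer stored; // null until the first update, like unset keyed state

        Integer value() { return stored; }

        void update(Integer v) { stored = v; }
    }

    // Reads the state exactly once, defaults a missing value to 1 locally,
    // increments and writes back only when `matches` is true, and returns
    // whether the resulting value is even (i.e. whether a result would be emitted).
    static boolean process(InMemoryState state, boolean matches) {
        Integer stateValue = state.value();
        if (stateValue == null) {
            stateValue = 1;
        }
        if (matches) {
            stateValue++;
            state.update(stateValue);
        }
        return stateValue % 2 == 0;
    }

    public static void main(String[] args) {
        InMemoryState state = new InMemoryState();
        System.out.println(process(state, false)); // no write; state stays unset
        System.out.println(process(state, true));  // 1 -> 2, written back
        System.out.println(process(state, true));  // 2 -> 3, written back
    }
}
```

Because the local variable is the single source of truth within one call, a null coming back from the state can no longer reach the modulo check.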