如何使用可查询状态客户端获取flink中多个keyby的状态?

zfycwa2u  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(538)

我使用的是flink1.4.2,我有一个场景需要使用两个键。例如。

KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));

价值描述将

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, SsidTotalUsage.class);
        descriptor.setQueryable(queryableStateName);

有人能建议我在flink中使用可查询状态客户端获取多个密钥的状态吗?
下面的queryableclient对于单个键“clusterid”运行良好。

kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);

多个键的类型信息应该是什么?任何与此相关的建议/例子或参考都会很有帮助吗?

gudnpqoy

gudnpqoy1#

我找到了解决办法。
我在valuestatescription中给出了typehint。
在flink工作中:

TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);

在客户端:

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);

我有两个键,所以我使用了tuple2类并像下面那样设置键的值。注意:如果有两个以上的键,那么必须根据键选择tuple3、tuple4类。

Tuple2<String, String> tuple = new Tuple2<>();
 tuple.f0 = clusterId;
 tuple.f1 = ssid;

然后我提供了提示。

TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};

CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);

在上面的代码中,getstate方法将返回immutablevaluestate,因此我需要像下面那样获取我的pojo。

ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();

totalUsage = state.value();

相关问题