我的代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
DataStream<String> stream = env.addSource(getConsumer(TOPIC_1));
Jedis jedis = new Jedis("master1");
stream.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String result = jedis.hget("rtc", value);
return result;
}
});
我想从redis那里得到一些数据 map()
,但它无法运行,因为jedis.class不可序列化。
如何在中使用不可序列化类 map()
,比如说,绝地武士?
1条答案
按热度按时间rjee0c151#
所有丰富的功能,如
RichMapFunction
有一个open(Configuration)
以及close
可以覆盖的呼叫。一旦函数部署到taskmanager并在其中执行,就会调用这些生命周期方法。