如何在flink map()中使用绝地

brc7rcf0  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(410)

我的代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.enableCheckpointing(500);

  DataStream<String> stream = env.addSource(getConsumer(TOPIC_1));

  Jedis jedis = new Jedis("master1");
  stream.map(new RichMapFunction<String, String>() {
      @Override
      public String map(String value) throws Exception {
          String result = jedis.hget("rtc", value);
          return result;
      }
  });

我想从redis那里得到一些数据 map() ,但它无法运行,因为jedis.class不可序列化。
如何在中使用不可序列化类 map() ,比如说,绝地武士?

rjee0c15

rjee0c151#

所有丰富的功能,如 RichMapFunction 有一个 open(Configuration) 以及 close 可以覆盖的呼叫。一旦函数部署到taskmanager并在其中执行,就会调用这些生命周期方法。

class MyMapFunction extends RichMapFunction<String, String> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        // open connection to Redis, for example
        jedis = new Jedis("master1");
    }

    @Override
    public void close() {
        // close connection to Redis
        jedis.close();
    }
}

相关问题