键控流中的reduce函数行为

anhgbhbe  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(374)

对于我们的一个用例,我们需要根据文件中的更改重新进行一些计算,然后广播这个文件的结果,以便我们可以在另一个流中使用它。
程序的生命周期大致如下:
数据流1:监视文件->检测某些更改->重新处理文件中的所有元素->计算一个结果->广播
datastream2:一些转换->对ds2中的每个元素做些什么,同时使用所有当前的广播元素(广播元素中的一些数据丢失可以容忍一段时间)
我将给出一些代码示例来更好地解释问题所在:
所以这就是ds1:Map每个元素,把它们发送到一个reducer,然后计算总数

env.readFile(format, clientPath, FileProcessingMode.PROCESS_CONTINUOUSLY, interval)
    .map(new Adder())
    .keyBy(Map::size)
    .reduce(new Reducer());

这是Map阶段,它只是从一行创建一个hashmap

public static class Adder extends RichMapFunction<String, Map<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public Map<String, String> map(String string) throws Exception {
    String[] strings = string.split("=");
    HashMap<String, String> hashMap = new HashMap<>();
    hashMap.put(strings[0], strings[1]);
    return hashMap;
  }
}

这是最后一步,减速机。获取来自Map器的所有缩减元素,然后返回总数,即单个hashmap

public static class Reducer extends RichReduceFunction<Map<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public Map<String, String> reduce(Map<String, String> stringStringMap, Map<String, String> t1) throws Exception {
    stringStringMap.putAll(t1);
    return stringStringMap;
  }
}

然后像下面的代码段一样广播ds1。

MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("Brodcasted map state", Types.STRING, Types.STRING);
BroadcastStream<Map<String, String>> broadcastedProperties =  clientProperties.broadcast(descriptor); 
ds2.connect(broadcastedProperties).process(new EventListener(properties));

在给定时间内使用以下元素

Time    Document
T1      K1=V1, K2=V2
T2      K2=V2
T3      K3=V3, K1=V4

当我运行我们的程序时,我期望的是:

Time    Broadcasted Elements
T1      K1=V1, K2=V2
T2      K2=V2
T3      K3=V3, K1=V4

我看到的是:

Time    Broadcasted Elements
T1      K1=V1, K2=V2
T2      K1=V1, K2=V2
T3      K1=V4, K2=V2, K3=V3

为了克服这个问题,我只需要在数据流上设置一个窗口,并使用一个带有累加器的聚合函数,而不是一个缩减器,但我更喜欢使用非窗口方法。
我做了一些调试,我意识到,即使在Map阶段,它只Map那个时间段内可用的元素,但在reduce阶段,它基于之前的状态(我的意思是时间的结果-1)+那个点上的所有元素进行了缩减。我发现在reduce阶段有一个不可见的状态是很奇怪的。从我的观点来看,它应该只基于直接来自Map器的元素。也许我对《Flink》中reduce的理解是错误的,但我很想得到一些澄清。

7lrncoxx

7lrncoxx1#

是的,当flink的任何一个内置聚合器(例如sum、max、reduce等)应用于一个流时,它会以增量、有状态的方式聚合整个流。或者更准确地说,这是在keyedstreams上完成的,聚合是在一个键一个键的基础上完成的,但是是以一种持续的、无限制的方式。例如,如果对整数流1、2、3、4、5使用sum()。。。然后sum()将产生流1,3,6,10,15。在您的示例中,reduce()将生成一个不断更新的流,其中包含越来越多的键/值对。
如果要按时间为流设置关键帧,那么应该会得到所需的结果,但关键帧状态仍将永远保持不变,这可能是有问题的。我建议您要么使用windowapi,要么使用richflatmap或processfunction之类的东西,在这里您可以直接管理状态。

ca1c2owp

ca1c2owp2#

没有窗口的减少功能将是滚动减少。如果您想在滚动减少之间保持一致的状态,请使用state对象保存状态,稍后检索并更新它。我想这就是@davidanderson对richreducefunction的建议。

public static class Reducer extends RichReduceFunction<Map<String, String>> {     
private static final long serialVersionUID = 1L;
private final MapStateDescriptor<String, String> mapStateDesc = new MapStateDescriptor<>("myMapState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); 

@Override
public void open(Configuration parameters) {
        getRuntimeContext().getMapState(this.mapStateDesc);
}

@Override     
public Map<String, String> reduce(Map<String, String> stringStringMap, Map<String, String> t1) throws Exception {       
     MapState<String, String> myMapState = getRuntimeContext().getMapState(this.mapStateDesc);
     HashMap<String, String> newMap = new HashMap<>();
     //updating your map from previous state 
     for(Map.Entry<String,String> entry : myMapState.entries()) {
        newMap.put(entry.getKey(),entry.getValue());
     }       
     newMap.putAll(stringStringMap);       
     newMap.putAll(t1);
     //update the state with latest data set
     myMapState.putAll(newMap);
     return newMap;     
}

相关问题