如何在ApacheFlink中用大文件丰富事件流?

aiazj4mn  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(320)

我有一个flink应用程序用于点击流收集和处理。该应用程序由kafka作为事件源、map函数和sink组成,如下图所示:

我想根据从kafka摄取的原始事件中的userip字段,使用用户的ip位置来丰富传入的click流数据。
csv文件的简化切片,如下所示

start_ip,end_ip,country
   "1.1.1.1","100.100.100.100","United States of America"
   "100.100.100.101","200.200.200.200","China"

我做了一些研究,发现了一些可能的解决方案:
1.解决方案:广播丰富数据,并通过一些ip匹配逻辑与事件流连接。
1.结果:它适用于一对样本ip位置数据,但不适用于整个csv数据。jvm堆已达到3.5GB,由于广播状态,无法将广播状态放入磁盘(对于rocksdb)
2.解决方案:加载csv数据 open() 中的方法 RichFlatMapFunction 进入状态(valuestate),然后开始事件处理,并在中丰富事件数据 flatMap 方法。
2.结果:由于扩展数据太大,无法存储在jvm堆中,因此无法加载到valuestate。而且,通过valuestate进行反序列化对于键值性质的数据来说也是一种不好的做法。
3.解决方案:为了避免处理jvm堆约束,我尝试用mapstate将扩展数据作为state放入rocksdb(uses disk)。
3.结果:尝试将csv文件加载到中的mapstate open() 方法,给了我一个错误,告诉我不能在中放入mapstate open() 方法,因为我不在 open() 方法:flink键控流密钥为空
4.解决方案:由于mapstate需要keyed上下文(放入rocksdb),在将数据流放入keyedstream后,我尝试在process函数中将整个csv文件加载到本地rocksdb示例(磁盘)中:

class KeyedIpProcess extends KeyedProcessFunction[Long, Event, Event] {

  var ipMapState: MapState[String, String] = _
  var csvFinishedFlag: ValueState[Boolean] = _

  override def processElement(event: Event,
                              ctx: KeyedProcessFunction[Long, Event, Event]#Context,
                              out: Collector[Event]): Unit = {

    val ipDescriptor = new MapStateDescriptor[String, String]("ipMapState", classOf[String], classOf[String])
    val csvFinishedDescriptor = new ValueStateDescriptor[Boolean]("csvFinished", classOf[Boolean])

    ipMapState = getRuntimeContext.getMapState(ipDescriptor)
    csvFinishedFlag = getRuntimeContext.getState(csvFinishedDescriptor)

    if (!csvFinishedFlag.value()) {
      val csv = new CSVParser(defaultCSVFormat)

      val fileSource = Source.fromFile("/tmp/ip.csv", "UTF-8")
      for (row <- fileSource.getLines()) {
        val Some(List(start, end, country)) = csv.parseLine(row)
        ipMapState.put(start, country)
      }
      fileSource.close()
      csvFinishedFlag.update(true)
    }

    out.collect {
      if (ipMapState.contains(event.userIp)) {
        val details = ipMapState.get(event.userIp)
        event.copy(data =
          event.data.copy(
            ipLocation = Some(details.country)
          ))
      } else {
        event
      }
    }
  }
}

4.结果:由于阻止文件读取操作,它太黑并且阻止事件处理。
你能告诉我在这种情况下我能做些什么吗?
谢谢

a9wyjsp7

a9wyjsp71#

您可以实现一个自定义分区器,并将扩展数据的一部分加载到每个分区中。这里有一个这样的例子;我将摘录一些关键部分:
工作组织如下:

DataStream<SensorMeasurement> measurements = env.addSource(new SensorMeasurementSource(100_000));

DataStream<EnrichedMeasurements> enrichedMeasurements = measurements
  .partitionCustom(new SensorIdPartitioner(), measurement -> measurement.getSensorId())
  .flatMap(new EnrichmentFunctionWithPartitionedPreloading());

自定义分区器需要知道有多少个分区,并确定地将每个事件分配给特定的分区:

private static class SensorIdPartitioner implements Partitioner<Long> {
    @Override
    public int partition(final Long sensorMeasurement, final int numPartitions) {
        return Math.toIntExact(sensorMeasurement % numPartitions);
    }
}

然后,扩展函数利用知道如何进行分区的优势,只将相关切片加载到每个示例中:

public class EnrichmentFunctionWithPartitionedPreloading extends RichFlatMapFunction<SensorMeasurement, EnrichedMeasurements> {

    private Map<Long, SensorReferenceData> referenceData;

    @Override
    public void open(final Configuration parameters) throws Exception {
        super.open(parameters);
        referenceData = loadReferenceData(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }

    @Override
    public void flatMap(
            final SensorMeasurement sensorMeasurement,
            final Collector<EnrichedMeasurements> collector) throws Exception {
        SensorReferenceData sensorReferenceData = referenceData.get(sensorMeasurement.getSensorId());
        collector.collect(new EnrichedMeasurements(sensorMeasurement, sensorReferenceData));
    }

    private Map<Long, SensorReferenceData> loadReferenceData(
            final int partition,
            final int numPartitions) {
        SensorReferenceDataClient client = new SensorReferenceDataClient();
        return client.getSensorReferenceDataForPartition(partition, numPartitions);
    }

}

请注意,扩展没有在键控流上进行,因此不能在扩展函数中使用键控状态或计时器。

相关问题