I am comparing the Apache Beam SDK with the Flink SDK for stream processing, to establish the cost/benefit of using Beam as an additional framework.
I have a very simple setup in which a stream of data is read from a Kafka source and processed in parallel by a cluster of nodes running Flink.
From my understanding of how these SDKs work, the simplest way to process a stream of data window by window is:
Using Apache Beam (running on Flink):
1.1. Create a Pipeline object.
1.2. Create a PCollection of Kafka records.
1.3. Apply the windowing function.
1.4. Transform the pipeline to key by window.
1.5. Group the records by key (window).
1.6. Apply whatever function is needed to the windowed records.
Using the Flink SDK:
2.1. Create a DataStream from the Kafka source.
2.2. Transform it into a KeyedStream by providing a key function.
2.3. Apply the windowing function.
2.4. Apply whatever function is needed to the windowed records.
While the Flink solution is programmatically more concise, in my experience it is less efficient at higher volumes of data. I can only imagine that the overhead is introduced by the key extraction function, since Beam does not require this step.
My question is: am I comparing like for like? Are these processes not equivalent? What could explain the Beam pipeline being more efficient, given that it uses Flink as a runner (with all other conditions being equal)?
Here is the code using the Beam SDK:
PipelineOptions options = PipelineOptionsFactory.create();

// Run with Flink
FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
flinkPipelineOptions.setRunner(FlinkRunner.class);
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setParallelism(-1); // Pick this up from the user interface at runtime

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.readBytes()
        .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
        .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
        .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));

// Apply the windowing function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
        Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

// Transform the pipeline to key by window
PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
        windowedKafkaCollection.apply(
                ParDo.of(
                        new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                            @ProcessElement
                            public void processElement(ProcessContext context, IntervalWindow window) {
                                context.output(KV.of(window, context.element()));
                            }
                        }));

// Group the records by key (window)
PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
        .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());

// Process the windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
        .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();
Here is the code using the Flink SDK:
// Create a Streaming Execution Environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(6);

// Connect to Kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
properties.setProperty("group.id", CONSUMER_GROUP);
DataStream<ObjectNode> stream = env
        .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));

// Key by id
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())
        // Set the windowing function
        .timeWindow(Time.seconds(5L), Time.seconds(1L))
        // Process the windowed data
        .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));

// Execute the program
env.execute("Using Flink SDK");
Any insight would be greatly appreciated.
EDIT
I thought I should add some metrics that may be relevant.
Network received bytes

Flink SDK:
taskmanager.2: 2,644,786,446
taskmanager.3: 2,645,765,232
taskmanager.1: 2,827,676,598
taskmanager.6: 2,422,309,148
taskmanager.4: 2,428,570,491
taskmanager.5: 2,431,368,644

Beam:
taskmanager.2: 4,092,154,160
taskmanager.3: 4,435,132,862
taskmanager.1: 4,766,399,314
taskmanager.6: 4,425,190,393
taskmanager.4: 4,096,576,110
taskmanager.5: 4,092,849,114

CPU utilisation (max)

Flink SDK:
taskmanager.2: 93.00%
taskmanager.3: 92.00%
taskmanager.1: 91.00%
taskmanager.6: 90.00%
taskmanager.4: 90.00%
taskmanager.5: 92.00%

Beam:
taskmanager.2: 52.0%
taskmanager.3: 71.0%
taskmanager.1: 72.0%
taskmanager.6: 40.0%
taskmanager.4: 56.0%
taskmanager.5: 26.0%
Beam seems to use considerably more network, whereas Flink uses significantly more CPU. Could this mean that Beam is parallelising the processing in a more efficient way?
EDIT No. 2
I am pretty sure the PueCalculatorFn classes are comparable, but I will share the code here to see whether there are any obvious discrepancies between the two processes.
Beam
public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {

    private transient List<IKafkaConsumption> realEnergyRecords;
    private transient List<IKafkaConsumption> itEnergyRecords;

    @ProcessElement
    public void processElement(DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>>.ProcessContext c, BoundedWindow w) {
        KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
        Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
        Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
        Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();

        // Calculate the PUE
        IPueResult result = calculatePue(element.getKey(), records);

        // Create an IntervalWindowResult object to return
        DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
        IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
                formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);

        // Return the PUE keyed by window
        c.output(KV.of(intervalWindowResult, result));
    }

    private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {
        // Define accumulators to gather the readings
        final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

        // Declare a variable to store the result
        BigDecimal pue = BigDecimal.ZERO;

        // Initialise the transient lists
        realEnergyRecords = new ArrayList<>();
        itEnergyRecords = new ArrayList<>();

        // Transform the records into a stream
        Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);

        // Iterate through each reading and add it to the increment count
        streamOfRecords
                .map(record -> {
                    byte[] valueBytes = record.getKV().getValue();
                    assert valueBytes != null;
                    String valueString = new String(valueBytes);
                    assert !valueString.isEmpty();
                    return KV.of(record, valueString);
                }).map(kv -> {
                    Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
                    KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
                    return KV.of(kv.getKey(), consumption);
                }).forEach(consumptionRecord -> {
                    switch (consumptionRecord.getKey().getTopic()) {
                        case REAL_ENERGY_TOPIC:
                            totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                            realEnergyRecords.add(consumptionRecord.getValue());
                            break;
                        case IT_ENERGY_TOPIC:
                            totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                            itEnergyRecords.add(consumptionRecord.getValue());
                            break;
                    }
                });

        assert totalRealIncrement.doubleValue() > 0.0;
        assert totalItIncrement.doubleValue() > 0.0;

        // Beware of division by zero
        if (totalItIncrement.doubleValue() != 0.0) {
            // Calculate the PUE
            pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
        }

        // Create a PueResult object to return
        IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
        return new PueResult(intervalWindow, pue.stripTrailingZeros());
    }

    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        RecordSenderFactory.closeSender();
        WindowSenderFactory.closeSender();
    }
}
Flink
public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair, Integer, TimeWindow> {

    private transient List<KafkaConsumption> realEnergyRecords;
    private transient List<KafkaConsumption> itEnergyRecords;

    @Override
    public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair> collector) throws Exception {
        Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
        Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
        BigDecimal pue = calculatePue(iterable);

        // Create an IntervalWindowResult object to return
        DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
        IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
                formatter.format(windowEnd), realEnergyRecords
                .stream()
                .map(e -> (IKafkaConsumption) e)
                .collect(Collectors.toList()), itEnergyRecords
                .stream()
                .map(e -> (IKafkaConsumption) e)
                .collect(Collectors.toList()));

        // Create a PueResult object to return
        IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());

        // Collect the result
        collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));
    }

    protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {
        // Define accumulators to gather the readings
        final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

        // Declare a variable to store the result
        BigDecimal pue = BigDecimal.ZERO;

        // Initialise the transient lists
        realEnergyRecords = new ArrayList<>();
        itEnergyRecords = new ArrayList<>();

        // Iterate through each reading and add it to the increment count
        StreamSupport.stream(iterable.spliterator(), false)
                .forEach(object -> {
                    switch (object.get("topic").textValue()) {
                        case REAL_ENERGY_TOPIC:
                            totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                            realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                            break;
                        case IT_ENERGY_TOPIC:
                            totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                            itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                            break;
                    }
                });

        assert totalRealIncrement.doubleValue() > 0.0;
        assert totalItIncrement.doubleValue() > 0.0;

        // Beware of division by zero
        if (totalItIncrement.doubleValue() != 0.0) {
            // Calculate the PUE
            pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
        }

        return pue;
    }
}
Below is the custom deserialiser I use in the Beam example.
KafkaConsumptionDeserialiser
public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {

    public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
        if (jsonElement == null) {
            return null;
        } else {
            JsonObject jsonObject = jsonElement.getAsJsonObject();
            JsonElement id = jsonObject.get("id");
            JsonElement energyConsumed = jsonObject.get("energyConsumed");
            Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
            Duration duration = (Duration) gson.fromJson(jsonObject.get("duration"), Duration.class);
            JsonElement topic = jsonObject.get("topic");
            Instant eventTime = (Instant) gson.fromJson(jsonObject.get("eventTime"), Instant.class);
            return new KafkaConsumption(Integer.valueOf(id != null ? id.getAsInt() : 0), Double.valueOf(energyConsumed != null ? energyConsumed.getAsDouble() : 0.0D), duration, topic != null ? topic.getAsString() : "", eventTime);
        }
    }
}
2 Answers
Answer 1 (mznpcxlj)
I am not sure why the Beam pipeline you wrote is faster, but semantically it is not the same as the Flink job. Similar to how windowing works in Flink, once you assign windows in Beam, all subsequent operations automatically take the windowing into account. You don't need to group by window.
Your Beam pipeline definition can be simplified as follows.
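The answer's original snippet is not reproduced in this capture; a rough sketch of that simplification, based on the pipeline from the question, could look like the code below. Note that PueCalculatorFn would have to be reworked to take individual KafkaRecord elements (or be replaced by a Combine), since there is no longer a grouped Iterable per window.

// Sketch only, not the answerer's exact code. Once Window.into(...) is applied,
// downstream transforms already run per window, so the key-by-window ParDo and
// the GroupByKey can simply be dropped.
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = kafkaCollection
        .apply(Window.into(SlidingWindows.of(Duration.standardSeconds(5))
                .every(Duration.standardSeconds(1))))
        // PueCalculatorFn reworked as DoFn<KafkaRecord<byte[], byte[]>, KV<IIntervalWindowResult, IPueResult>>
        .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));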
As for performance, it depends on many factors, but keep in mind that Beam is an abstraction layer on top of Flink. Generally speaking, I would be surprised if you saw improved performance with Beam on Flink.
EDIT: Just to clarify further, you are not grouping by the JSON "id" field in the Beam pipeline, which you are doing in the Flink snippet.
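For completeness, here is a purely illustrative sketch of what keying the Beam pipeline on that same field could look like. It assumes the record values are UTF-8 JSON containing an integer id field (as the Flink snippet assumes) and reuses Gson, which the question already depends on; none of it is the answerer's code.

// Illustrative sketch: key each record by the "id" field of its JSON value,
// mirroring the keyBy(...) in the Flink job.
// Requires com.google.gson.JsonParser and java.nio.charset.StandardCharsets.
PCollection<KV<Integer, KafkaRecord<byte[], byte[]>>> keyedById = windowedKafkaCollection.apply(
        ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<Integer, KafkaRecord<byte[], byte[]>>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String json = new String(c.element().getKV().getValue(), StandardCharsets.UTF_8);
                int id = new JsonParser().parse(json).getAsJsonObject().get("id").getAsInt();
                c.output(KV.of(id, c.element()));
            }
        }));

// Grouping then happens per id and per window, matching the Flink keyBy + timeWindow.
PCollection<KV<Integer, Iterable<KafkaRecord<byte[], byte[]>>>> groupedById =
        keyedById.apply(GroupByKey.<Integer, KafkaRecord<byte[], byte[]>>create());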
Answer 2 (t3psigkw)
It is worth mentioning that if the window processing can be pre-aggregated via reduce() or aggregate(), the native Flink job should perform better than it currently does.
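For illustration, a minimal sketch of what incremental pre-aggregation with aggregate() could look like for this job; the double[] accumulator and the PUE-only output are simplifications of the original logic (the per-record lists are dropped), not a drop-in replacement.

// Illustrative sketch: incrementally sum the two energy readings per window instead of
// buffering every record, so the per-window state stays tiny.
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())
        .timeWindow(Time.seconds(5L), Time.seconds(1L))
        .aggregate(new AggregateFunction<ObjectNode, double[], Double>() {
            @Override
            public double[] createAccumulator() {
                return new double[]{0.0, 0.0}; // [totalRealEnergy, totalItEnergy]
            }

            @Override
            public double[] add(ObjectNode record, double[] acc) {
                double energy = record.get("energyConsumed").asDouble();
                if (REAL_ENERGY_TOPIC.equals(record.get("topic").textValue())) {
                    acc[0] += energy;
                } else {
                    acc[1] += energy;
                }
                return acc;
            }

            @Override
            public Double getResult(double[] acc) {
                // PUE = total real energy / total IT energy (guard against division by zero)
                return acc[1] == 0.0 ? 0.0 : acc[0] / acc[1];
            }

            @Override
            public double[] merge(double[] a, double[] b) {
                return new double[]{a[0] + b[0], a[1] + b[1]};
            }
        });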
Many details, such as the choice of state backend, serialisation, checkpointing and so on, can also have a significant impact on performance.
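For instance, pinning these explicitly so both runs are comparable; this is only a sketch, the RocksDB backend needs the flink-statebackend-rocksdb dependency, and the path and interval below are placeholders.

// Illustrative sketch: explicit state backend and checkpointing configuration.
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true)); // incremental checkpoints
env.enableCheckpointing(60_000); // checkpoint every 60 s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);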
Is the same Flink being used in both cases, i.e. the same version and the same configuration?