Flink数据流富集:以不同吞吐量和多级键控连接两个数据流

xytpbqjk  于 2023-08-01  发布在  Apache
关注(0)|答案(2)|浏览(115)

我对Flink并不陌生,我一直想加入两个虚构数据源的两个数据流,以展示有状态数据流。
这些数据流以以下格式(JSON)提供数据:

1.披萨订单

{
  "id": 123,
  "shop": "Mario's kitchen",
  "pizzas": [
    {
      "name": "Diavolo"
    },
    {
      "name": "Hawaii"
    }
  ],
  "timestamp": 12345678
}

字符串

2.披萨价格

{
  "name": "Diavolo",
  "shop": "Mario's kitchen",
  "price": 14.2,
  "timestamp": 12345678
}


第二数据流的更新频率低于第一数据流。
这个想法是用第二个数据流的价格标签来丰富第一个“比萨饼订单”数据流中的比萨饼。结果如下所示:

3.强化披萨订单

{
  "id": 123,
  "shop": "Mario's kitchen",
  "pizzas": [
    {
      "name": "Diavolo",
      "price": 14.2
    },
    {
      "name": "Hawaii",
      "price": 12.5
    }
  ],
  "timestamp": 12345678
}


丰富的比萨饼订单数据流应当仅包含直到相关联的比萨饼订单的时间戳为止的最新比萨饼价格。
我在许多教程中看到,这可以通过使用“keyBy”操作符来实现,使用适当的密钥来连接流以进行匹配。
这就是我的问题所在,因为在两个输入流上都没有合适的顶级键。您将如何尝试实现这一目标?
我最近尝试解决这个问题的方式如下(Java):

{
        [...]
        DataStream<PizzaOrder> pizzaOrderStream = env.fromSource(
                this.pizzaOrderSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Pizza Order Topic"
        );

        DataStream<PizzaPrice> pizzaPriceStream = env.fromSource(
                this.pizzaPriceSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Pizza Price Topic"
        );

        DataStream<EnrichedPizzaOrder> enrichedPizzaOrderDataStream =
                pizzaOrderStream.keyBy(PizzaOrder::getShop)
                .connect(pizzaPriceStream.keyBy(PizzaPrice::getShop))
                .process(new ProcessingTimeJoin());

        enrichedPizzaOrderDataStream.sinkTo(sink);

        return env.execute("Pizza Order Enriching Example");
}

public static class ProcessingTimeJoin extends CoProcessFunction<PizzaOrder, 
PizzaPrice, EnrichedPizzaOrder> {

        // pizza name and price (implicitly keyed by store)
        private ValueState<HashMap<String, Double>> pizzaPriceState;

        @Override
        public void  open(Configuration parameters) throws Exception {
            ValueStateDescriptor<HashMap<String, Double>> vDescriptor = new ValueStateDescriptor<HashMap<String, Double>>(
                    "pizzaPriceState",
                    TypeInformation.of(new TypeHint<HashMap<String, Double>>() {})
            );
            pizzaPriceState = getRuntimeContext().getState(vDescriptor);
        }

        @Override
        public void processElement1(PizzaOrder order,
                                    CoProcessFunction<PizzaOrder, PizzaPrice, EnrichedPizzaOrder>.Context ctx,
                                    Collector<EnrichedPizzaOrder> out) throws Exception {
            HashMap<String, Double> state = pizzaPriceState.value();
            if (state == null) {
                state = new HashMap<>();
            }
            List<EnrichedPizza> enrichedPizzas = new ArrayList<>();
            for (Pizza pizza : order.getPizzas()) {
                double price = state.getOrDefault(pizza.getPizzaName(), -1.0);

                EnrichedPizza newPizza = new EnrichedPizza(pizza, price);
                enrichedPizzas.add(newPizza);
            }
            EnrichedPizzaOrder enrichedPizzaOrder = new EnrichedPizzaOrder(order, enrichedPizzas);
            out.collect(enrichedPizzaOrder);
        }

        @Override
        public void processElement2(PizzaPrice price,
                                    CoProcessFunction<PizzaOrder, PizzaPrice, EnrichedPizzaOrder>.Context ctx,
                                    Collector<EnrichedPizzaOrder> out) throws Exception {
            HashMap<String, Double> state = pizzaPriceState.value();
            if (state == null) {
                state = new HashMap<>();
            }
            state.put(price.getName(), price.getPrice());
            pizzaPriceState.update(state);
        }
    }

6yjfywim

6yjfywim1#

“Pizza Price”流是典型的浓缩数据,因此可以是广播流,您将其连接到“Pizza Order”流并按照The Broadcast State Pattern使用。
或者,您可以展平Pizza Order记录,这样一个记录变成N个,每个记录都有一个披萨,然后按商店和披萨键。然后,您可以通过相同的两个字段为Pizza Price流设置关键字,并连接流。丰富之后,您必须按订单ID键,然后重新创建未展平的记录。
在任何一种情况下,你都必须处理基于时间的加入,你的Pizza Price流存储在(比如)MapState<Long, Float>中,你可以在条目上迭代找到最好的一个(time <=订购时间)。
或者使用Table API来帮助您完成上述操作。请参见临时联接。

mkshixfv

mkshixfv2#

正如kkrugler所推荐的,对我来说立即奏效的解决方案是应用广播模式。解决方案如下:

解决方案

主管道

// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

//DataStreamSource
// without watermarks because state is irrelevant
DataStream<PizzaOrder> pizzaOrderStream = env.fromSource(
                this.pizzaOrderSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Pizza Order Topic"
);

//DataStreamSource
// without watermarks because state is irrelevant
DataStream<PizzaPrice> pizzaPriceStream = env.fromSource(
                this.pizzaPriceSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Pizza Price Topic"
);

// broadcast the pizza prices and create the broadcast state
BroadcastStream<PizzaPrice> pizzaPriceBroadcastStream = pizzaPriceStream
                .broadcast(pizzaPriceStateDescriptor);

// connect stream (key or non-keyed possible) - call connect on the non-broadcast stream
DataStream<EnrichedPizzaOrder> enrichedPizzaOrderStream = pizzaOrderStream
                .connect(pizzaPriceBroadcastStream)
                .process(new BroadcastPizzaOrderEnrichmentFunction());

enrichedPizzaOrderStream.sinkTo(sink);

// execute the transformation pipeline
return env.execute("Pizza Order Enriching Example");

字符串

充实函数

public static class BroadcastPizzaOrderEnrichmentFunction extends BroadcastProcessFunction<PizzaOrder, PizzaPrice,
            EnrichedPizzaOrder> {
        @Override
        // context -> read only!
        public void processElement(PizzaOrder pizzaOrder, ReadOnlyContext ctx, Collector<EnrichedPizzaOrder> out) throws Exception {
            ReadOnlyBroadcastState<Tuple2<String, String>, PizzaPrice> state = ctx.getBroadcastState(pizzaPriceStateDescriptor);
            List<EnrichedPizza> enrichedPizzas = new ArrayList<>();
            for (Pizza pizza : pizzaOrder.getPizzas()) {
                Tuple2<String, String> stateKey = new Tuple2<>(pizzaOrder.getShop(), pizza.getPizzaName());
                double pizzaPrice = -1; // filler
                if (state.contains(stateKey)) {
                    pizzaPrice = state.get(stateKey).getPrice();
                }
                EnrichedPizza newPizza = new EnrichedPizza(pizza, pizzaPrice);
                enrichedPizzas.add(newPizza);
            }
            EnrichedPizzaOrder enrichedPizzaOrder = new EnrichedPizzaOrder(pizzaOrder, enrichedPizzas);
            out.collect(enrichedPizzaOrder);
        }

        @Override
        // context -> read and write! - must have some deterministic behaviour across all parallel instances
        public void processBroadcastElement(PizzaPrice pizzaPrice, Context ctx, Collector<EnrichedPizzaOrder> out) throws Exception {
            BroadcastState<Tuple2<String, String>, PizzaPrice> state = ctx.getBroadcastState(pizzaPriceStateDescriptor);
            state.put(
                    new Tuple2<>(pizzaPrice.getShop(), pizzaPrice.getName()),
                    pizzaPrice
            );
        }
    }

Map描述符

// a map descriptor to store the joined key <shop, pizza> and the associated pizza price object
public static MapStateDescriptor<Tuple2<String, String>, PizzaPrice> pizzaPriceStateDescriptor =
            new MapStateDescriptor<Tuple2<String, String>, PizzaPrice>(
                    "PizzaPriceBroadcastState",
                    TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}),
                    TypeInformation.of(new TypeHint<PizzaPrice>() {}));


希望这对你有帮助!

相关问题