我对Flink并不陌生,我一直想加入两个虚构数据源的两个数据流,以展示有状态数据流。
这些数据流以以下格式(JSON)提供数据:
1.披萨订单
{
"id": 123,
"shop": "Mario's kitchen",
"pizzas": [
{
"name": "Diavolo"
},
{
"name": "Hawaii"
}
],
"timestamp": 12345678
}
字符串
2.披萨价格
{
"name": "Diavolo",
"shop": "Mario's kitchen",
"price": 14.2,
"timestamp": 12345678
}
型
第二数据流的更新频率低于第一数据流。
这个想法是用第二个数据流的价格标签来丰富第一个“比萨饼订单”数据流中的比萨饼。结果如下所示:
3.强化披萨订单
{
"id": 123,
"shop": "Mario's kitchen",
"pizzas": [
{
"name": "Diavolo",
"price": 14.2
},
{
"name": "Hawaii",
"price": 12.5
}
],
"timestamp": 12345678
}
型
丰富的比萨饼订单数据流应当仅包含直到相关联的比萨饼订单的时间戳为止的最新比萨饼价格。
我在许多教程中看到,这可以通过使用“keyBy”操作符来实现,使用适当的密钥来连接流以进行匹配。
这就是我的问题所在,因为在两个输入流上都没有合适的顶级键。您将如何尝试实现这一目标?
我最近尝试解决这个问题的方式如下(Java):
{
[...]
DataStream<PizzaOrder> pizzaOrderStream = env.fromSource(
this.pizzaOrderSource,
WatermarkStrategy.noWatermarks(),
"Kafka Pizza Order Topic"
);
DataStream<PizzaPrice> pizzaPriceStream = env.fromSource(
this.pizzaPriceSource,
WatermarkStrategy.noWatermarks(),
"Kafka Pizza Price Topic"
);
DataStream<EnrichedPizzaOrder> enrichedPizzaOrderDataStream =
pizzaOrderStream.keyBy(PizzaOrder::getShop)
.connect(pizzaPriceStream.keyBy(PizzaPrice::getShop))
.process(new ProcessingTimeJoin());
enrichedPizzaOrderDataStream.sinkTo(sink);
return env.execute("Pizza Order Enriching Example");
}
public static class ProcessingTimeJoin extends CoProcessFunction<PizzaOrder,
PizzaPrice, EnrichedPizzaOrder> {
// pizza name and price (implicitly keyed by store)
private ValueState<HashMap<String, Double>> pizzaPriceState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<HashMap<String, Double>> vDescriptor = new ValueStateDescriptor<HashMap<String, Double>>(
"pizzaPriceState",
TypeInformation.of(new TypeHint<HashMap<String, Double>>() {})
);
pizzaPriceState = getRuntimeContext().getState(vDescriptor);
}
@Override
public void processElement1(PizzaOrder order,
CoProcessFunction<PizzaOrder, PizzaPrice, EnrichedPizzaOrder>.Context ctx,
Collector<EnrichedPizzaOrder> out) throws Exception {
HashMap<String, Double> state = pizzaPriceState.value();
if (state == null) {
state = new HashMap<>();
}
List<EnrichedPizza> enrichedPizzas = new ArrayList<>();
for (Pizza pizza : order.getPizzas()) {
double price = state.getOrDefault(pizza.getPizzaName(), -1.0);
EnrichedPizza newPizza = new EnrichedPizza(pizza, price);
enrichedPizzas.add(newPizza);
}
EnrichedPizzaOrder enrichedPizzaOrder = new EnrichedPizzaOrder(order, enrichedPizzas);
out.collect(enrichedPizzaOrder);
}
@Override
public void processElement2(PizzaPrice price,
CoProcessFunction<PizzaOrder, PizzaPrice, EnrichedPizzaOrder>.Context ctx,
Collector<EnrichedPizzaOrder> out) throws Exception {
HashMap<String, Double> state = pizzaPriceState.value();
if (state == null) {
state = new HashMap<>();
}
state.put(price.getName(), price.getPrice());
pizzaPriceState.update(state);
}
}
型
2条答案
按热度按时间6yjfywim1#
“Pizza Price”流是典型的浓缩数据,因此可以是广播流,您将其连接到“Pizza Order”流并按照The Broadcast State Pattern使用。
或者,您可以展平Pizza Order记录,这样一个记录变成N个,每个记录都有一个披萨,然后按商店和披萨键。然后,您可以通过相同的两个字段为Pizza Price流设置关键字,并连接流。丰富之后,您必须按订单ID键,然后重新创建未展平的记录。
在任何一种情况下,你都必须处理基于时间的加入,你的Pizza Price流存储在(比如)
MapState<Long, Float>
中,你可以在条目上迭代找到最好的一个(time <=订购时间)。或者使用Table API来帮助您完成上述操作。请参见临时联接。
mkshixfv2#
正如kkrugler所推荐的,对我来说立即奏效的解决方案是应用广播模式。解决方案如下:
解决方案
主管道
字符串
充实函数
型
Map描述符
型
希望这对你有帮助!