我正在尝试使用kafka消息(作为flink1.10api流源)在elasticsearch中聚合数据。数据是以json格式接收的,json格式是动态的,下面给出了示例。我想用唯一的id将多个记录合并到一个文档中。数据是按顺序来的,它是时间序列数据。
源-汇kafka和目标-汇elasticseach 7.6.1 6
我没有找到任何好的例子,可以利用在下面的问题陈述。
Record : 1
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.325",
"Data" :
{
"Field1" : "ABC",
"Field2" : "DEF"
}
}
Record : 2
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
"Field3" : "GHY"
}
}
Result :
{
"ID" : "1",
"Start_timestamp" : "2020-05-07 14:34:51.325",
"End_timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
"Field1" : "ABC",
"Field2" : "DEF",
"Field3" : "GHY"
}
}
以下是版本详细信息:
Flink1.10
flink kafka连接器2.11
flink elasticsearch连接器7.x
Kafka2.11
jdk 1.8版
1条答案
按热度按时间umuewwlo1#
你所要求的可以说是某种形式的加入,有很多方法可以通过flink来实现。apache flink培训中有一个状态丰富的示例,展示了如何使用
RichFlatMapFunction
这应该能帮助你开始。您需要首先阅读相关的培训材料——至少是关于数据管道和etl的部分。使用这种方法最终要做的是按id(via)对流进行分区
keyBy
),然后使用键分区状态(可能是MapState
在本例中,假设您要为每个id存储多个属性/值对)来存储来自记录(如记录1)的信息,直到您准备好发出结果为止。顺便说一句,如果密钥集是无界的,您需要注意不要永远保持这种状态。或者在不再需要状态时清除它(如本例所示),或者使用statettl安排它的最终删除。
有关flink中其他类型联接的更多信息,请参阅此答案中的链接。