我是flink的新手,我有五个无限的Kafka源,它们的数据模式不同。我想减少消息,然后用相同的键外连接所有的Kafka源。所以我使用union将它们合并组合在一起,然后使用 ProcessWindowFunction 将它们转换为一个大对象,然后发送到下游。我有两个问题。
1.我想我找到了数据丢失的根本原因。总是有一个Kafka源水印晚于其他。所以它的消息将被丢弃。
1.对于重复的消息,我认为这与windown大小和联合运算符有关。这不是我的逻辑问题。
class CommonObj {
var id: Long = 0
var entityType: String? = null
var timestamp: Long = System.currentTimeMillis()
val eventMetas: MutableList<EventMeta> = mutableListOf()
var kafkaStreamValue1: KafkaStreamObj1? = null
var kafkaStreamValue2: KafkaStreamObj2? = null
var kafkaStreamValue3: KafkaStreamObj3? = null
var kafkaStreamValue4: KafkaStreamObj4? = null
fun buildSinkObj(): SinkObj = ....
}
字符串
这是一个Kafka的源代码。其他的Kafka源代码逻辑非常相似。
val watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps<KafkaStreamObj1>()
.withIdleness(Duration.ofMinutes(1))
val sourceStream1 = env.fromSource(
getKafkaStream1(params),
watermarkStrategy,
"Kafka Source 1"
)
val kafkaSource1 = sourceStream1
.filter { it != null }
.map {
EventObj<KafkaStreamObj1>(
it.id.toString() + it.entity, //this is key
it, //obj
it.sequence, //timestamp
mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
)
}
.returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
.keyBy {
it.key }
.window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
.reduce { v1, v2 ->
if (v1.obj.sequence > v2.obj.sequence) {
v1.eventMetaList.addAll(v2.eventMetaList)
v1
} else {
v2.eventMetaList.addAll(v1.eventMetaList)
v2
}
}
.map {
val commonObj = CommonObj()
commonObj.id = it.obj.id
commonObj.entityType = it.obj.entity
commonObj.timestamp = System.currentTimeMillis()
commonObj.eventMetas.addAll(it.eventMetaList)
commonObj.kafkaStreamValue1 = it.obj.entity
commonObj
}
.returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))
return kafkaSource1
型
这个联盟的代码
kafkaStream1.union(kafkaStream2,kafkaStream3,kafkaStream4,kafkaStream5)
.keyBy(keySelector)
.window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
.process(EventProcessFunction(params))
.sinkTo(kafkaSink())
型
EventProcessFunction将所有Kafka源消息合并到CommonObj中。它将通过键和时间戳减少重复。
我删除了一些敏感信息。
更新2023-12-11
我做了一些优化。但总有一个Kafka源有背压,它的水印比联盟水印晚。所以晚的消息会被丢弃。据我所知,联盟将使用所有联盟源中较小的水印。虽然设置为1分钟空闲,但晚的源有流量,正在被消耗。为什么联盟水印被提前,而不是较小的水印?
2023年12月14日更新
谢谢kkrugler的建议。事实上,我不在乎事件是否迟到。如果它迟到了,就像往常一样处理它。所以我会尝试使用处理时间windown。
目前我已经更改为使用处理时间窗口和无水印。当我使用TumblingProcessingTimeWindows时,所有Kafka源都存在高背压。并且检查点会在10 m后超时。如何调整背压和检查点问题?
我试着调整窗口大小10- 30秒。但问题仍在继续。
1条答案
按热度按时间7xllpg7q1#
要回答有关水印的主要问题,您应该通过(较新的)
StreamExecutionEnvironment.fromSource()
方法将水印策略应用于每个Kafka源,然后将流合并组合起来。