我在下面的Flink有一个有效载荷;
{
"memberId": 4
"total": 5
}
我想用指定的分区器将数据作为键值格式发送到kafka。对于partitioner,我将使用模partitioner。
模分解器的一个例子; partitionId = value % numPartitions
让我们假设 numPartitions
参数为3。如果我们可以使用上面定义的有效负载的memberid,那么partitionid应该是 4 % 3
= 1
根据上面的partitioner,我想将具有相同partitionid的数据发送到相同的kafka主题。另一个例子;
if(假设numpartitions=3);
memberId: 3 => (3 % 3) => partitionId = 0 => kafka partition 1
memberId: 8 => (8 % 3) => partitionId = 2 => kafka partition 2
memberId: 2 => (2 % 3) => partitionId = 2 => kafka partition 2
memberId: 6 => (6 % 3) => partitionId = 0 => kafka partition 1
memberId: 7 => (7 % 3) => partitionId = 1 => kafka partition 2
如果我没说错的话,如果我们不能指定任何键和分区函数,FlinkKafka生产者就使用flinkfixedpartitioner。如果我们把配分函数设为 null
,flink kafka制片人将采用循环发行。但我不知道如何将数据作为键/值格式发送给kafka,如何按模进行分区。我怎样才能做到这一点?
1条答案
按热度按时间4ktjp1zp1#
如果你使用
KafkaSerializationSchema
,则可以创建KafkaProducerRecords
,并设置Kafka键(和值)。也可以在中设置分区ProducerRecord
.