如何使用带有特定分区器的apache flink将数据作为键/值发送给kafka

nue99wik  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(447)

我在下面的Flink有一个有效载荷;

{
    "memberId": 4
    "total": 5
}

我想用指定的分区器将数据作为键值格式发送到kafka。对于partitioner,我将使用模partitioner。
模分解器的一个例子; partitionId = value % numPartitions 让我们假设 numPartitions 参数为3。如果我们可以使用上面定义的有效负载的memberid,那么partitionid应该是 4 % 3 = 1
根据上面的partitioner,我想将具有相同partitionid的数据发送到相同的kafka主题。另一个例子;
if(假设numpartitions=3);

memberId: 3 => (3 % 3) => partitionId = 0 => kafka partition 1
memberId: 8 => (8 % 3) => partitionId = 2 => kafka partition 2
memberId: 2 => (2 % 3) => partitionId = 2 => kafka partition 2
memberId: 6 => (6 % 3) => partitionId = 0 => kafka partition 1
memberId: 7 => (7 % 3) => partitionId = 1 => kafka partition 2

如果我没说错的话,如果我们不能指定任何键和分区函数,FlinkKafka生产者就使用flinkfixedpartitioner。如果我们把配分函数设为 null ,flink kafka制片人将采用循环发行。但我不知道如何将数据作为键/值格式发送给kafka,如何按模进行分区。我怎样才能做到这一点?

4ktjp1zp

4ktjp1zp1#

如果你使用 KafkaSerializationSchema ,则可以创建Kafka ProducerRecords ,并设置Kafka键(和值)。也可以在中设置分区 ProducerRecord .

相关问题