samza在发送消息时会自动创建分区吗?

1hdlvixo  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(303)

如果您使用samza的outgoingmessageenvelope使用以下格式发送消息:

public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object partitionKey,
                               java.lang.Object key,
                               java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.

如果在流任务的process()方法中调用这个方法,并希望将传入的消息路由到适当的分区,那么samza会在调用该方法时为您创建分区吗?
例如

MessageA = {"id": "idA", "key": "keyA", "body":"some details"}
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"}

如果我在流任务的 process() 哪里 msg 是消息示例:

public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // ...
    String partition = msg["id"]
    String key = msg["key"]
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), id, key, msg));
    // ...

这是否会自动为我创建分区ida和idb(即,在向它们发送消息之前,我是否需要创建这些分区)?我希望能够将消息路由到适当的分区,并且能够使用单独的消息密钥进行日志压缩。

idfiyjo8

idfiyjo81#

创建主题时必须指定分区数。您不能动态地添加新分区(当然,您可以,但这并不容易,而且samza不会自动添加)。如果主题不存在,但具有默认的分区数,samza应该为您创建新的主题。这取决于设置。你可以测试一下。
但是价值 msg["id"] 不指定分区的名称。这个值只是用来计算目标分区的数目。将该值散列为一个数字,然后使用模进行修剪。类似这样的情况(有更多的算法,这是基本算法):

partitionID = hash(msg["id"]) % total_number_of_partitions

以及 partitionID 始终是非负整数。这意味着实际上有多少个分区并不重要。它总是以某种方式结束。其主要思想是,如果您有两条消息具有相同的 msg["id"] ,则消息将在相同的分区中结束。这通常是你想要的。
日志压缩将如您预期的那样工作——它将从特定分区中删除具有相同密钥的消息(但是,如果您有两个具有相同密钥的消息和两个不同分区,则不会删除它们)。
仅供参考,你可以使用Kafka卡特找出分区的数量和其他有用的东西。

相关问题