我有一个kafka streams应用程序用一个线程运行,用一个分区处理一个主题就可以了。
我需要运行这个应用程序的多个示例同时处理不同的主题。在我当前的场景中,所有主题只有一个分区。
当我运行同一应用程序的新示例(具有相同的应用程序\u id)并处理不同的主题时,streams客户端不会在此新应用程序中创建新任务。第一个示例继续处理任务0\u 0中的第一个主题,第二个示例等待,而不执行分配的分区。
我知道我只使用一个分区的主题,但在这种情况下,如果我有两个示例和两个主题,其中一个分区要处理,这就形成了两个分区,¿为什么不能在每个示例中同时处理带有单个分区的两个主题?
我怀疑这与streamsparitionassignor有关,但在kafka streams应用程序中,分配策略是不能更改的:
kafka streams不允许使用自定义分区赋值器。如果您自己设置一个,它将被streamsparitionassignor[1]覆盖。这是为了确保——如果可能的话——在重新平衡期间将分区重新分配给相同的使用者(也称为粘性)。
编辑:
应用程序的拓扑结构:
[2019-11-20 09:36:35,406] [INFO] stream-thread [avro-to-json-d07ad9ad-f4b6-4787-96cf-19c48e72ad46-StreamThread-1] Starting (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-11-20 09:36:35,407] [INFO] stream-thread [avro-to-json-d07ad9ad-f4b6-4787-96cf-19c48e72ad46-StreamThread-1] State transition from CREATED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-11-20 09:36:35,407] [INFO] stream-client [avro-to-json-d07ad9ad-f4b6-4787-96cf-19c48e72ad46] Started Streams client (org.apache.kafka.streams.KafkaStreams)
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [])
--> KSTREAM-MAP-0000000001
Processor: KSTREAM-MAP-0000000001 (stores: [])
--> KSTREAM-MAP-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAP-0000000002 (stores: [])
--> KSTREAM-TRANSFORM-0000000003
<-- KSTREAM-MAP-0000000001
Processor: KSTREAM-TRANSFORM-0000000003 (stores: [])
--> KSTREAM-SINK-0000000004
<-- KSTREAM-MAP-0000000002
Sink: KSTREAM-SINK-0000000004 (extractor class: kafka.AvroToJson$$Lambda$97/741730375@957e06)
<-- KSTREAM-TRANSFORM-0000000003
1条答案
按热度按时间oxiaedzo1#
如果我理解正确的话,您可以使用相同的
application.id
具有相同的拓扑结构,但输入主题除外。这意味着您可以在streams客户端上有效地运行两个不同的streams应用程序,因为输入主题不同。运行两个不同的流应用程序application.id
是未定义的行为application.id
需要在kafka集群中是唯一的(请参见https://kafka.apache.org/23/documentation/#streamsconfigs).你也可以
增加其中一个主题的分区,并将该主题用作两个流应用程序中的输入主题(使它们成为同一个应用程序),或者
更改
application.id
两个应用程序中的一个。注意,选项1为您提供了流客户端之间工作负载的自动重新平衡,而选项2没有。