我有几个kafka流应用程序示例,我想远程查询。
所有示例当前都在侦听指定的host:port pair,每个示例都能够查询自己的本地状态存储,并通过rest服务传递这些值。
+------------------+ +------------------+ +------------------+
| | | | | |
| instance1:9191 | | instance2:9292 | | instance3:9393 |
| | | | | |
+------------------+ +------------------+ +------------------+
我希望有一个单独的应用程序能够查询这些示例中的状态存储:
consumer group separate application
+---------------------------------------+ _____
| [instance1] [instance2] [instance3] | <~------- | app |
+---------------------------------------+ -----
单独的应用程序将在中使用相同的逻辑 StreamsMetadataState::getAllMetadataForStore
要获取应用程序运行示例的所有活动主机/端口对,请通过rest服务运行远程查询,并在其自己的应用程序逻辑中聚合数据。
然而,我很难实现这一点。由于主机/端口对似乎是通过使用者组协议进行通信的,因此我似乎需要实际示例化另一个kafka streams示例(即使用者组的另一个成员),以便利用远程交互查询。
我的问题是:
是否可以为应用程序的所有正在运行的示例找到主机/值对,而不必同时在同一使用者组中运行本地kafka streams示例(我之所以突出显示running,是因为我不介意示例化kafka streams应用程序的一个虚拟示例来获取主机/端口元数据,但是有一个 validateRunning
检查是否阻止我这样做)
上述设计是否存在问题(运行单独的应用程序来查询kafka streams应用程序的所有示例)?i、 也许我所说的行为不被支持,因为我所做的事情有一些我还没有考虑到的后果?
似乎应该有一个静态方法来获取状态存储元数据,它允许我们直接传递从builder对象提取的任何值。即
KafkaStreams::getMetaDataForStore(streamsConfig, storeName);
2条答案
按热度按时间swvgeqrz1#
没有api支持。
建议的模式是,向所有示例添加第二个rpc(不同于iq端口的端口,我们称之为引导端口)。因此,知道一个主机/引导程序端口对就足以获得所有iq主机/端口对的信息。
更新
您还可以使用kafka主题来传播主机/端口信息。每个示例在启动时都会将其主机/端口写入主题,您可以从中读取这些信息。棘手的部分是,从主题中过期/删除。如果启用压缩,每个示例都可以为其主机/端口消息编写逻辑删除消息。但是,如果示例崩溃,您将获得未被删除的旧信息。
另一方面,您可以将此方法与第一种方法相结合(即,将主机/引导端口而不是iq主机/端口写入主题)。只需获得一个有效的主机/引导端口就足够了——如果您使用了一个无效的端口,您的请求就会超时,您可以从查询应用程序中编写一个逻辑删除来进行清理。
结束更新
如果这也不起作用,您可以“破解”这个限制。成功重新平衡后,所有元数据都存储在主题中
__consumer_offsets
理论上,您可以从那里读取信息并提取所有主机/端口对。但是,您将依赖于内部实现细节,因此您的代码可能会在升级时中断。8cdiaqws2#
是否可以为应用程序的所有正在运行的示例找到主机/值对,而不必同时在同一使用者组中运行本地kafka streams示例(我之所以突出显示running,是因为我不介意示例化kafka streams应用程序的一个虚拟示例来获取主机/端口元数据,但是有一个validaterunning检查阻止我这样做)
为什么不向(第一个)kafka streams应用程序添加一个新的restapi方法,将当前活动的主机/端口对公开给第二个应用程序?应用程序示例当然可以随时获得这些信息。
第二个应用程序--“单独的应用程序”--可以通过这个rest方法查询任何(第一个)kafka streams应用程序示例,以发现所有正在运行的应用程序示例。在这里,你不需要运行一个假人
KafkaStreams
第二个应用程序中的示例。上述设计是否存在问题(运行单独的应用程序来查询kafka streams应用程序的所有示例)?i、 也许我所说的行为不被支持,因为我所做的事情有一些我还没有考虑到的后果?
见上文。没有什么可以阻止您向kafka streams应用程序的rest层添加更多方法。毕竟,您的(第一个)应用程序中使用kafka streams api的部分不必是该应用程序的唯一部分。:-)我认为你的问题可能是你的思维有点“封闭”了,因为你觉得必须通过kafka streams api在你的应用程序中做所有的事情——但事实并非如此。毕竟,kafka streams api背后的一个动机是让您可以将它与希望在应用程序中利用的其他api和库混合使用。