有大量的数据被推到我们的Kafka主题之一,有没有办法确定这些数据来自哪个生产者?
vuktfyat1#
没有sasl或 Authorizer 级别审计,没有一个简单的方法,除了通过jmx跟踪连接的、可疑的客户id。我建议你执行一个标准的消息格式,并将消息传播给制作人团队。例如,查看cloudevents规范,它包含一个source字段https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md
Authorizer
jv4diomz2#
你可以使用头,和硬编码生产者id在头之前生产!这是我在node.js中使用rdkafka做的事情,java也应该有它!
z9ju0rcb3#
您可以为客户机/用户启用配额,然后通过两个与配额相关的jmx MBean(带宽和请求速率)监视哪些客户机受到限制:度量:每个(用户、客户端id)、用户或客户端id的带宽配额度量mbean公司: kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+) 它显示了两个属性。throttle time指示客户端被限制的时间量(毫秒)。理想情况下=0。byte rate表示客户端的数据产生/消耗速率,单位为字节/秒。对于(用户,客户端id)配额,同时指定用户和客户端id。如果对客户端应用了每个客户端id配额,则不指定用户。如果应用了每个用户配额,则不指定客户端id。度量:每个(用户、客户端id)、用户或客户端id的请求配额度量mbean公司: kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+) 它显示了两个属性。throttle time指示客户端被限制的时间量(毫秒)。理想情况下=0。request time指示在代理网络和i/o线程中处理来自客户端组的请求所花费的时间百分比。对于(用户,客户端id)配额,同时指定用户和客户端id。如果对客户端应用了每个客户端id配额,则不指定用户。如果应用了每个用户配额,则不指定客户端id。
kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+)
kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+)
3条答案
按热度按时间vuktfyat1#
没有sasl或
Authorizer
级别审计,没有一个简单的方法,除了通过jmx跟踪连接的、可疑的客户id。我建议你执行一个标准的消息格式,并将消息传播给制作人团队。例如,查看cloudevents规范,它包含一个source字段
https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md
jv4diomz2#
你可以使用头,和硬编码生产者id在头之前生产!这是我在node.js中使用rdkafka做的事情,java也应该有它!
z9ju0rcb3#
您可以为客户机/用户启用配额,然后通过两个与配额相关的jmx MBean(带宽和请求速率)监视哪些客户机受到限制:
度量:每个(用户、客户端id)、用户或客户端id的带宽配额度量
mbean公司:
kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+)
它显示了两个属性。throttle time指示客户端被限制的时间量(毫秒)。理想情况下=0。byte rate表示客户端的数据产生/消耗速率,单位为字节/秒。对于(用户,客户端id)配额,同时指定用户和客户端id。如果对客户端应用了每个客户端id配额,则不指定用户。如果应用了每个用户配额,则不指定客户端id。度量:每个(用户、客户端id)、用户或客户端id的请求配额度量
mbean公司:
kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+)
它显示了两个属性。throttle time指示客户端被限制的时间量(毫秒)。理想情况下=0。request time指示在代理网络和i/o线程中处理来自客户端组的请求所花费的时间百分比。对于(用户,客户端id)配额,同时指定用户和客户端id。如果对客户端应用了每个客户端id配额,则不指定用户。如果应用了每个用户配额,则不指定客户端id。