我试图从多个主题中过滤Kafka事件,但是一旦一个主题中的所有事件都被过滤,logstash就无法从另一个Kafka主题中获取事件。我正在使用3个分区和2个复制的主题这里是我的logstash配置文件
input {
kafka{
auto_offset_reset => "smallest"
consumer_id => "logstashConsumer1"
topic_id => "unprocessed_log1"
zk_connect=>"192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
type => "kafka_type_1"
}
kafka{
auto_offset_reset => "smallest"
consumer_id => "logstashConsumer1"
topic_id => "unprocessed_log2"
zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
type => "kafka_type_2"
}
}
filter{
if [type] == "kafka_type_1"{
csv {
separator=>" "
source => "data"
}
}
if [type] == "kafka_type_2"{
csv {
separator => " "
source => "data"
}
}
}
output{
stdout{ codec=>rubydebug{metadata => true }}
}
1条答案
按热度按时间bihw5rsg1#
这是一个非常晚的答复,但如果您想采取输入多个主题和输出到另一个Kafka多输出,您可以这样做:
在详细说明引导服务器时要小心,给出kafka播发侦听器的名称。
参考1:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-输入-kafka-group\u id
参考2:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-输入-Kafka-卡努事件