带有多个Kafka输入的logstash

uubf1zoe  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(549)

我试图从多个主题中过滤Kafka事件,但是一旦一个主题中的所有事件都被过滤,logstash就无法从另一个Kafka主题中获取事件。我正在使用3个分区和2个复制的主题这里是我的logstash配置文件

input {
    kafka{              
        auto_offset_reset => "smallest"
        consumer_id => "logstashConsumer1"          
        topic_id => "unprocessed_log1"
        zk_connect=>"192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
        type => "kafka_type_1"
}
kafka{              
    auto_offset_reset => "smallest"
    consumer_id => "logstashConsumer1"          
    topic_id => "unprocessed_log2"
    zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
    type => "kafka_type_2"
}
}
filter{
    if [type] == "kafka_type_1"{
    csv { 
        separator=>" "
        source => "data"        
    }   
}
if [type] == "kafka_type_2"{    
    csv { 
        separator => " "        
        source => "data"
    }
}
}
output{
    stdout{ codec=>rubydebug{metadata => true }}
}
bihw5rsg

bihw5rsg1#

这是一个非常晚的答复,但如果您想采取输入多个主题和输出到另一个Kafka多输出,您可以这样做:

input {
  kafka {
    topics => ["topic1", "topic2"]
    codec => "json"
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    decorate_events => true
    group_id => "logstash-multi-topic-consumers"
    consumer_threads => 5
  }
}

output {
   if [kafka][topic] == "topic1" {
     kafka {
       codec => "json"
       topic_id => "new_topic1"
       bootstrap_servers => "output-kafka-1:9092"
     }
   }
   else if [kafka][topic] == "topic2" {
      kafka {
       codec => "json"
       topic_id => "new_topic2"
       bootstrap_servers => "output-kafka-1:9092"
      }
    }
}

在详细说明引导服务器时要小心,给出kafka播发侦听器的名称。
参考1:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-输入-kafka-group\u id
参考2:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-输入-Kafka-卡努事件

相关问题