Logstash Jdbc插件在elasticsearch中填充比实际数据更多的数据,继续运行

jjhzyzn0  于 2022-12-09  发布在  Logstash
关注(0)|答案(1)|浏览(159)

Logstash正在无限循环中运行,我不得不停止这个过程,基本上是在elasticsearch索引中填充值。我需要的文档数量与我的db表中的行数完全相同。
以下是我的logstash配置:

input {
  jdbc {
    jdbc_driver_library => "/correct_path/java/mysql-connector-java-8.0.27.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
    jdbc_user => "user" 
    jdbc_password => "password" 
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    statement => 'select * from my_table'
  }
}

output {
    elasticsearch {
      user => "test"
      password => "test"
      hosts => ["localhost:9200"] 
      index => "my_index"
    }
    stdout { codec => "rubydebug" }
}
1qczuiv0

1qczuiv01#

这是因为每次执行cron作业时,查询都会获取所有数据。另外,您没有在elasticsearch输出中提供自定义id,因此它会为每个文档创建动态ID,因此索引中会有更多数据(具有不同唯一ID的重复数据)。
您可以使用存储上次搜索日期的sql_last_value参数,并使用created_date或updated_date上的where条件更新查询。这将第一次从数据库中获取所有数据,第二次以后仅获取新创建或更新的数据。

input {
  jdbc {
    jdbc_driver_library => "/correct_path/java/mysql-connector-java-8.0.27.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
    jdbc_user => "user" 
    jdbc_password => "password" 
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    statement => 'select * from my_table where created_date > :sql_last_value or updated_date > :sql_last_value'
  }
}

output {
    elasticsearch {
      user => "test"
      password => "test"
      hosts => ["localhost:9200"] 
      index => "my_index"
    }
    stdout { codec => "rubydebug" }
}

PS:我不是SQL专业人士,所以我的查询可能会有问题。但我希望你能明白这个想法。

相关问题