Logstash“sql_last_value”值比较现在可以在GT条件下正常工作

vatpfxk5  于 2023-01-10  发布在  Logstash
关注(0)|答案(1)|浏览(218)

我想制作一个由RDS(Aurora DB)、Logstash和AWS Opensearch组成的数据管道。
要使我的opensearch索引获得数据一致性,我希望使用查询删除重复值。
为此,我编写了一个如下所示的logstash配置文件。

input{
  jdbc {
    jdbc_driver_library => "/home/ubuntu/logstash-7.16.2/bin/mysql-connector-java-8.0.27.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://~~~~?useSSL=false"
    jdbc_user => "root"
    jdbc_password => "~~~~~~"
    jdbc_paging_enabled => true
    tracking_column => "updated_at"
    use_column_value => true
    record_last_run => true
    tracking_column_type => “timestamp”
    schedule => "*/10 * * * * *"
    statement => "select * from my_table where updated_at > :sql_last_value order by updated_at ASC"
    jdbc_default_timezone => "Asia/Seoul"
  }
}

output {
  opensearch{
    hosts => "https://~~~~~:443"
    user => "admin"
    password => "~~~~~"
    index => "index"
    ecs_compatibility => disabled
    ssl_certificate_verification => false
  }
}

而logstash生成的查询是这些。

[2022-12-29T16:48:40,299][INFO ][logstash.inputs.jdbc     ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001157s) SELECT version()
[2022-12-29T16:48:40,302][INFO ][logstash.inputs.jdbc     ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001163s) SELECT version()
[2022-12-29T16:48:40,306][INFO ][logstash.inputs.jdbc     ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001221s) SELECT count(*) AS `count` FROM (select * from my_table where updated_at > '2022-12-28 22:31:05' order by updated_at ASC) AS `t1` LIMIT 1
[2022-12-29T16:48:40,309][INFO ][logstash.inputs.jdbc     ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001166s) SELECT * FROM (select * from my_table where updated_at > '2022-12-28 22:31:05' order by updated_at ASC) AS `t1` LIMIT 100000 OFFSET 0
[2022-12-29T16:48:50,172][INFO ][logstash.inputs.jdbc     ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001303s) SELECT version()
[2022-12-29T16:48:50,174][INFO ][logstash.inputs.jdbc     ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001152s) SELECT version()
[2022-12-29T16:48:50,178][INFO ][logstash.inputs.jdbc     ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001382s) SELECT count(*) AS `count` FROM (select * from my_table where updated_at > '2022-12-28 22:31:05' order by updated_at ASC) AS `t1` LIMIT 1
[2022-12-29T16:48:50,182][INFO ][logstash.inputs.jdbc     ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001153s) SELECT * FROM (select * from my_table where updated_at > '2022-12-28 22:31:05' order by updated_at ASC) AS `t1` LIMIT 100000 OFFSET 0

看起来一切都很好。每当插入新数据时,Sql_last_value都会更新。
但是,在OpenSearch中查询的值还包含等于sql_last_value和updated_at的值。
例如,

sql_last_value = 2022.12.08 12:12:12

first data : 2022.12.08 12:12:11
second data : 2022.12.08 12:12:12
third data : 2022.12.08 12:12:13

在上面的例子中,我的查询选择了第二个和第三个。另外,sql_last_value从下一个查询更新为2022.12.08 12:12:13
并且,updated_at列是由NestJS中的Sequelize模块创建的(时间戳类型)。
我的配置文件有什么问题?

ajsxfq5m

ajsxfq5m1#

我不明白这是怎么回事,但我解决了问题。
问题是我的语句查询,select * from my_table where updated_at〉:sql_last_value order by updated_at ASC
我将其编辑为select * from my_table,其中updated_at〉:sql_last_value而没有order by命令。

相关问题