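I want to build a data pipeline made up of RDS (Aurora DB), Logstash, and AWS OpenSearch.
To keep the data in my OpenSearch index consistent, I want to use the query to avoid duplicate values.
For this, I wrote the following Logstash config file.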
input {
  jdbc {
    jdbc_driver_library => "/home/ubuntu/logstash-7.16.2/bin/mysql-connector-java-8.0.27.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://~~~~?useSSL=false"
    jdbc_user => "root"
    jdbc_password => "~~~~~~"
    # run the statement in pages (LIMIT/OFFSET), as seen in the logged queries
    jdbc_paging_enabled => true
    tracking_column => "updated_at"
    # use the tracking_column value, not the last run time, as :sql_last_value
    use_column_value => true
    record_last_run => true
    tracking_column_type => "timestamp"
    # six-field cron with a seconds field: every 10 seconds
    schedule => "*/10 * * * * *"
    statement => "select * from my_table where updated_at > :sql_last_value order by updated_at ASC"
    jdbc_default_timezone => "Asia/Seoul"
  }
}
output {
  opensearch {
    hosts => "https://~~~~~:443"
    user => "admin"
    password => "~~~~~"
    index => "index"
    ecs_compatibility => disabled
    ssl_certificate_verification => false
  }
}
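For reference, the incremental behaviour these input settings aim for (tracking_column, use_column_value, and a strict `>` against :sql_last_value) can be simulated in a few lines of Python. This is a minimal sketch, not the jdbc input's code: `poll` and the in-memory `table` are hypothetical, and it assumes the next :sql_last_value is the updated_at of the last row processed in a run.

```python
from datetime import datetime

def poll(rows, last_value):
    """One scheduled run of the intended incremental query (hypothetical helper).

    Mimics: select * from my_table where updated_at > :sql_last_value order by updated_at ASC
    """
    batch = sorted((r for r in rows if r["updated_at"] > last_value),
                   key=lambda r: r["updated_at"])
    # Assumption: the next :sql_last_value is the updated_at of the last row processed;
    # when no row matches, the previous value is kept.
    next_value = batch[-1]["updated_at"] if batch else last_value
    return batch, next_value

table = [
    {"id": 1, "updated_at": datetime(2022, 12, 8, 12, 12, 11)},
    {"id": 2, "updated_at": datetime(2022, 12, 8, 12, 12, 12)},
    {"id": 3, "updated_at": datetime(2022, 12, 8, 12, 12, 13)},
]

last = datetime(2022, 12, 8, 12, 12, 12)
first_batch, last = poll(table, last)   # only id 3 is newer than 12:12:12
second_batch, last = poll(table, last)  # nothing new, so nothing is re-sent
print([r["id"] for r in first_batch], [r["id"] for r in second_batch])  # [3] []
```

With the rows sorted ascending, the last row processed is also the newest one, so in this model each row is sent at most once.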
And these are the queries Logstash generates:
[2022-12-29T16:48:40,299][INFO ][logstash.inputs.jdbc ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001157s) SELECT version()
[2022-12-29T16:48:40,302][INFO ][logstash.inputs.jdbc ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001163s) SELECT version()
[2022-12-29T16:48:40,306][INFO ][logstash.inputs.jdbc ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001221s) SELECT count(*) AS `count` FROM (select * from my_table where updated_at > '2022-12-28 22:31:05' order by updated_at ASC) AS `t1` LIMIT 1
[2022-12-29T16:48:40,309][INFO ][logstash.inputs.jdbc ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001166s) SELECT * FROM (select * from my_table where updated_at > '2022-12-28 22:31:05' order by updated_at ASC) AS `t1` LIMIT 100000 OFFSET 0
[2022-12-29T16:48:50,172][INFO ][logstash.inputs.jdbc ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001303s) SELECT version()
[2022-12-29T16:48:50,174][INFO ][logstash.inputs.jdbc ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001152s) SELECT version()
[2022-12-29T16:48:50,178][INFO ][logstash.inputs.jdbc ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001382s) SELECT count(*) AS `count` FROM (select * from my_table where updated_at > '2022-12-28 22:31:05' order by updated_at ASC) AS `t1` LIMIT 1
[2022-12-29T16:48:50,182][INFO ][logstash.inputs.jdbc ][main][0bb20d034a10be3c1a48635cda2cc7dfcb97e29fb63940352f5380ec253dfe48] (0.001153s) SELECT * FROM (select * from my_table where updated_at > '2022-12-28 22:31:05' order by updated_at ASC) AS `t1` LIMIT 100000 OFFSET 0
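The logged statements show how paging rewrites the configured statement: first a count query, then the original statement wrapped in a derived table `t1` with `LIMIT`/`OFFSET`, with :sql_last_value substituted as a quoted literal. The sketch below rebuilds those strings; `paged_queries` is a hypothetical helper, the page size of 100000 is simply the value visible in the log, and this is not how the plugin is actually implemented.

```python
import math

def paged_queries(statement: str, last_value: str, total_rows: int,
                  page_size: int = 100000) -> list[str]:
    """Rebuild the SQL strings seen in the Logstash log (hypothetical helper)."""
    # Substitute the placeholder with a quoted literal, as in the logged SQL.
    inner = statement.replace(":sql_last_value", f"'{last_value}'")
    queries = [f"SELECT count(*) AS `count` FROM ({inner}) AS `t1` LIMIT 1"]
    # Always issue at least the first page (OFFSET 0), as every run in the log does.
    pages = max(1, math.ceil(total_rows / page_size))
    for page in range(pages):
        queries.append(
            f"SELECT * FROM ({inner}) AS `t1` LIMIT {page_size} OFFSET {page * page_size}"
        )
    return queries

stmt = "select * from my_table where updated_at > :sql_last_value order by updated_at ASC"
for q in paged_queries(stmt, "2022-12-28 22:31:05", total_rows=5):
    print(q)
```

Note that the ORDER BY ends up inside the derived table `t1`; the outer paged query has no ORDER BY of its own.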
Everything looks fine: sql_last_value is updated whenever new data is inserted.
However, the rows sent to OpenSearch also include rows whose updated_at is equal to sql_last_value.
For example:
sql_last_value = 2022.12.08 12:12:12
first row: 2022.12.08 12:12:11
second row: 2022.12.08 12:12:12
third row: 2022.12.08 12:12:13
In the example above, my query selected both the second and the third row. On the next run, sql_last_value was updated to 2022.12.08 12:12:13.
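With a strict `>`, a row whose updated_at is exactly equal to sql_last_value should not be selected. One possible explanation, which is only an assumption and is not confirmed anywhere in the question: the stored updated_at carries fractional seconds while sql_last_value is compared at whole-second precision, so a row that displays as 12:12:12 is actually slightly later. A minimal sketch of that comparison using Python `datetime` values (`rows_after` is a hypothetical helper):

```python
from datetime import datetime

def rows_after(rows, last_value):
    """Mimic `WHERE updated_at > :sql_last_value` on in-memory values."""
    return [r for r in rows if r > last_value]

# sql_last_value from the example, at whole-second precision.
last = datetime(2022, 12, 8, 12, 12, 12)

# Case 1: stored values have no fractional part, so the equal row is excluded.
exact = [datetime(2022, 12, 8, 12, 12, 11),
         datetime(2022, 12, 8, 12, 12, 12),
         datetime(2022, 12, 8, 12, 12, 13)]

# Case 2 (hypothetical): the second row has 500 ms that a whole-second display hides.
fractional = [datetime(2022, 12, 8, 12, 12, 11),
              datetime(2022, 12, 8, 12, 12, 12, 500000),
              datetime(2022, 12, 8, 12, 12, 13)]

print(len(rows_after(exact, last)))       # 1
print(len(rows_after(fractional, last)))  # 2
```

Checking the column's declared precision in MySQL (for example with `SHOW CREATE TABLE my_table`) would confirm or rule out this explanation.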
Also, the updated_at column is created by the Sequelize module in NestJS (timestamp type).
What is wrong with my config file?
1 Answer
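I don't understand why this works, but I solved the problem.
The problem was my statement query: select * from my_table where updated_at > :sql_last_value order by updated_at ASC.
I changed it to select * from my_table where updated_at > :sql_last_value, without the order by clause.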