I'm building an ES index from data in a MySQL database using the jdbc input plugin.
One of my entities has several many-to-many relations, and I need to import those related values into the index (since ES doesn't handle many-to-many well). I do this with a single join query:
SELECT * FROM tableA AS a
LEFT JOIN aToB
  ON a.id = aToB.aId
LEFT JOIN B
  ON B.id = aToB.bId
LEFT JOIN aToC
  ON a.id = aToC.aId
LEFT JOIN C
  ON C.id = aToC.cId
WHERE last_Update > :sql_last_value
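Joining two independent many-to-many paths in one query means the result set contains the cross product of the B matches and the C matches for each A row. A minimal Ruby sketch of that effect, with hypothetical data standing in for the actual tables:

```ruby
# Hypothetical illustration: one A row with 1 matching B row and 2 matching C rows.
b_rows = [{ 'valueB1' => 5 }]
c_rows = [{ 'valueC1' => 9 }, { 'valueC1' => 11 }]

# The join emits one row per (B, C) combination, so every B value
# is repeated once per C match (and vice versa).
joined = b_rows.product(c_rows).map { |b, c| b.merge(c) }

puts joined.length  # 1 * 2 = 2 rows; the single B entry appears twice
```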
Then, in the filter, I use an aggregate to map the values together:
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['valueA1'] = event.get('valueA1')
      map['valueA2'] = event.get('valueA2')

      map['B'] ||= []
      bEntry = {
        'valueB1' => event.get('valueB1'),
        'valueB2' => event.get('valueB2')
      }
      if ! map['B'].include?(bEntry)
        map['B'] << bEntry
      else
        puts 'Duplicate'
      end

      map['C'] ||= []
      cEntry = {
        'valueC1' => event.get('valueC1'),
        'valueC2' => event.get('valueC2')
      }
      if ! map['C'].include?(cEntry)
        map['C'] << cEntry
      end

      event.cancel()"
    push_previous_map_as_event => true
    timeout => 30
  }
}
At first I simply appended every value, but since the join repeats each column across rows, the resulting index contained many duplicates.
For example, say 1 element of B and 6 elements of C are linked to the same A row. The C entries are added correctly, but B ends up with the same information 6 times.
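Deduplicating inside the aggregate map with `Array#include?` does work in principle, because Ruby compares hashes by value. A minimal sketch of the intended logic, with illustrative field names:

```ruby
map = { 'B' => [] }

# Rows as the join would emit them: the same B entry repeated once per C match.
rows = [
  { 'valueB1' => 5, 'valueB2' => 6 },
  { 'valueB1' => 5, 'valueB2' => 6 },
  { 'valueB1' => 5, 'valueB2' => 6 }
]

rows.each do |entry|
  # Hash equality in Ruby is by value, so include? detects the repeats.
  map['B'] << entry unless map['B'].include?(entry)
end

puts map['B'].length  # 1
```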
The code above is my current attempt at a fix: I only want to add entries that are not already in the map. The problem is that the maps seem to be shared somehow: after the first insertion, the next entry is flagged as a duplicate, and some maps are never written out as events.
How can I correctly import multiple many-to-many entries without duplicates, and without losing some entries?
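One related pitfall worth guarding against: with a LEFT JOIN, an A row with no match in B still produces a result row, and `event.get('valueB1')` then returns nil, so an all-nil hash would be pushed into the array. A sketch of filtering those out, using the same illustrative field names:

```ruby
map = { 'B' => [] }

# A LEFT JOIN row with no B match yields nil for every B column.
entry = { 'valueB1' => nil, 'valueB2' => nil }

# Skip entries whose values are all nil, then deduplicate as before.
unless entry.values.all?(&:nil?) || map['B'].include?(entry)
  map['B'] << entry
end

puts map['B'].length  # 0
```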
Edit: here is the full file:
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "${OPTA_CONNECTION_STRING:jdbc:mysql://localhost:3306/oz?serverTimezone=UTC}"
    jdbc_user => "${DB_USER:root}"
    jdbc_password => "${DB_PASSWORD:root}"
    jdbc_paging_enabled => true
    tracking_column => "updatedAt"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/59 * * * * *"
    statement => "SELECT testA.id as id, testA.dataA1 as dataA1, testA.dataA2 as dataA2, testA.updatedAt as updatedAt,
      testB.dataB1 as dataB1, testB.dataB2 as dataB2,
      testC.dataC1 as dataC1, testC.dataC2 as dataC2
      FROM testA
      LEFT JOIN aToB aToB
        ON aToB.idA = testA.id
      LEFT JOIN testB testB
        ON testB.id = aToB.idB
      LEFT JOIN aToC aToC
        ON aToC.idA = testA.id
      LEFT JOIN testC testC
        ON testC.id = aToC.idC
      WHERE UNIX_TIMESTAMP(testA.updatedAt) > :sql_last_value AND testA.updatedAt < NOW()"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      p event
      map['id'] = event.get('id')
      map['dataA1'] = event.get('dataA1')
      map['dataA2'] = event.get('dataA2')

      map['testB'] ||= []
      testBEntry = {
        'dataB1' => event.get('dataB1'),
        'dataB2' => event.get('dataB2')
      }
      p testBEntry
      if ! map['testB'].include?(testBEntry)
        map['testB'] << testBEntry
      else
        puts 'Duplicate'
      end

      map['testC'] ||= []
      testCEntry = {
        'dataC1' => event.get('dataC1'),
        'dataC2' => event.get('dataC2')
      }
      p testCEntry
      if ! map['testC'].include?(testCEntry)
        map['testC'] << testCEntry
      else
        puts 'Duplicate'
      end

      event.cancel()"
    push_previous_map_as_event => true
    timeout => 30
  }
}
output {
  elasticsearch {
    hosts => ["${ES_HOST:http://localhost:9200}"]
    user => "${ES_USER:elastic}"
    password => "${ES_PASSWORD:changeme}"
    index => "test"
    document_id => "test_%{id}"
  }
}
Using this MySQL data:
DROP TABLE IF EXISTS `aToB`;
CREATE TABLE `aToB` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`idA` int(11) DEFAULT NULL,
`idB` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `aToB` WRITE;
/*!40000 ALTER TABLE `aToB` DISABLE KEYS */;
INSERT INTO `aToB` (`id`, `idA`, `idB`)
VALUES
(1,1,1),
(3,2,2);
/*!40000 ALTER TABLE `aToB` ENABLE KEYS */;
UNLOCK TABLES;
# Dump of table aToC
# ------------------------------------------------------------
DROP TABLE IF EXISTS `aToC`;
CREATE TABLE `aToC` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`idA` int(11) DEFAULT NULL,
`idC` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `aToC` WRITE;
/*!40000 ALTER TABLE `aToC` DISABLE KEYS */;
INSERT INTO `aToC` (`id`, `idA`, `idC`)
VALUES
(1,1,1),
(2,1,2),
(3,2,1),
(4,2,2);
/*!40000 ALTER TABLE `aToC` ENABLE KEYS */;
UNLOCK TABLES;
# Dump of table testA
# ------------------------------------------------------------
DROP TABLE IF EXISTS `testA`;
CREATE TABLE `testA` (
`id` int(6) unsigned NOT NULL AUTO_INCREMENT,
`dataA1` int(6) DEFAULT NULL,
`dataA2` int(6) DEFAULT NULL,
`updatedAt` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `testA` WRITE;
/*!40000 ALTER TABLE `testA` DISABLE KEYS */;
INSERT INTO `testA` (`id`, `dataA1`, `dataA2`, `updatedAt`)
VALUES
(1,1,2,'2020-12-10 14:57:53'),
(2,3,4,'2020-12-10 14:57:53');
/*!40000 ALTER TABLE `testA` ENABLE KEYS */;
UNLOCK TABLES;
# Dump of table testB
# ------------------------------------------------------------
DROP TABLE IF EXISTS `testB`;
CREATE TABLE `testB` (
`id` int(6) unsigned NOT NULL AUTO_INCREMENT,
`dataB1` int(6) DEFAULT NULL,
`dataB2` int(6) DEFAULT NULL,
`updatedAt` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `testB` WRITE;
/*!40000 ALTER TABLE `testB` DISABLE KEYS */;
INSERT INTO `testB` (`id`, `dataB1`, `dataB2`, `updatedAt`)
VALUES
(1,5,6,'2020-12-10 14:57:53'),
(2,7,8,'2020-12-10 14:57:53'),
(3,666,6677,'2020-12-10 14:57:53');
/*!40000 ALTER TABLE `testB` ENABLE KEYS */;
UNLOCK TABLES;
# Dump of table testC
# ------------------------------------------------------------
DROP TABLE IF EXISTS `testC`;
CREATE TABLE `testC` (
`id` int(6) unsigned NOT NULL AUTO_INCREMENT,
`dataC1` int(6) DEFAULT NULL,
`dataC2` int(6) DEFAULT NULL,
`updatedAt` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `testC` WRITE;
/*!40000 ALTER TABLE `testC` DISABLE KEYS */;
INSERT INTO `testC` (`id`, `dataC1`, `dataC2`, `updatedAt`)
VALUES
(1,9,10,'2020-12-10 14:57:53'),
(2,11,12,'2020-12-10 14:57:53'),
(3,888,999,'2020-12-10 14:57:53');
/*!40000 ALTER TABLE `testC` ENABLE KEYS */;
UNLOCK TABLES;
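For reference, given these dumps, my understanding of the desired aggregated document for `testA.id = 1` (via `aToB` row (1,1,1) and `aToC` rows (1,1,1) and (2,1,2)) would be:

```ruby
# Expected aggregated document for testA.id = 1, derived from the dumps above.
expected = {
  'id' => 1,
  'dataA1' => 1,
  'dataA2' => 2,
  'testB' => [{ 'dataB1' => 5, 'dataB2' => 6 }],
  'testC' => [
    { 'dataC1' => 9,  'dataC2' => 10 },
    { 'dataC1' => 11, 'dataC2' => 12 }
  ]
}

puts expected['testB'].length  # 1, not 2: B must not repeat once per C match
```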