So I set up zookeeper, kafka, postgres, elasticsearch, debezium, and kafka connect in Docker. The problem is that one table is not fully exported to Elasticsearch. I am using kafka-connect-elasticsearch
as the sink connector. Watching the Kafka Connect logs, the whole table is fully exported from the source database into Kafka, but roughly a third of the table never makes it into Elasticsearch. The logs show
connect | 2020-08-07 17:01:16,520 WARN || Ignoring version conflicts for items: [Key{dbserver1.public.api_product/api_product/313019}] [io.confluent.connect.elasticsearch.jest.JestElasticsearchClient]
Messages like this keep repeating for various product ids, and they do not stop even after an hour.
Here is my docker-compose file:
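For context on that warning: with "key.ignore": "false", the Elasticsearch sink connector writes each document with an external version derived from the Kafka record offset, and Elasticsearch rejects any write whose version is not strictly greater than the stored one. The rejected writes surface as exactly this "Ignoring version conflicts" warning. A minimal sketch of that external-versioning rule (an illustration only, not the connector's actual code; the ids and names are made up):

```python
# Illustration of Elasticsearch's version_type=external rule, which the sink
# connector relies on when key.ignore=false (versions come from Kafka offsets).
# This is NOT the connector's implementation, just the conflict semantics.

store = {}  # doc_id -> (version, body)

def index_external(doc_id, version, body):
    """Apply a write with version_type=external: accept only strictly newer versions."""
    current = store.get(doc_id)
    if current is not None and version <= current[0]:
        return False  # conflict -> the sink logs "Ignoring version conflicts"
    store[doc_id] = (version, body)
    return True

assert index_external("313019", 10, {"name": "widget"})      # first write succeeds
assert not index_external("313019", 10, {"name": "widget"})  # same offset replayed -> conflict
assert index_external("313019", 11, {"name": "widget v2"})   # newer offset succeeds
```

The conflicts themselves are normally harmless (they mean a replayed or duplicate record lost to a newer version), but seeing them repeat endlessly for a third of the table suggests the sink keeps retrying the same batch.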
version: '3.3'
services:
  zookeeper:
    container_name: zookeeper
    ports:
      - '2181:2181'
      - '2888:2888'
      - '3888:3888'
    image: 'debezium/zookeeper:1.2'
  kafka:
    container_name: kafka
    ports:
      - '9092:9092'
    links:
      - 'zookeeper:zookeeper'
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
    image: 'debezium/kafka:1.2'
  postgres:
    container_name: postgres
    ports:
      - '5432:5432'
    environment:
      - POSTGRES_USER=pgUser
      - POSTGRES_PASSWORD=pgPassword
      - POSTGRES_DB=pgDB
    image: debezium/postgres:11
  elasticdbz:
    container_name: elasticdbz
    ports:
      - '8881:8881'
      - '9300:9300'
    environment:
      - http.host=0.0.0.0
      - transport.host=127.0.0.1
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    image: 'docker.elastic.co/elasticsearch/elasticsearch:7.3.0'
  connect:
    container_name: connect
    ports:
      - '8083:8083'
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    links:
      - 'zookeeper:zookeeper'
      - 'kafka:kafka'
      - 'postgres:postgres'
      - 'elasticdbz:elasticdbz'
    image: 'debezium/connect:1.2'
My source connector configuration:
{
  "name": "my-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "amazon-rds-host*****",
    "database.port": "5432",
    "database.user": "my_user",
    "database.password": "*************",
    "database.dbname": "dbname",
    "database.server.name": "dbserver1",
    "schema.whitelist": "public",
    "table.whitelist": "public.api_product",
    "plugin.name": "pgoutput"
  }
}
The Elasticsearch sink connector configuration:
{
  "name": "elastic-sink-product",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.public.api_product",
    "connection.url": "http://remote-server-ip",
    "connection.username": "username",
    "connection.password": "**********",
    "behavior.on.null.values": "delete",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "type.name": "api_product"
  }
}
Meanwhile, another table in my Postgres database exports to Elasticsearch without any problems.
Any help would be greatly appreciated, as this is my first time working with these services.
Note: it seems that setting "write.method": "upsert" in the Elasticsearch connector configuration fixed the issue. But as far as I can tell this method is slow, as mentioned in the documentation.
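For reference, the fix amounts to adding one property to the sink configuration above. With upsert, the connector writes through Elasticsearch's update API instead of indexing with external versions, which sidesteps the version conflicts at the cost of slower writes (a partial config fragment, to be merged into the existing "config" block):

```json
{
  "behavior.on.null.values": "delete",
  "write.method": "upsert"
}
```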