我正在尝试将mysql表下沉到elasticsearch。我的table有100多万张唱片。问题是我的elasticsearch在插入了大约300条记录之后就再也找不到记录了。我知道我第一次运行它时,它确实运行了所有的记录。当我在删除es索引后再次尝试这样做时,发生了这种情况。我已尝试将更新\u ts字段重置为新的时间戳。我在Flume里试过补偿值。似乎什么都没用。
这是我的源文件
{
"name": "items3",
"config": {
"_comment": "The JDBC connector class. Don't change this if you want to use the JDBC Source.",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"_comment": "How to serialise the value of keys - here use the Confluent Avro serialiser. Note that the JDBC Source Connector always returns null for the key ",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"_comment": "Since we're using Avro serialisation, we need to specify the Confluent schema registry at which the created schema is to be stored. NB Schema Registry and Avro serialiser are both part of Confluent Open Source.",
"key.converter.schema.registry.url": "http://localhost:8081",
"_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"_comment": " --- JDBC-specific configuration below here --- ",
"_comment": "JDBC connection URL. This will vary by RDBMS. Consult your manufacturer's handbook for more information",
"connection.url": "jdbc:mysql://localhost:3306/db?user=user&password=password",
"_comment": "Which table(s) to include",
"table.whitelist": "items",
"_comment": "Pull all rows based on an timestamp column. You can also do bulk or incrementing column-based extracts. For more information, see http://docs.confluent.io/current/connect/connect-jdbc/docs/source_config_options.html#mode",
"mode": "timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "update_ts",
"_comment": "If the column is not defined as NOT NULL, tell the connector to ignore this ",
"validate.non.null": "true",
"_comment": "The Kafka topic will be made up of this prefix, plus the table name ",
"topic.prefix": "kafka-",
"auto.offset.reset" : "earliest"
}
}
这是我的Flume
{
"name": "items-sink",
"config": {
"_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"_comment": "--- Elasticsearch-specific config ---",
"_comment": "Elasticsearch server address",
"connection.url": "http://localhost:9200",
"_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist ",
"type.name": "items",
"_comment": "Which topic to stream data from into Elasticsearch",
"topics": "kafka-items",
"auto.offset.reset" : "earliest",
"_comment": "If the Kafka message doesn't have a key (as is the case with JDBC source) you need to specify key.ignore=true. If you don't, you'll get an error from the Connect task: 'ConnectException: Key is used as document id and can not be null.",
"key.ignore": "true"
}
}
如您所见,我正在尝试将auto.offset.reset设置为最早,因此如果它以某种方式跟踪我的记录,它将重新开始,但都是徒劳的。
1条答案
按热度按时间t5zmwmid1#
"auto.offset.reset" : "earliest"
只能在内部使用connect-distributed.properties
文件,而不是json连接器配置在这个文件中,因为它是一个消费者配置,所以它被命名为
consumer.auto.offset.reset
.此外,消费者组Map到
name
字段,所以除非更改,否则您将继续使用相同名称的前一个字段,直到重置组偏移或更改名称。默认情况下,组名为connect-${connector_name}