我使用kafka connect添加了jdbc源连接器,kafka connect从mysql表中获取数据。
连接器添加成功,数据在其中实时流动。
但相应的主题并不包含所有以前的数据。
我试过重新启动Kafka连接。
以下是我的jdbc源连接器配置:
{
"name": "kb_yp_loan",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "XXXX",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "XXXX",
"config.action.reload": "restart",
"connection.url": "jdbc:mysql://XXXX/XX?user=XXXX&password=XXXX&useCursorFetch=true&defaultFetchSize=1000",
"connection.user": "XXXX",
"connection.password": "XXXX",
"table.whitelist": "yp_loan",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "kb_"
}
请建议我如何把所有以前的数据在我的Kafka主题。
1条答案
按热度按时间mwyxok5s1#
在
connect-distributed.properties
,您需要添加consumer.auto.offset.reset=earliest
启动connect consumer示例以读取所有现有主题数据。否则,连接器将从当前的最新主题偏移开始。