curl -X POST -H "Content-Type: application/json" --data '{
"name": "t1",
"config": {
"tasks.max": "1",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"connector.class": "com.github.jcustenborder.kafka.connect.solr.HttpSolrSinkConnector",
"topics": "TRAN",
"solr.queue.size": "100",
"solr.commit.within": "10",
"solr.url": "http://192.168.2.221:27052/solr/TRAN",
"errors.retry.delay.max.ms":"5000",
"errors.retry.timeout":"600000",
"errors.tolerance":"all",
"errors.log.enable":"true",
"errors.log.include.messages":"false",
"errors.deadletterqueue.topic.name":"DEAD_TRAN",
"errors.deadletterqueue.topic.replication.factor":"1",
"retry.backoff.ms":"1000",
"reconnect.backoff.ms":"5000",
"reconnect.backoff.max.ms":"600000"
}
}' http://localhost:8083/connectors
如果solr服务器在运行时关闭,则需要根据连接器配置中的计数重试(不丢失任何数据)。
在我的例子中,当连接器和solr都处于运行状态[活动]时,它工作得很好。但是,虽然只有solr服务器关闭,但在我的数据传递到solr导致数据丢失之前,没有重试过程。。
错误信息如下所示
kafka连接日志中的连接器配置
1条答案
按热度按时间wbrvyc0a1#
我刚检查了房间
SinkTask
特定连接器的实现,它确实抛出RetriableException
在put()
方法。理论上,根据你的连接器配置,它应该阻塞10分钟(
"errors.retry.timeout" : "600000"
). 如果您的solr示例在10分钟内恢复,就不会有任何数据丢失的问题。如果你想在Solr站起来之前完全阻塞你的连接器,你试过设置吗
"errors.retry.timeout" : "-1"
?根据
errors.retry.timeout
:重新尝试失败操作的最大持续时间(毫秒)。默认值为0,这意味着不会重试。使用-1进行无限次重试。
ps:imho如果由于某种原因单个消息永久性地失败了其接收器操作(即:如果接收器拒绝该操作),那么这可能会导致死锁情况。