我有一个奇怪的问题,Kafka->ElasticSearch连接器。第一次启动时一切都很好,我在elasticsearch中收到了一个新数据,并通过kibana dashboard进行了检查,但当我在kafka中使用相同的producer应用程序生成新数据并再次尝试启动connector时,我在elasticsearch中没有得到任何新数据。现在我遇到了这样的错误:
[2018-02-04 21:38:04,987] ERROR WorkerSinkTask{id=log-platform-elastic-0} Commit of offsets threw an unexpected exception for sequence number 14: null (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 15805
我正在使用下一个命令运行连接器:
/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties log-platform-elastic.properties
connect-avro-standalone.properties:
bootstrap.servers=kafka-0.kafka-hs:9093,kafka-1.kafka-hs:9093,kafka-2.kafka-hs:9093
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
# rest.host.name=
rest.port=8084
# rest.advertised.host.name=
# rest.advertised.port=
plugin.path=/usr/share/java
和log-platform-elastic.properties:
name=log-platform-elastic
key.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=member_sync_log, order_history_sync_log # ... and many others
key.ignore=true
connection.url=http://elasticsearch:9200
type.name=log
我检查了与kafka代理、elasticsearch和schema registry的连接(此时schema registry和connector在同一个主机上),一切正常。kafka代理在端口9093上运行,我可以使用kafka avro控制台从主题中读取数据。在这件事上如有任何帮助,我将不胜感激!
2条答案
按热度按时间iqjalb3h1#
我们可以优化ElasticSearch配置来解决这个问题。有关配置参数,请参阅下面的链接
https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/configuration_options.html
以下是关键参数,可以控制消息速率流,最终帮助解决问题:
flush.timeout.ms:增加冲水时间可能有助于增加呼吸
用于定期刷新的超时(以毫秒为单位),以及在添加记录时等待已完成请求提供可用的缓冲区空间的超时。如果超过此超时,任务将失败。
max.buffered.records:尝试减少缓冲区记录限制
在阻止接受更多记录之前,每个任务将缓冲的最大记录数。此配置可用于限制每个任务的内存使用
batch.size:尝试减小批大小
写入elasticsearch时要批处理的记录数
tasks.max:减少或增加并行线程(使用者示例)的数量。这将控制ElasticSearch,如果带宽不能处理减少任务可能会有所帮助。
它通过调整上述参数解决了我的问题
jvidinwx2#
只需将flush.timeout.ms更新为大于10000(默认值为10秒)
根据文件:
flush.timeout.ms用于定期刷新的超时(以毫秒为单位),以及在添加记录时等待已完成请求提供缓冲区空间的超时。如果超过此超时,任务将失败。
类型:长默认值:10000重要性:低
见文件