我有一个Kafka集群的单节点与acl启用SASL_PLAINTEXT。下面是我的kakfa-connect和Kafka集群的配置
connect-distributed.properties
bootstrap.servers=<ip>:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=False
value.converter.schemas.enable=False
offset.storage.topic=connect-offsets
offset.storage.replication.factor= 1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=100
offset.storage.partitions=8
config.storage.partitions=1
status.storage.partitions=2
rest.advertised.host.name=<ip>
plugin.path=/opt/kafka/connectors
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password"
server.properties
broker.id=14148
host.name=<ip>
advertised.listeners=SASL_PLAINTEXT://<ip>:9092
listeners=SASL_PLAINTEXT://<ip>:9092
num.network.threads=3
num.io.threads=8
num.partitions=12
default.replication.factor=1
offsets.topic.replication.factor=1
unclean.leader.election.enable=false
auto.create.topics.enable=true
log.dirs=/data/data-kafka/
log.dir=/data/data-kafka/
reserved.broker.max.id=500000
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=120
log.retention.bytes=104857600
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=true
zookeeper.connect=<ip>:2181
zookeeper.connection.timeout.ms=10000
delete.topic.enable=true
queued.max.requests=1000
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin;
allow.everyone.if.no.acl.found=false
下面是连接器配置我击中提交连接器
{
"name": "connector4",
"config": {
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_PLAINTEXT",
"producer.sasl.mechanism": "PLAIN",
"producer.security.protocol": "SASL_PLAINTEXT",
"producer.request.timeout.ms": 50000,
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
"producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "none",
"include.schema.changes": "false",
"schema.history.store.only.captured.tables.ddl": "true",
"tombstones.on.delete": "false",
"topic.prefix": "db4",
"decimal.handling.mode": "double",
"schema.history.internal.kafka.topic": "schema.history.v4",
"database.user": "db-user",
"producer.override.compression.type": "snappy",
"database.server.id": "1005",
"event.deserialization.failure.handling.mode": "warn",
"schema.history.internal.kafka.bootstrap.servers": "<kakfa-ip>:9092",
"database.port": "3307",
"inconsistent.schema.handling.mode": "warn",
"database.hostname": "db-ip",
"database.password": "db-pwd",
"name": "connector4",
"table.include.list": "db1.table_dummy",
"database.include.list": "db1",
"snapshot.mode": "when_needed",
"schema.history.skip.unparseable.ddl": "true"
}
}
连接器得到了提交,但在检查连接器的状态一段时间后,我可以看到任务失败的消息。
进一步查看Kafka连接日志,我可以看到下面的错误消息。Cancelled in-flight metadata request with correlation id. Request timed out.
所有的配置都是按照Kafka连接汇合文档,但我仍然无法让它工作。请帮帮我
1条答案
按热度按时间wlzqhblo1#
在浏览了debezium和Kafka connect的源代码后,我得到了它。提交连接器时,需要在连接器配置JSON中添加以下属性
理想情况下,这应该在某个地方记录下来。编码愉快!!!将提交同样的debezium乡亲。