ACL configuration not working with Kafka Connect

7d7tgy0s  posted 2023-10-15  in Apache

I have a single-node Kafka cluster with ACLs enabled over SASL_PLAINTEXT. Below are the configurations for my Kafka Connect worker and the Kafka broker.
connect-distributed.properties

bootstrap.servers=<ip>:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=100
offset.storage.partitions=8
config.storage.partitions=1
status.storage.partitions=2
rest.advertised.host.name=<ip>
plugin.path=/opt/kafka/connectors
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="username" \
  password="password";

producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="username" \
  password="password";
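Note that the `producer.*` prefix in the worker config only covers producers created for source connectors; consumers created for sink connectors take the `consumer.*` prefix instead. If the cluster ever runs sink connectors, the same credentials would typically be mirrored there as well (a sketch, assuming the same user):

```
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="username" \
  password="password";
```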

server.properties

broker.id=14148
host.name=<ip>
advertised.listeners=SASL_PLAINTEXT://<ip>:9092
listeners=SASL_PLAINTEXT://<ip>:9092
num.network.threads=3
num.io.threads=8
num.partitions=12

default.replication.factor=1
offsets.topic.replication.factor=1

unclean.leader.election.enable=false
auto.create.topics.enable=true
log.dirs=/data/data-kafka/
log.dir=/data/data-kafka/

reserved.broker.max.id=500000

socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

log.flush.interval.messages=10000
log.flush.interval.ms=1000

log.retention.hours=120
log.retention.bytes=104857600
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=true

zookeeper.connect=<ip>:2181

zookeeper.connection.timeout.ms=10000

delete.topic.enable=true
queued.max.requests=1000

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin;
allow.everyone.if.no.acl.found=false
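Since `allow.everyone.if.no.acl.found=false`, every non-super principal needs explicit ACLs before it can touch a topic or group. A sketch of granting the Connect worker's user access to its internal topics and consumer group with the stock `kafka-acls.sh` tool (the `admin.properties` file, which holds the super user's SASL settings, is an assumed name):

```shell
# Grant the Connect worker principal full access to its internal topics.
bin/kafka-acls.sh --bootstrap-server <ip>:9092 \
  --command-config admin.properties \
  --add --allow-principal User:username \
  --operation All \
  --topic connect-offsets --topic connect-configs --topic connect-status

# The worker also needs Read on its group.id.
bin/kafka-acls.sh --bootstrap-server <ip>:9092 \
  --command-config admin.properties \
  --add --allow-principal User:username \
  --operation Read --group connect-cluster
```

The Debezium connector's own topics (`db4.*`, `schema.history.v4`) need equivalent grants for whichever principal the connector's clients authenticate as.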

Below is the connector configuration I submit to the Connect REST API:

{
    "name": "connector4",
    "config": {
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_PLAINTEXT",
        "producer.sasl.mechanism": "PLAIN",
        "producer.security.protocol": "SASL_PLAINTEXT",
        "producer.request.timeout.ms": 50000,
        "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
        "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "snapshot.locking.mode": "none",
        "include.schema.changes": "false",
        "schema.history.store.only.captured.tables.ddl": "true",
        "tombstones.on.delete": "false",
        "topic.prefix": "db4",
        "decimal.handling.mode": "double",
        "schema.history.internal.kafka.topic": "schema.history.v4",
        "database.user": "db-user",
        "producer.override.compression.type": "snappy",
        "database.server.id": "1005",
        "event.deserialization.failure.handling.mode": "warn",
        "schema.history.internal.kafka.bootstrap.servers": "<kafka-ip>:9092",
        "database.port": "3307",
        "inconsistent.schema.handling.mode": "warn",
        "database.hostname": "db-ip",
        "database.password": "db-pwd",
        "name": "connector4",
        "table.include.list": "db1.table_dummy",
        "database.include.list": "db1",
        "snapshot.mode": "when_needed",
        "schema.history.skip.unparseable.ddl": "true"
    }
}

The connector is accepted, but after checking its status for a while I can see that the task has failed.

Digging into the Kafka Connect logs, I can see the following error message:
Cancelled in-flight metadata request with correlation id. Request timed out.
All of the configuration follows the Confluent Kafka Connect documentation, but I still cannot get it to work. Please help.

wlzqhblo


After going through the Debezium and Kafka Connect source code, I got it working. When submitting the connector, the following properties need to be added to the connector configuration JSON:

"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pwd\";",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pwd\";",
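The fix above can be scripted so the schema-history security settings are merged into every connector config before submission. A minimal sketch in Python, using only the standard library; the function and variable names (`with_history_sasl`, `submit`, the `localhost:8083` URL, and the credentials) are illustrative assumptions, not from the post:

```python
import json
import urllib.request

# Assumed credentials for the schema-history clients.
JAAS = ('org.apache.kafka.common.security.plain.PlainLoginModule '
        'required username="user" password="pwd";')


def with_history_sasl(config: dict) -> dict:
    """Return a copy of `config` with SASL settings added for both the
    internal schema-history producer and consumer used by Debezium."""
    merged = dict(config)
    for client in ("producer", "consumer"):
        prefix = f"schema.history.internal.{client}."
        merged[prefix + "security.protocol"] = "SASL_PLAINTEXT"
        merged[prefix + "sasl.mechanism"] = "PLAIN"
        merged[prefix + "sasl.jaas.config"] = JAAS
    return merged


def submit(name: str, config: dict,
           connect_url: str = "http://localhost:8083"):
    """POST the connector to the Connect REST API (untested sketch)."""
    body = json.dumps({"name": name,
                       "config": with_history_sasl(config)}).encode()
    req = urllib.request.Request(f"{connect_url}/connectors", data=body,
                                 headers={"Content-Type": "application/json"})
    return urllib.request.urlopen(req)
```

With this in place, each connector JSON only needs its Debezium-specific settings; the six schema-history security keys are filled in uniformly.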

Ideally this should be documented somewhere. Happy coding! I will file the same with the Debezium folks.
