ACL configuration in Kafka Connect is not working

Posted by voase2hg on 2021-06-04 in Kafka

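I set up ACLs on a 3-node Kafka cluster and can produce to and consume from topics with the console producer and console consumer. Now I want to configure Kafka Connect with ACLs. I tried the SASL_PLAINTEXT combination, and connect.log shows the errors below. Data is not being synced from the source table to the topic. Please help me find which configuration I am missing.
Error log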

[2020-10-14 07:24:35,874] ERROR WorkerSourceTask{id=oracle-jdbc-source-mtx_domains_acl5-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:448)
[2020-10-14 07:24:35,874] ERROR WorkerSourceTask{id=oracle-jdbc-source-mtx_domains_acl5-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)

My configuration is shown in the files below. I defined the users in the jaas.conf file and set it in the environment.
1: zookeeper.properties

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
zookeeper.set.acl=true
jaasLoginRenew=3600000

2: server.properties

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<server_name>:9092
host.name=server_ip

3: schema-registry.properties

kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.sasl.mechanism=PLAIN
metadataServerUrls=SASL_PLAINTEXT://<server_ip>:9092
zookeeper.set.acl=true
kafkastore.group.id=schema-registry-3

4: connect-avro-distributed.properties

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

5: Source connector script

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "oracle-jdbc-source-mtx_domains_acl5",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@<ip>:<port>:<dbname>",
    "connection.user": "<username>",
    "connection.password": "password",
    "numeric.mapping": "best_fit",
    "table.whitelist": "TABLENAME",
    "mode": "timestamp",
    "timestamp.column.name": "CREATED_ON",
    "topic.prefix": "",
    "validate.non.null": "false",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "DOMAIN_CODE",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_PLAINTEXT",
    "producer.sasl.mechanism": "PLAIN",
    "producer.security.protocol": "SASL_PLAINTEXT",
    "producer.request.timeout.ms": 50000,
    "producer.retry.backoff.ms": 500,
    "offset.flush.timeout.ms": 50000,
    "producer.buffer.memory": 100,
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";",
    "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "delete.enabled": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}' http://localhost:8083/connectors

Answer #1 (flmtquvp)

You need to add the following properties to connect-distributed.properties:

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

Source: Kafka Connect security documentation
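
The properties in this answer repeat the same three settings under three prefixes: unprefixed for the worker's own connections to the brokers, `producer.` for the producers used by source tasks, and `consumer.` for the consumers used by sink tasks. These are worker-level settings; `sasl.*` and `producer.*` keys placed in the connector JSON are not applied to the worker's Kafka clients. As a rough sketch of that repetition, the script below writes all three groups to a file. The variable names `CONNECT_USER`, `CONNECT_PASSWORD` and `OUT`, and the output file name, are assumptions made for illustration; the default credentials simply mirror the placeholders used above.

```shell
#!/bin/sh
# Sketch only: emit identical SASL/PLAIN client settings for the worker
# itself ("" prefix) and for its producer and consumer clients.
# CONNECT_USER, CONNECT_PASSWORD and OUT are hypothetical names; the
# defaults mirror the placeholder credentials shown in the answer.
CONNECT_USER="${CONNECT_USER:-connect}"
CONNECT_PASSWORD="${CONNECT_PASSWORD:-connect-secret}"
OUT="${OUT:-connect-sasl.properties}"

: > "$OUT"   # start from an empty file

for prefix in "" "producer." "consumer."; do
  {
    printf '%ssasl.mechanism=PLAIN\n' "$prefix"
    printf '%ssecurity.protocol=SASL_PLAINTEXT\n' "$prefix"
    # The JAAS entry is written on one line; the trailing semicolon is required.
    printf '%ssasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="%s" password="%s";\n' \
      "$prefix" "$CONNECT_USER" "$CONNECT_PASSWORD"
    printf '\n'
  } >> "$OUT"
done
```

The generated lines could then be appended to the worker file actually in use (connect-avro-distributed.properties in the question), followed by a worker restart. Whether the task has recovered can be checked through the Connect REST API's `GET /connectors/<name>/status` endpoint.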
