Apache Kafka: "Transactional Id authorization failed" when trying to create a stream in KSQL

snz8szmq posted on 2021-06-04 in Kafka

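I have set up a KSQL server in OpenShift and connected it to an on-premise Cloudera Kafka cluster (CDH6, Kerberized and SSL-enabled). When I run the "list topics" or "print" commands, everything works. However, as soon as I try to create a stream, I get the following error: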

Could not write the statement 'create stream dev_abc (date varchar, timestamp varchar, latitude varchar, longitude varchar) WITH (KAFKA_TOPIC='topic123', VALUE_FORMAT='JSON');' into the command topic: Transactional Id authorization failed.
Caused by: Transactional Id authorization failed.

Looking at the log file, I also see the following error:

[2020-11-18 11:53:58,090] INFO Processed unsuccessfully: KsqlRequest{ksql='CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<target VARCHAR, errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>, serializationError STRUCT<target VARCHAR, errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, kafkaStreamsThreadError STRUCT<errorMessage VARCHAR, threadName VARCHAR, cause ARRAY<VARCHAR>>>) WITH(KAFKA_TOPIC='service_uykh7k6ksql_processing_log', VALUE_FORMAT='JSON');', configOverrides={}, requestProperties={}, commandSequenceNumber=Optional[-1]}, reason: Could not write the statement 'CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<target VARCHAR, errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>, serializationError STRUCT<target VARCHAR, errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, kafkaStreamsThreadError STRUCT<errorMessage VARCHAR, threadName VARCHAR, cause ARRAY<VARCHAR>>>) WITH(KAFKA_TOPIC='service_abc_processing_log', VALUE_FORMAT='JSON');' into the command topic: Transactional Id authorization failed. (io.confluent.ksql.rest.server.resources.KsqlResource:301)

I am currently using the following configuration:
Image: confluentinc/ksqldb-server:0.13.0 (I have also tried older versions)
ksql-server.properties:

listeners=http://0.0.0.0:8088

# to avoid Attempted to write a non-default includeClusterAuthorizedOperations at version 7

ksql.access.validator.enable=off

kafka.confluent.support.metrics.enable=false

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
ssl.truststore.location=/.../.../truststore.jks
ssl.truststore.password=XXXXX
ssl.truststore.type=JKS

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="blablub.keytab" serviceName="kafka"  principal="principalname";

serviceName="kafka"

principal="principalname";

ksql.service.id=myservicename

# authentication for producers, needed for ksql commands like "Create Stream"

producer.ssl.endpoint.identification.algorithm=HTTPS
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/.../truststore.jks
producer.ssl.truststore.password=XXXXX
producer.sasl.mechanism=GSSAPI
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="....keytab" serviceName="kafka"  principal="principalname";

# authentication for consumers, needed for ksql commands like "Create Stream"

consumer.ssl.endpoint.identification.algorithm=HTTPS
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/..../truststore.jks
consumer.ssl.truststore.password=XXXXX
consumer.sasl.mechanism=GSSAPI
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/.....keytab" serviceName="kafka"  principal="principalname";

# ------ Logging config -------

# Automatically create the processing log topic if it does not already exist:

ksql.logging.processing.topic.auto.create=false
ksql.logging.processing.topic.name=abc_processing_log

# Automatically create a stream within KSQL for the processing log:

ksql.logging.processing.stream.auto.create=true

# ------ External service config -------

# The set of Kafka brokers to bootstrap Kafka cluster information from:

bootstrap.servers=.....:9093,.....:9093,......:9093
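When sharing or reviewing a configuration like the one above, it can help to see which settings each client type would receive and to hide secrets first. The following is a minimal sketch, assuming the usual convention that keys prefixed with `producer.` or `consumer.` apply only to that client type while unprefixed keys are shared. The helper names (`parse_properties`, `group_by_client`, `mask`) and the shortened `SAMPLE` text are illustrative and not part of KSQL.

```python
from collections import defaultdict

# Substrings of keys whose values should not be shared verbatim (assumption).
SECRET_MARKERS = ("password", "jaas")


def parse_properties(text):
    """Parse Java-style `key=value` lines, skipping blank lines and `#` comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values containing '=' stay intact.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def group_by_client(props):
    """Split settings into 'producer', 'consumer' and 'shared' groups by key prefix."""
    groups = defaultdict(dict)
    for key, value in props.items():
        prefix, _, rest = key.partition(".")
        if prefix in ("producer", "consumer") and rest:
            groups[prefix][rest] = value
        else:
            groups["shared"][key] = value
    return dict(groups)


def mask(settings):
    """Replace the values of secret-looking keys with a placeholder."""
    return {
        key: "***" if any(m in key.lower() for m in SECRET_MARKERS) else value
        for key, value in settings.items()
    }


# Shortened excerpt of the properties shown above.
SAMPLE = """\
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
ksql.service.id=myservicename
producer.security.protocol=SASL_SSL
producer.ssl.truststore.password=XXXXX
consumer.sasl.mechanism=GSSAPI
"""

groups = group_by_client(parse_properties(SAMPLE))
for name in sorted(groups):
    print(name, mask(groups[name]))
```

Running this against the full file makes it easier to compare the producer settings with the shared ones, since the failing write to the command topic goes through a producer.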

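I have found many suggestions about using Kafka ACLs to avoid this kind of error, but CDH does not support that approach. Do you have any ideas on how to continue the analysis?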
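One possible way to narrow this down is to check whether the same principal can perform any transactional write outside of KSQL. The sketch below uses the `confluent-kafka` Python client for this. Everything passed to `build_client_config` (broker address, keytab path, principal, CA file, transactional id) and the scratch topic name are placeholders, and the transactional id that the KSQL server itself uses is not shown in the question, so a result here is only a hint and not proof of what the server experiences.

```python
def build_client_config(bootstrap, keytab, principal, ca_file, transactional_id):
    """Build a librdkafka-style config for a Kerberos + SSL transactional producer."""
    return {
        "bootstrap.servers": bootstrap,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": "kafka",
        "sasl.kerberos.keytab": keytab,
        "sasl.kerberos.principal": principal,
        # librdkafka expects a PEM CA file here; it does not read JKS truststores.
        "ssl.ca.location": ca_file,
        "transactional.id": transactional_id,
    }


def try_transaction(config, topic, timeout=30.0):
    """Attempt one transactional write; return None on success, else the error text."""
    # Imported lazily so the config helper works without the client installed.
    from confluent_kafka import KafkaException, Producer

    producer = Producer(config)
    try:
        producer.init_transactions(timeout)
        producer.begin_transaction()
        producer.produce(topic, value=b"transactional write probe")
        producer.commit_transaction(timeout)
    except KafkaException as exc:
        return str(exc)
    return None
```

Calling `try_transaction(build_client_config(...), "some-scratch-topic")` with the real values writes one record, so a throwaway topic is advisable. If the call returns an error mentioning transactional id authorization, the restriction likely applies to the principal in general rather than to KSQL specifically.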
