我目前正在为ksqlDB构建一个用例原型,该用例应该从一个填充了String
键和Avro
值的Kafka主题中读取。
我正在使用以下docker-compose.yml
来设置我的开发环境:
version: "3.9"
services:
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
- "29092:29092"
links:
- zookeeper
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_PORT: 9092
KAFKA_LISTENERS:
INTERNAL://:9092,
EXTERNAL://:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
INTERNAL:PLAINTEXT,
EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_LISTENERS:
INTERNAL://kafka:9092,
EXTERNAL://localhost:29092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
networks:
- network1
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
networks:
- network1
cp-schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
hostname: schema-registry
container_name: schema-registry
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
SCHEMA_REGISTRY_AUTHENTICATION_ROLES: user
SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry-Auth
SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/jaas_config.conf
SCHEMA_REGISTRY_DEBUG: 'true'
volumes:
- './schema_registry:/etc/kafka/secrets:rw'
networks:
- network1
ksqldb-primary:
image: confluentinc/ksqldb-server:0.29.0
hostname: ksqldb-primary
depends_on:
- kafka
- cp-schema-registry
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka:9092
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: user:user
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
networks:
- network1
# Access the cli by running:
# > docker compose exec ksqldb-cli ksql http://ksqldb-primary:8088
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.29.0
container_name: ksqldb-cli
depends_on:
- ksqldb-primary
entrypoint: /bin/sh
tty: true
networks:
- network1
redpanda:
image: vectorized/console:v2.3.5
ports:
- "5000:8080"
environment:
# All config properties can be entered via env
KAFKA_BROKERS: kafka:9092
KAFKA_SCHEMAREGISTRY_ENABLED: "true"
KAFKA_SCHEMAREGISTRY_URLS: http://schema-registry:8081
KAFKA_SCHEMAREGISTRY_USERNAME: user
KAFKA_SCHEMAREGISTRY_PASSWORD: user
networks:
- network1
networks:
network1:
ipam:
config:
- subnet: 172.177.0.0/16
字符串
我正在使用以下服务:
- Kafka
kafka_2.13-2.8.1
- Zookeeper
- 融合架构注册表
7.2.1
- ksql数据库
0.29.0
- 数据库命令行界面
0.29.0
- Redpanda控制台用户界面是一个很好的Kafka用户界面,可以看到我的主题和配置。
我创建了一个名为table-receipts
的主题,并在Confluent Schema Registry中注册了一个名为table-receipts-value
的模式:
GET localhost:8081/subjects/table-receipts-value/versions/1
{
"subject": "table-receipts-value",
"version": 1,
"id": 7,
"schema": "--omitted--"
}
型
我使用io.confluent.kafka.serializers.KafkaAvroSerializer
编写这些数据,尽管我手动执行了此序列化,但它是有效的:
的数据
到目前为止,一切都很好。
之后,我尝试在ksqlDB中创建一个流来处理此主题:
CREATE STREAM table_receipts WITH (KAFKA_TOPIC='table-receipts', VALUE_FORMAT='AVRO');
型
并立即遇到以下错误:
Schema for message values on topic 'table-receipts' does not exist in the Schema Registry.);
Subject: table-receipts-value
Possible causes include:
- The topic itself does not exist
-> Use SHOW TOPICS; to check
- Messages on the topic are not serialized using a format Schema Registry supports
-> Use PRINT 'table-receipts' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using a Confluent Schema Registry supported serializer
-> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry
-> Use the REST API to list available subjects https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects
- You do not have permissions to access the Schema Registry.
-> See https://docs.confluent.io/current/schema-registry/docs/security.html
ksql> PRINT 'table-receipts' FROM BEGINNING;
Key format: SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: does not match any supported format. It may be a STRING with encoding other than UTF8, or some other format.
rowtime: 2023/10/26 13:57:56.322 Z, key: [2c08af31-48fb-41b6-9@3546975261674595891/3774636806494052912], value: \x00\x00\x00\x00\x07\x90\xBE\x9B\xC9\xD5b\xD2\xDE\x9A\xC9\xD5b\x00H39b668b8-8bcd-406f-9449-973eac83d152\x1012345678\x06EUR\x04\x17p\x02\x02\x04\x17p\x00\x04\x17p\x00\x02\x0E50% off\x02\x04\x17p\x00\x02\x04\x02He3a7158b-0aeb-40c9-876f-0bd5b357d861\x02\x04\x1BX\x02\x0EPRODUCT\x02\x04\x0B\xB8\x00\x02H5db87916-f3ff-48c4-9163-1f6e3b99adde,Alkoholische Getr\xC3\xA4nke\x00\x04\x15\xCC\x02Hc2ac3dfc-c176-4655-b725-ae6eb01c60a7H5db87916-f3ff-48c4-9163-1f6e3b99adde\x12Cocktails\x00\x02He3a7158b-0aeb-40c9-876f-0bd5b357d861\x18Whiskey Sour\x081234Hc2ac3dfc-c176-4655-b725-ae6eb01c60a7\x00H61ef9eeb-ba7f-4e8b-98c1-fcd15a97d650H2c08af31-48fb-41b6-94b4-71b019c536f3\x020\x04\x17p\x02\x04\x15\xCCH694b9be0-903f-463b-8c47-4ab07e3b7d70\x04\x17p\x04\x1BX\x04\x01\xA4\x00\x10John Doe\x0412, partition: 9
^CTopic printing ceased
型
- 如果我从模式注册表中删除了基本身份验证 *,则一切正常:
ksql> CREATE STREAM table_receipts WITH (KAFKA_TOPIC='table-receipts', VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
型
然而,根据本页(部分底部),似乎我使用了正确的设置来传递基本的auth参数。
我如何将基本的auth凭据传递给ksqlDB模式注册表客户端?所以,我猜问题是ksqlDB模式注册表客户端没有获得正确的参数。
1条答案
按热度按时间k5hmc34c1#
最后我失败了。
文档中描述的设置参考了
confluentinc/cp-ksqldb-server
docker镜像。我使用的是“plain”
confluentinc/ksqldb-server
。无论出于什么原因,似乎都不支持将基本的auth / USER_INFO凭据传递给模式注册中心客户端,或者至少我无法找到应该如何完成。将图像切换到
confluentinc/cp-ksqldb-server:7.5.1
,这样我就可以毫无问题地创建流。我会让这个开放的时间更长一点,以防有人想权衡。我想有时它有助于只是通过整个过程中的头部从一开始。