Unable to read data from Postgres into Kafka. I connected Kafka to my Postgres database through Debezium running in Docker. But when I run kafkacat in Docker to read the Postgres topic, I get an error:
ERROR: Failed to format message in postgres.public.users [0] at offset 0: Avro/Schema-registry message deserialization: REST request failed (code -1): HTTP request failed: Couldn't resolve host name : terminating
The command I use to run kafkacat:
docker run --tty --network pythonproject5_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r http://schema-regisrty:8081 -t postgres.public.users
The Debezium connector file looks like this:
{
  "name": "db-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "George",
    "database.password": "tech1337",
    "database.dbname": "tech_db",
    "database.server.name": "postgres",
    "table.include.list": "public.users"
  }
}
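The question does not show how this file is submitted to Kafka Connect. One common way is a POST to the Connect REST API's /connectors endpoint. The sketch below only builds that request in Python; the base URL http://localhost:8083 is an assumption taken from the 8083:8083 port mapping of the debezium service, and build_register_request is a hypothetical helper name.

```python
import json
import urllib.request


def build_register_request(connector, connect_url="http://localhost:8083"):
    """Build (but do not send) the POST request that registers a connector.

    connect_url is an assumption: it matches the 8083:8083 port mapping
    of the debezium service in the compose file.
    """
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


# Connector definition copied from the question.
connector = {
    "name": "db-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "George",
        "database.password": "tech1337",
        "database.dbname": "tech_db",
        "database.server.name": "postgres",
        "table.include.list": "public.users",
    },
}

request = build_register_request(connector)

# To send it against a running Connect worker, uncomment:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it makes it easy to inspect the URL and payload before touching the running stack.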
The schema in the application looks like this:
name: str
time_created: int
gender: str
age: int
last_name: str
ip: str
city: str
premium: bool = None
birth_day: str
balance: int
user_id: int
And the model is defined like this:
class User(Base):
    __tablename__ = 'users'
    name = Column(String)
    time_created = Column(Integer)
    gender = Column(String)
    age = Column(Integer)
    last_name = Column(String)
    ip = Column(String)
    city = Column(String)
    premium = Column(Boolean)
    birth_day = Column(String)
    user_id = Column(Integer, primary_key=True, index=True)
    my_vet = relationship("VET", back_populates="owner")
Docker Compose file:
version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=goerge
      - POSTGRES_PASSWORD=tech1337
      - POSTGRES_DB=5_pm_db
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "5056:5056"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092 \
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092
  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema- registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema- registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]
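All operations against Postgres are done from my code, which I implemented with SQLAlchemy. If anyone has run into this error, please describe how you handled it. How can I fix it?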
1 Answer
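Avro/Schema-registry message deserialization: REST request failed
SCHEMA_REGISTRY_LISTENERS needs to be http://0.0.0.0:8081, which is the default. You also need to remove the space from "schema- registry" in every other place that uses it. Alternatively, don't use AvroConverter; use something else that doesn't require the registry, such as JSONConverter.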
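As a rough illustration of that advice, the following Python sketch checks registry URLs for stray whitespace and for hostnames that do not match any service name in the compose file. The helper name check_registry_url and the KNOWN_SERVICES set are hypothetical and built only from the names shown in the question; this is not part of any Kafka or Confluent tooling. It also flags the schema-regisrty spelling passed to -r in the kafkacat command from the question.

```python
from urllib.parse import urlsplit

# Service names declared in the compose file from the question.
KNOWN_SERVICES = {
    "postgres", "zookeeper", "broker", "kafka", "debezium", "schema-registry",
}
# Addresses that are valid without being a compose service name.
LOCAL_ADDRESSES = {"0.0.0.0", "localhost"}


def check_registry_url(url, known_hosts=KNOWN_SERVICES):
    """Return a list of problems found in a schema registry URL."""
    problems = []
    if any(ch.isspace() for ch in url):
        problems.append("contains whitespace")
    # Drop all whitespace before parsing so the hostname can still be inspected.
    host = urlsplit("".join(url.split())).hostname
    if host is None:
        problems.append("no hostname")
    elif host not in known_hosts and host not in LOCAL_ADDRESSES:
        problems.append(f"unknown host {host!r}")
    return problems


if __name__ == "__main__":
    # URLs copied from the compose file and the kafkacat command.
    for candidate in (
        "http://schema- registry:8081",
        "http://schema-registry:8081",
        "http://schema-regisrty:8081",
        "http://0.0.0.0:8081",
    ):
        print(candidate, "->", check_registry_url(candidate) or "ok")
```

A check like this only catches spelling problems in the configuration; whether the registry is actually reachable still depends on the containers sharing a Docker network.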
You can also remove the broker service, since it is never used in the compose file.