Unable to read data from Postgres into Kafka

332nm8kg  posted on 2022-12-17  in Apache

I can't read data from Postgres into Kafka. I connected Kafka to my Postgres database through Debezium running in Docker. But when I run kafkacat in Docker to read the Postgres topic, I get an error:

ERROR: Failed to format message in postgres.public.users [0] at offset 0:  Avro/Schema-registry message deserialization: REST request failed (code -1): HTTP request failed: Couldn't resolve host name : terminating

The command I use to run kafkacat:

docker run --tty --network pythonproject5_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r http://schema-regisrty:8081 -t postgres.public.users

The Debezium connector file looks like this:

{
  "name": "db-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "George",
    "database.password": "tech1337",
    "database.dbname": "tech_db",
    "database.server.name": "postgres",
    "table.include.list": "public.users"
  }
}

The schema in the application looks like this:

name: str
time_created: int
gender: str
age: int
last_name: str
ip: str
city: str
premium: bool = None
birth_day: str
balance: int
user_id: int

And it is modeled like this:

class User(Base):
    __tablename__ = 'users'
    name = Column(String)
    time_created = Column(Integer)
    gender = Column(String)
    age = Column(Integer)
    last_name = Column(String)
    ip = Column(String)
    city = Column(String)
    premium = Column(Boolean)
    birth_day = Column(String)
    user_id = Column(Integer, primary_key=True, index=True)
    my_vet = relationship("VET", back_populates="owner")

Docker Compose file:

version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=goerge
      - POSTGRES_PASSWORD=tech1337
      - POSTGRES_DB=5_pm_db

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "5056:5056"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:   PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092 \
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema- registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL:  http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema- registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]

All the Postgres operations in my code are implemented with SQLAlchemy. If anyone has run into this error, please describe how you handled it. How can I fix it?


h7appiyu1#

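Avro/Schema-registry message deserialization: REST request failed
The Schema Registry listener needs to be http://0.0.0.0:8081, which is the default, rather than: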

SCHEMA_REGISTRY_LISTENERS=http://schema- registry:8081,http://localhost:8081

Also, you need to remove the space in "schema- registry" everywhere else it appears.
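
The hostname problems in the question can be caught before anything is started. The sketch below is only an illustration: check_url is a hypothetical helper, and the set of service names is copied by hand from the Compose file in the question. It flags whitespace inside a URL and any host that is not one of the Compose services. Note that the kafkacat command in the question uses schema-regisrty, which is not a service name either and would be consistent with the "Couldn't resolve host name" error.

```python
from urllib.parse import urlsplit

# Service names copied from the Compose file in the question (assumption:
# containers on the same network reach each other by these names).
COMPOSE_SERVICES = {
    "postgres", "zookeeper", "broker", "kafka", "debezium", "schema-registry",
}


def check_url(url, services=COMPOSE_SERVICES):
    """Return a list of problems found in a URL that should point at a service."""
    problems = []
    if any(ch.isspace() for ch in url):
        problems.append("contains whitespace")
    # Strip whitespace before parsing so the host can still be checked.
    host = urlsplit("".join(url.split())).hostname
    if host not in services and host not in {"localhost", "0.0.0.0"}:
        problems.append(f"host {host!r} is not a compose service name")
    return problems


if __name__ == "__main__":
    for candidate in (
        "http://schema- registry:8081",   # value used in the Compose file
        "http://schema-regisrty:8081",    # value passed to kafkacat with -r
        "http://schema-registry:8081",    # intended value
    ):
        print(candidate, "->", check_url(candidate) or "ok")
```

Only the last URL passes both checks; the first is rejected for the space and the second for the misspelled host.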
Alternatively, don't use AvroConverter; use a converter that does not need the registry, such as JsonConverter, in place of:

KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
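
If the converters are switched to org.apache.kafka.connect.json.JsonConverter, the topic carries plain JSON and a consumer no longer needs the registry. The following is a minimal sketch of reading such a message, assuming the usual Debezium change-event envelope (a "payload" wrapper when schemas are enabled, with the new row state under "after"). The function name extract_row and the sample message are made up for illustration and are not taken from the question.

```python
import json


def extract_row(raw_value):
    """Return the row state after a change event, or None if there is none.

    raw_value is the Kafka message value as str or bytes. A None value
    (a tombstone) and delete events both yield None.
    """
    if raw_value is None:
        return None
    event = json.loads(raw_value)
    if event is None:
        return None
    # With schemas enabled, JsonConverter wraps the event as
    # {"schema": ..., "payload": ...}; without them the event is bare.
    if "schema" in event and "payload" in event:
        event = event["payload"]
    if event is None:
        return None
    return event.get("after")


if __name__ == "__main__":
    # Invented sample shaped like an insert on public.users.
    sample = json.dumps({
        "schema": {},
        "payload": {
            "before": None,
            "after": {"user_id": 1, "name": "Ann"},
            "op": "c",
        },
    })
    print(extract_row(sample))
```

With JSON on the topic, kafkacat should also be able to print the messages without the Avro and registry options.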

You can also remove the broker service, since nothing else in the Compose file uses it.
