如何使kafka sink connector与postgres的avro序列化键和值一起工作

gk7wooem  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(349)

**结案。**此问题不可复制或由打字错误引起。它目前不接受答案。
**想改进这个问题吗?**更新问题,使其成为堆栈溢出的主题。

去年关门了。
改进这个问题
我有一个kafka主题,其中包含带有avro序列化键和avro序列化值的消息。
我正在尝试设置一个sink连接器,将这些值放到postgres数据库的表中(在本例中是awsrds)。
我已经尝试了一些关于主题、消息和接收器配置的变化,但是看看下面的例子,如果有人能提供指导我在哪里出错,那就太好了!
我的主题具有以下架构(在架构注册表中)。。。
密钥架构

{
    "type": "record",
    "name": "TestTopicKey",
    "namespace": "test.messaging.avro",
    "doc": "Test key schema.",
    "fields": [
        {
            "name": "unitId",
            "type": "int"
        }
    ]
}

值架构

{
    "type": "record",
    "name": "TestTopicValues",
    "namespace": "test.messaging.avro",
    "doc": "Test value schema.",
    "fields": [
        {
            "name": "unitPrice",
            "type": "int",
            "doc": "Price in AUD excluding GST."
        },
        {
            "name": "unitDescription",
            "type": "string"
        }
    ]
}

我使用“kafka avro console producer”手动生成主题的记录,如下所示:

/bin/kafka-avro-console-producer --broker-list kafka-box-one:9092 --topic test.units --property parse.key=true --property "key.separator=|" --property "schema.registry.url=http://kafka-box-one:8081" --property key.schema='{"type":"record","name":"TestTopicKey","namespace":"test.messaging.avro","doc":"Test key schema.","fields":[{"name":"unitId","type":"int"}]}' --property value.schema='{"type":"record","name":"TestTopicValues","namespace":"test.messaging.avro","doc":"Test value schema.","fields":[{"name":"unitPrice","type":"int","doc":"Price in AUD excluding GST."},{"name":"unitDescription","type":"string"}]}'

一旦制作者启动,我就可以成功地将记录添加到主题中,如下所示:

{"unitId":111}|{"unitPrice":15600,"unitDescription":"A large widget thingy."}

注:我也可以成功地消费Kafkaavro控制台消费者的期望。
我正在尝试的postgres表如下所示:

CREATE TABLE test_area.unit_prices (
    unitId int4 NOT NULL,
    unitPrice int4 NULL,
    unitDescription text NULL,
    CONSTRAINT unit_prices_unitid_pk PRIMARY KEY (unitId)
);

我的Flume连接器如下所示:

{
  "name": "test.area.unit.prices.v01",
  "config": {
      "connector.class": "JdbcSinkConnector",
      "topics": "test.units",
      "group.id": "test.area.unit.prices.v01",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://kafka-box-one:8081",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://kafka-box-one:8081",
      "connection.user": "KafkaSinkUser",
      "connection.password": "KafkaSinkPassword",
      "connection.url": "jdbc:postgresql://unit-catalogue.abcdefghij.my-region-1.rds.amazonaws.com:5432/unit_sales?currentSchema=test_area",
      "table.name.format": "unit_prices",
      "auto.create": false,
      "auto.evole": "false"
  }
}

我的期望是,在sink显示为running之后不久,postgres表中就会出现记录。然而,没有任何东西在下沉。
附加说明:
我可以使用usql从kafka连接框连接并写入postgres rds示例,在kafka连接框上发布此sink connector,并使用sink connector的凭据。
接收器连接器状态为“running”,这向我表明接收器语法中没有错误。

zynd9foi

zynd9foi1#

很抱歉,这个答复太晚了。在最终让日志工作之后,这是一个代理问题。谢谢大家的帮助。

相关问题