kafka jdbc接收器连接器,批量插入值

ar7v8xwq  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(505)

我每秒收到很多消息(通过http协议)(50000-100000),并希望将它们保存到postgresql。为此,我决定使用kafka jdbc sink。
消息由一条记录保存到数据库中,而不是成批保存。我想在postgresql中批量插入500-1000条记录。
我在这个问题上找到了一些答案:如何使用batch.size?
我尝试在配置中使用相关选项,但似乎没有任何效果。
我的kafka jdbc sink postgresql配置( etc/kafka-connect-jdbc/postgres.properties ):

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3

# The topics to consume from - required for sink connectors like this one

topics=jsonb_pkgs

connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false

insert.mode=insert
connection.user=postgres
table.name.format=${topic}

connection.password=pwd

batch.size=500

# based on 500*3000byte message size

fetch.min.bytes=1500000
fetch.wait.max.ms=1500
max.poll.records=4000

我还添加了选项 connect-distributed.properties :

consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500

尽管每个分区每秒获得1000条以上的记录,但每个分区将记录保存到postgresql。
编辑:使用者选项以正确的名称添加到其他文件中
我还添加了选项 etc/schema-registry/connect-avro-standalone.properties :


# based on 500*3000 byte message size

consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000
rqqzpn5f

rqqzpn5f1#

我意识到我误解了文件。这些记录被逐一插入数据库。一个事务中插入的记录的计数取决于 batch.size 以及 consumer.max.poll.records . 我希望批插入是以另一种方式实现的。我想选择插入这样的记录:

INSERT INTO table1 (First, Last)
VALUES
    ('Fred', 'Smith'),
    ('John', 'Smith'),
    ('Michael', 'Smith'),
    ('Robert', 'Smith');

但这似乎是不可能的。

相关问题