SQL Server - improving Debezium throughput

vpfxa7rd posted on 2021-06-04 in Kafka

I recently started using Debezium to capture change data in real time and push it into a target database.
I connect to SQL Server using Kafka Connect with Azure Event Hubs (instead of Kafka), and use the Confluent JDBC sink connector to write the changed data into a target SQL Server database.
I know Debezium works asynchronously and has little impact on database performance, but is there any way to increase the throughput of the stream?
I recently set the Event Hubs minimum throughput units to 10, with auto-inflate up to 20. So I expected the Debezium + Kafka Connect + Event Hubs pipeline to handle 10-20 MB/s of ingress, and egress should reach 20-40 MB/s.
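(For what it's worth, the math behind that expectation: each Event Hubs throughput unit is rated at 1 MB/s ingress and 2 MB/s egress, so 10 TUs inflating up to 20 should indeed allow 10-20 MB/s in and 20-40 MB/s out.)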
The real-world performance, however, is far worse. I manually imported 10k records into the source database, less than 6 MB in total, so I expected Debezium with the sink connector to capture those changes and land them in the target database within a few seconds.
Instead of receiving the data in one go, though, the sink connector trickles the updates into the target database.
Below are my configurations. Please let me know if any of them should be changed to improve performance; any help would be greatly appreciated.
Kafka Connect:
bootstrap.servers=sqldbcdc.servicebus.windows.net:9093

group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists

config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000
connections.max.idle.ms=180000
metadata.max.age.ms=180000
auto.register.schemas=false

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://sqldbcdc.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************=";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://sqldbcdc.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************=";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://sqldbcdc.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************=";

plugin.path=C:\kafka\libs
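
For reference, I understand the worker's embedded producer and consumer can also be tuned with prefixed properties, so I am considering adding something like the lines below. The values are illustrative starting points, not settings I have validated; I left out producer.compression.type because, as far as I know, the Event Hubs Kafka endpoint does not accept compressed batches.

# larger, fuller producer batches
producer.batch.size=65536
producer.linger.ms=50

# let consumers pull bigger chunks per poll
consumer.fetch.min.bytes=65536
consumer.max.poll.records=1000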

SQL Server connector:

{
 "name": "sql-server-connection", 
 "config": {
 "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
 "tasks.max": "1",
 "database.hostname": "localhost",
 "database.port": "1433",
 "database.user": "sa",
 "database.password": "******",
 "database.dbname": "demodb",
 "database.server.name": "dbservername",
 "table.whitelist": "dbo.portfolios",
 "database.history":"io.debezium.relational.history.MemoryDatabaseHistory",
 "transforms": "route",
 "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
 "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
 "transforms.route.replacement": "$3"
 }
}
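
On the Debezium side, I gather the connector's internal buffering can be tuned with max.batch.size, max.queue.size and poll.interval.ms (defaults 2048, 8192 and 500 ms, if I read the docs correctly). Values like the following, merged into the config above, are what I would experiment with, not measured optima:

 "max.batch.size": "4096",
 "max.queue.size": "16384",
 "poll.interval.ms": "100"

(I also realize io.debezium.relational.history.MemoryDatabaseHistory is volatile and only meant for testing.)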

Sink connector:

{
  "name": "jdbc-sink",
  "config":{
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "portfolios",
      "connection.url": "jdbc:sqlserver://localhost:1433;instance=NEWMSSQLSERVER;databaseName=demodb",
      "connection.user":"sa",
      "connection.password":"*****",
      "batch.size":2000,
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": false,
      "transforms.unwrap.delete.handling.mode": "none",
      "auto.create": "true",
      "insert.mode": "upsert",
      "delete.enabled":true,
      "pk.fields": "portfolio_id",
      "pk.mode": "record_key",
      "table.name.format": "replicated_portfolios"
  }
}
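
For the sink, my understanding is that batch.size only caps how many records go into one JDBC batch; how many records each poll actually delivers is governed by the consumer. If the worker allows it via connector.client.config.override.policy=All, per-connector overrides like these should be possible (again, untested values):

      "consumer.override.max.poll.records": "2000",
      "consumer.override.fetch.min.bytes": "65536"

Also, raising tasks.max above 1 will only help if the portfolios topic actually has more than one partition.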
