I recently started using Debezium to capture change data in real time and deliver it to a target database.
I connect to SQL Server through Azure Event Hubs with Kafka Connect, and use the Confluent JDBC sink connector to write the changed data into a target SQL Server database instead of keeping it in Kafka.
I understand that Debezium works asynchronously and has little impact on source database performance, but is there any way to increase the throughput of the stream?
Recently I set the Event Hubs namespace to a minimum of 10 throughput units with auto-inflate up to 20. Since each throughput unit allows roughly 1 MB/s of ingress and 2 MB/s of egress, I expected the Debezium + Kafka Connect + Event Hubs pipeline to handle 10-20 MB/s of ingress and 20-40 MB/s of egress.
The real performance, however, is far worse. I manually inserted 10k records into the source database, which amounts to less than 6 MB, so I expected Debezium with the sink connector to capture those changes and write them to the target database within a few seconds.
Instead of writing the data in one go, the sink connector only trickles the records into the target database a few at a time.
Below are my configurations. If any of them should be changed to improve performance, please let me know. Any help would be appreciated.
Kafka Connect worker configuration:
bootstrap.servers=sqldbcdc.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
connections.max.idle.ms=180000
metadata.max.age.ms=180000
auto.register.schemas=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://sqldbcdc.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************=";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://sqldbcdc.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************=";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://sqldbcdc.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************=";
plugin.path=C:\kafka\libs
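The worker otherwise runs with default producer and consumer batching. Overrides like the following are the kind of knobs I assume could matter for throughput; the values are only illustrative guesses and are not currently applied:
# illustrative producer batching overrides (guessed values, not applied)
producer.batch.size=131072
producer.linger.ms=50
# illustrative consumer fetch settings on the sink side (guessed values, not applied)
consumer.max.poll.records=2000
consumer.fetch.min.bytes=65536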
SQL Server source connector:
{
  "name": "sql-server-connection",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "******",
    "database.dbname": "demodb",
    "database.server.name": "dbservername",
    "table.whitelist": "dbo.portfolios",
    "database.history": "io.debezium.relational.history.MemoryDatabaseHistory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
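On the source side, the connector is running with Debezium's default queue and batch sizes. My understanding is that entries like the following, added to the "config" block above (values only illustrative, not applied), control how many change events Debezium buffers and polls per batch:
    "max.batch.size": "4096",
    "max.queue.size": "16384",
    "poll.interval.ms": "100"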
JDBC sink connector:
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "portfolios",
    "connection.url": "jdbc:sqlserver://localhost:1433;instance=NEWMSSQLSERVER;databaseName=demodb",
    "connection.user": "sa",
    "connection.password": "*****",
    "batch.size": 2000,
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": false,
    "transforms.unwrap.delete.handling.mode": "none",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": true,
    "pk.fields": "portfolio_id",
    "pk.mode": "record_key",
    "table.name.format": "replicated_portfolios"
  }
}
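For the sink, the only batching setting I touch is batch.size, which caps how many rows go into one JDBC batch but not how many records a task receives per poll. Changes like the following in the sink's "config" block (illustrative, not applied) are what I would try next, assuming the worker permits client overrides via connector.client.config.override.policy=All:
    "batch.size": 5000,
    "consumer.override.max.poll.records": "5000"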