I tried to load 77 million records from a MSSQL Server into a Kafka topic using the Kafka Connect JDBC source connector.
I tried a batch approach, setting batch.max.rows to 1000. In that case, after 1000 records the worker consumed the entire heap. Please share suggestions on how to make this work.
Below are the connector configurations I tried:
curl -X POST http://test.com:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mssql_jdbc_rsitem_pollx",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://test:1433;databaseName=xxx",
    "connection.user": "xxxx",
    "connection.password": "xxxx",
    "topic.prefix": "mssql-rsitem_pollx-",
    "mode": "incrementing",
    "table.whitelist": "test",
    "timestamp.column.name": "itemid",
    "max.poll.records": "100",
    "max.poll.interval.ms": "3000",
    "validate.non.null": false
  }
}'
curl -X POST http://test.com:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mssql_jdbc_test_polly",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "10",
    "connection.url": "jdbc:sqlserver://test:1433;databaseName=xxx;defaultFetchSize=10000;useCursorFetch=true",
    "connection.user": "xxxx",
    "connection.password": "xxxx",
    "topic.prefix": "mssql-rsitem_polly-",
    "mode": "incrementing",
    "table.whitelist": "test",
    "timestamp.column.name": "itemid",
    "poll.interval.ms": "86400000",
    "validate.non.null": false
  }
}'
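For reference, batch.max.rows is a real Confluent JDBC source connector property (it caps how many rows go into a single batch when polling), and in "incrementing" mode the connector expects incrementing.column.name rather than timestamp.column.name. A hedged sketch of the batched attempt described above might look like the following; the connector name, topic prefix, and connection details are placeholders:

```json
{
  "name": "mssql_jdbc_rsitem_batched",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://test:1433;databaseName=xxx",
    "connection.user": "xxxx",
    "connection.password": "xxxx",
    "topic.prefix": "mssql-rsitem_batched-",
    "mode": "incrementing",
    "incrementing.column.name": "itemid",
    "table.whitelist": "test",
    "batch.max.rows": "1000",
    "validate.non.null": false
  }
}
```

Note that max.poll.records and max.poll.interval.ms are Kafka consumer settings, not JDBC source connector settings, so the connector would silently ignore them if included.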
1 Answer
Try increasing the Java heap size. On the command line, before starting the Connect worker, run:
export KAFKA_HEAP_OPTS="-Xms1g -Xmx2g"
You can change the "-Xmx2g" part to match your machine's capacity.
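As a minimal sketch of the suggestion above: KAFKA_HEAP_OPTS is read by Kafka's kafka-run-class.sh, which the Connect startup scripts call, so exporting it in the same shell session is enough. The 4 GB maximum here is an illustrative value, not a recommendation:

```shell
# Set the JVM heap for Kafka Connect before starting the worker.
# -Xms = initial heap size, -Xmx = maximum heap size; size -Xmx to your RAM.
export KAFKA_HEAP_OPTS="-Xms1g -Xmx4g"

# The Connect startup script picks the variable up automatically, e.g.:
# bin/connect-distributed.sh config/connect-distributed.properties

# Confirm what the worker JVM will be started with:
echo "$KAFKA_HEAP_OPTS"
```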