SQL Server 带有MSSQL的Confluent JDBC连接器仅可传输100行数据

wfveoks0  于 2022-12-17  发布在  其他
关注(0)|答案(1)|浏览(120)

我有一个confluent JDBC源连接器,我正在使用MSSQL数据库,问题是它抛出下面的错误,只要它读取100行。我感到困惑,因为我没有遇到过这个问题与任何其他数据库的相同连接器。

Error: WorkerSinkTask{id=Sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1535)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.common.utils.Time.timer(Time.java:79)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1454)
[2022-12-14 14:35:43,647] INFO [Sink|task-0] Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:161)

尝试的解决方案:

  • 我已经尝试按照一些论坛中的建议增加批处理大小,但仍然存在相同的问题。*
puruo6ea

puruo6ea1#

java.lang.OutOfMemoryError:Java堆空间
这意味着批处理大小对于Connect工作进程来说太大。增加批处理将使问题恶化,因为这意味着需要更多内存。
Connect的默认最大堆为2G ...您可以使用环境变量来增加worker内存,例如将大小增加一倍。

export KAFKA_HEAP_OPTS="-Xms256M -Xmx4G"
bin/connect-distributed.sh  ...

否则,您需要 * 减少 * 批量大小。
另外,您可以尝试使用Debezium,它可以准确地捕获DELETE和UPDATE查询,而JDBC源代码不能。

相关问题