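I am using Debezium to run CDC from SQL Server to Kafka, and business requirements mean that some columns must be encrypted.
For context on the environment: I have 2 Kafka Connect instances running on K8s, with about 50 connectors in total streaming data from SQL Server to Kafka.
Here is a snippet of the connector JSON file: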
{
"name": "live.sql.users",
...
"transforms.unwrap.delete.handling.mode": "drop",
"transforms": "unwrap,cipher",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.cipher.predicate": "isTombstone",
"transforms.cipher.negate": "true",
"transforms.cipher.cipher_data_keys": "[ { \"identifier\": \"my-key\", \"material\": { \"primaryKeyId\": 1000000001, \"key\": [ { \"keyData\": { \"typeUrl\": \"type.googleapis.com/google.crypto.tink.AesGcmKey\", \"value\": \"GhDLeulEJRDC8/19NMUXqw2jK\", \"keyMaterialType\": \"SYMMETRIC\" }, \"status\": \"ENABLED\", \"keyId\": 2000000002, \"outputPrefixType\": \"TINK\" } ] } } ]",
"transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
"transforms.cipher.cipher_mode": "ENCRYPT",
"predicates": "isTombstone",
"transforms.cipher.field_config": "[{\"name\":\"Password\"},{\"name\":\"MobNumber\"}, {\"name\":\"UserName\"}]",
"transforms.cipher.cipher_data_key_identifier": "my-key"
...
}
A few seconds after I apply it, calling the /connectors/<connector_name>/status API returns the error below:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\t
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:346)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:261)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)\n\t
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\t
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\t
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\t
at java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: error: ENCRYPT of field path 'UserName' having data 'deleted605' failed unexpectedly\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.RecordHandler.processField(RecordHandler.java:90)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaawareRecordHandler.lambda$matchFields$0(SchemaawareRecordHandler.java:73)\n\t
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)\n\t
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaawareRecordHandler.matchFields(SchemaawareRecordHandler.java:50)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:163)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:140)\n\t
at org.apache.kafka.connect.runtime.PredicatedTransformation.apply(PredicatedTransformation.java:56)\n\t
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t
... 11 more\nCaused by: java.lang.NullPointerException\n\t
at com.esotericsoftware.kryo.util.DefaultGenerics.nextGenericTypes(DefaultGenerics.java:77)\n\t
at com.esotericsoftware.kryo.serializers.FieldSerializer.pushTypeVariables(FieldSerializer.java:144)\n\t
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:102)\n\t
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:627)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.RecordHandler.processField(RecordHandler.java:75)\n\t
... 21 more\n
Note that the same configuration works with other connectors without any problem.
1 Answer
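After further debugging and a closer look at the Kryo library, I found that, according to the Kryo documentation, the Kryo class is not thread safe:
Kryo is not thread safe. Each thread should have its own Kryo, Input, and Output instances.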
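To make the "one instance per thread" rule concrete, here is a minimal sketch of the pattern using only the Java standard library. `StatefulSerializer` is a hypothetical stand-in for a non-thread-safe object such as Kryo; it is not Kryo's API, and this is not how kryptonite is implemented. With Kryo on the classpath, the `ThreadLocal` would hold a `Kryo` instance instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerThreadSerializerSketch {

    // Hypothetical stand-in for a stateful, non-thread-safe serializer (e.g. Kryo).
    static final class StatefulSerializer {
        // Mutable scratch state: sharing one instance across threads would corrupt it.
        private final StringBuilder scratch = new StringBuilder();

        byte[] write(String value) {
            scratch.setLength(0);
            scratch.append(value);
            return scratch.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    // Each thread lazily gets its own serializer the first time it calls get().
    private static final ThreadLocal<StatefulSerializer> SERIALIZER =
            ThreadLocal.withInitial(StatefulSerializer::new);

    static byte[] serialize(String value) {
        return SERIALIZER.get().write(value);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            List<Future<byte[]>> results = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                final String value = "user" + i;
                Callable<byte[]> task = () -> serialize(value);
                results.add(workers.submit(task));
            }
            // Verify that no result was corrupted by another thread.
            for (int i = 0; i < results.size(); i++) {
                String roundTrip = new String(results.get(i).get(), StandardCharsets.UTF_8);
                if (!roundTrip.equals("user" + i)) {
                    throw new IllegalStateException("corrupted output for user" + i);
                }
            }
            System.out.println("all records serialized without interference");
        } finally {
            workers.shutdown();
        }
    }
}
```

A `ThreadLocal` keeps one instance alive per worker thread, so it suits a small, fixed set of long-lived threads; memory use grows with the number of threads that touch it.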
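I opened an issue on the kryptonite repo, and the main committer confirmed that it does not support multithreading. The only options are separate connector instances or pooling (see the full thread), which is not feasible for me because I am running more than 50 connectors at the same time.
As for pooling Kryo instances, here is a guide on how to do it, but I have not tried it.
I hope this helps anyone who has the same problem now or runs into it in the future.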
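For readers unfamiliar with the pooling idea, the following is a rough, untested-in-production sketch of a fixed-size object pool built on the Java standard library. `SimplePoolSketch` and its methods are hypothetical names for illustration; this is not Kryo's own pooling facility and not the approach from the linked guide. The point is only that each caller borrows an instance exclusively and returns it afterwards.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

public class SimplePoolSketch<T> {

    // Instances that are currently not borrowed by any thread.
    private final BlockingQueue<T> idle;

    public SimplePoolSketch(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Borrows an instance, runs the work with exclusive access, then returns it.
    public <R> R withInstance(Function<T, R> work) {
        final T instance;
        try {
            instance = idle.take(); // blocks while all instances are borrowed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an instance", e);
        }
        try {
            return work.apply(instance);
        } finally {
            idle.add(instance); // always hand the instance back, even on failure
        }
    }

    public int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        // StringBuilder stands in for a non-thread-safe object such as Kryo.
        SimplePoolSketch<StringBuilder> pool = new SimplePoolSketch<>(2, StringBuilder::new);
        String result = pool.withInstance(sb -> {
            sb.setLength(0);
            return sb.append("deleted605").toString();
        });
        System.out.println(result);
    }
}
```

Compared with one instance per thread, a pool caps the number of instances regardless of how many threads exist, at the cost of callers blocking when every instance is in use.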