我需要读入 SourceConnector.taskConfigs() 方法,有办法吗?
SourceConnector.taskConfigs()
46scxncf1#
这有票和公关https://issues.apache.org/jira/browse/kafka-4794目前,我设法使用反射(joor)获得offsetreader:
private OffsetStorageReader getOffsetStorageReader() { try { Object innerContext = on(context).get("this$0"); Object ctx = on(innerContext).get("ctx"); Object herder = on(ctx).get("herder"); String connectorName = on(ctx).get("connectorName"); Object worker = on(herder).get("worker"); Object internalKeyConverter = on(worker).get("internalKeyConverter"); Object internalValueConverter = on(worker).get("internalValueConverter"); Object offsetBackingStore = on(worker).get("offsetBackingStore"); return (OffsetStorageReader) on(Class.forName("org.apache.kafka.connect.storage.OffsetStorageReaderImpl")) .create(offsetBackingStore, connectorName, internalKeyConverter, internalValueConverter) .get(); } catch (Exception e) { throw new RuntimeException(e); } }
1条答案
按热度按时间46scxncf1#
这有票和公关
https://issues.apache.org/jira/browse/kafka-4794
目前,我设法使用反射(joor)获得offsetreader: