我已经创建了一个变压器,可以在 ConnectRecord
在Kafka连接工人中使用。
转换器工作正常,但我希望它在发现特定条件(错误)时阻止接收器使用特定主题的消息。
代码瞬间抛出一个异常,将worker碾得停滞不前。虽然它的有效性,这也可能影响到其他议题,似乎是最后的手段。
有没有办法停止/暂停使用Kafka连接的主题 Transformation
用一种好的方式编码?
class ApplySchemaTransformation[T <: ConnectRecord[T]]
extends Transformation[T]
with ContentTypeHandler[T] {
override def apply(record: T): T = {
if ([some criteria]) {
[ok code]
} else {
[stop consumer]
}
}
override def config(): ConfigDef = ...
override def close(): Unit = {}
override def configure(configs: java.util.Map[String, _]): Unit = ...
}
暂无答案!
目前还没有任何答案,快来回答吧!