从kafka connect transformer中的主题暂停/停止接收器消耗

11dmarpk  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(259)

我已经创建了一个变压器,可以在 ConnectRecord 在Kafka连接工人中使用。
转换器工作正常,但我希望它在发现特定条件(错误)时阻止接收器使用特定主题的消息。
代码瞬间抛出一个异常,将worker碾得停滞不前。虽然它的有效性,这也可能影响到其他议题,似乎是最后的手段。
有没有办法停止/暂停使用Kafka连接的主题 Transformation 用一种好的方式编码?

class ApplySchemaTransformation[T <: ConnectRecord[T]]
    extends Transformation[T]
    with ContentTypeHandler[T] {

   override def apply(record: T): T = {
     if ([some criteria]) {
       [ok code]
     } else {
       [stop consumer]
     }
   }

   override def config(): ConfigDef = ...

   override def close(): Unit = {}

   override def configure(configs: java.util.Map[String, _]): Unit = ...
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题