当发布的数据损坏时,跳过kafka中的sink步骤

bzzcjhmw  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(442)

在java服务器端,经过一些处理之后,我通过restfulwebservice将日志数据(json格式)从服务器发布到kafka。
在hdfs侧,我的Flume类型是avro。因此,为了将json(源)解析为avro(目标),我使用morphline和avro模式。
如果发布的数据不适合morphline或avro模式,通常我会得到下面的错误,
原因:com.fasterxml.jackson.core.jsonparseexception:非法的无引号字符((ctrl char,代码10)):必须使用反斜杠转义才能包含在字符串值中
如果我得到这一次,偏移量就不再移动了。简单地说,如果Kafka只得到一次这个错误,它就不能再接收发布的数据了。
为了避免这个错误,我假设有两种解决方案。第一种是在服务器端为大数据端使用的avro模式编写json验证器。我更喜欢的第二种方法是跳过并且不接收为请求的avro模式未格式化的日志数据。但是在跳过一个损坏的数据之后,如果Kafka得到了合适的数据,它就应该将其丢弃。
我认为如果我在flume或kafka配置文件中添加一些参数是可能的。那么,当发布的数据不适合请求的模式或请求的行时,如何跳过sink步骤呢?

9bfwbjaz

9bfwbjaz1#

我把问题解决了,
像这样在morphline中添加了try-catch代码块

morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
       {
         tryRules {
              catchExceptions : true
           rules : [
             {
               commands : [
                 # save initial state
                 { readJson {} }
                # extract JSON objects into fields
              { extractJsonPaths {
                flatten: true
                paths: {
            PROJECT_NAME: /PROJECT_NAME
            WSDL_NAME: /WSDL_NAME
            ....
            ....
            ....
            MESSAGE_OUT: /MESSAGE_OUT
        }
      } }
      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro {
        schemaFile:/u0x/myPath/myAvroSchema.avsc
      } }
      # serialize the object as avro
      { writeAvroToByteArray: {
        format: containerlessBinary
              } }
           ]
         }
         {
          commands : [
            { logWarn { format : "Ignoring record with unsupported input format in myLogService: {}", args : ["@{}"] } }
            { dropRecord {} }    
            ]
         }
       ]
     }   
    }    
   ]
  }
]

tryRules 我强制代码捕获所有异常。
rules: 你可以写 "command:" 阻塞任何您想要的,如果其中一个抛出除最后一个命令块之外的异常,最后一个命令将运行。记住最后一个是“抓住”。我的意思是,如果第一个命令块失败,最后一个(第二个)命令将运行。若第一个命令运行得很好,最后一个命令就不能工作,因为最后一个命令块的工作方式和catch块类似。
所以当代码 readJson {} 在第一个命令块中失败,它抛出一个异常,最后一个命令(catch块)处理它,这样它就不会尝试在kafka主题中接收当前数据,因为它将运行 dropRecord {} .
有关详细文档,请访问kitesdk。

相关问题