发布数据损坏时跳过kafka中的接收器步骤

skip the sink step in kafka when posted data corrupted

在java服务器端经过一些处理后,我通过[=32将日志数据(json格式)从服务器发布到kafka =] 网络服务。

在 hdfs 端 我的接收器类型是 avro。因此,为了将 json(源)解析为 avro(目标),我使用了 morphline 和 avro 模式。

如果发布的数据不适合 morphline 或 avro 模式,通常会出现以下错误,

Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal unquoted character ((CTRL-CHAR, code 10)): has to be escaped using backslash to be included in string value

此外,如果我得到一次,偏移量将不再移动。简而言之,如果 kafka 只得到一次这个错误,它就不能再接收发布的数据了。

为了避免这个错误,我想有两种解决方案。第一个是在服务器端为大数据端使用的 avro 模式编写 json 验证器。我更喜欢的第二种方法是跳过并且不接收未针对请求的 avro 模式格式化的日志数据。但是在跳过损坏的数据后,如果 kafka 获得合适的数据,它应该将其下沉。

我觉得在flume或者kafka的配置文件中加一些参数是可以的。那么当发布的数据不适合请求的模式或请求的 morphline 时,我该如何跳过接收步骤呢?

我解决了吗啉端的问题,

像这样在 morphline 中添加了 try-catch 代码块

morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
       {
         tryRules {
              catchExceptions : true
           rules : [
             {
               commands : [
                 # save initial state
                 { readJson {} }
                # extract JSON objects into fields
              { extractJsonPaths {
                flatten: true
                paths: {
            PROJECT_NAME: /PROJECT_NAME
            WSDL_NAME: /WSDL_NAME
            ....
            ....
            ....
            MESSAGE_OUT: /MESSAGE_OUT
        }
      } }
      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro {
        schemaFile:/u0x/myPath/myAvroSchema.avsc
      } }
      # serialize the object as avro
      { writeAvroToByteArray: {
        format: containerlessBinary
              } }
           ]
         }
         {
          commands : [
            { logWarn { format : "Ignoring record with unsupported input format in myLogService: {}", args : ["@{}"] } }
            { dropRecord {} }    
            ]
         }
       ]
     }   
    }    
   ]
  }
]

tryRules 中,我强制代码捕获所有异常。

rules:中你可以随便写"command:"块,如果其中一个抛出异常除了最后一个命令块,最后一个命令将运行。请记住,最后一个是 "catch"。我的意思是,以防万一,如果第一个命令块失败,最后(第二个)命令将 运行。如果第一个命令 运行 完美,最后一个命令将不起作用,因为最后一个命令块像 catch 块一样工作。

因此,当代码 readJson {} 在第一个命令块中失败时,它会抛出异常,最后一个命令(catch 块)会处理它,因此它不会尝试在 kafka 主题中接收当前数据,因为它会 运行 dropRecord {}.

详细文档可以访问kitesdk.