发布数据损坏时跳过kafka中的接收器步骤
skip the sink step in kafka when posted data corrupted
在java服务器端经过一些处理后,我通过[=32将日志数据(json格式)从服务器发布到kafka =] 网络服务。
在 hdfs 端 我的接收器类型是 avro。因此,为了将 json(源)解析为 avro(目标),我使用了 morphline 和 avro 模式。
如果发布的数据不适合 morphline 或 avro 模式,通常会出现以下错误,
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal
unquoted character ((CTRL-CHAR, code 10)): has to be escaped using
backslash to be included in string value
此外,如果我得到一次,偏移量将不再移动。简而言之,如果 kafka 只得到一次这个错误,它就不能再接收发布的数据了。
为了避免这个错误,我想有两种解决方案。第一个是在服务器端为大数据端使用的 avro 模式编写 json 验证器。我更喜欢的第二种方法是跳过并且不接收未针对请求的 avro 模式格式化的日志数据。但是在跳过损坏的数据后,如果 kafka 获得合适的数据,它应该将其下沉。
我觉得在flume或者kafka的配置文件中加一些参数是可以的。那么当发布的数据不适合请求的模式或请求的 morphline 时,我该如何跳过接收步骤呢?
我解决了吗啉端的问题,
像这样在 morphline 中添加了 try-catch 代码块
morphlines: [
{
id: convertJsonToAvro
importCommands: [ "org.kitesdk.**" ]
commands: [
{
tryRules {
catchExceptions : true
rules : [
{
commands : [
# save initial state
{ readJson {} }
# extract JSON objects into fields
{ extractJsonPaths {
flatten: true
paths: {
PROJECT_NAME: /PROJECT_NAME
WSDL_NAME: /WSDL_NAME
....
....
....
MESSAGE_OUT: /MESSAGE_OUT
}
} }
# convert the extracted fields to an avro object
# described by the schema in this field
{ toAvro {
schemaFile:/u0x/myPath/myAvroSchema.avsc
} }
# serialize the object as avro
{ writeAvroToByteArray: {
format: containerlessBinary
} }
]
}
{
commands : [
{ logWarn { format : "Ignoring record with unsupported input format in myLogService: {}", args : ["@{}"] } }
{ dropRecord {} }
]
}
]
}
}
]
}
]
在 tryRules
中,我强制代码捕获所有异常。
在rules:
中你可以随便写"command:"
块,如果其中一个抛出异常除了最后一个命令块,最后一个命令将运行。请记住,最后一个是 "catch"。我的意思是,以防万一,如果第一个命令块失败,最后(第二个)命令将 运行。如果第一个命令 运行 完美,最后一个命令将不起作用,因为最后一个命令块像 catch 块一样工作。
因此,当代码 readJson {}
在第一个命令块中失败时,它会抛出异常,最后一个命令(catch 块)会处理它,因此它不会尝试在 kafka 主题中接收当前数据,因为它会 运行 dropRecord {}
.
详细文档可以访问kitesdk.
在java服务器端经过一些处理后,我通过[=32将日志数据(json格式)从服务器发布到kafka =] 网络服务。
在 hdfs 端 我的接收器类型是 avro。因此,为了将 json(源)解析为 avro(目标),我使用了 morphline 和 avro 模式。
如果发布的数据不适合 morphline 或 avro 模式,通常会出现以下错误,
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal unquoted character ((CTRL-CHAR, code 10)): has to be escaped using backslash to be included in string value
此外,如果我得到一次,偏移量将不再移动。简而言之,如果 kafka 只得到一次这个错误,它就不能再接收发布的数据了。
为了避免这个错误,我想有两种解决方案。第一个是在服务器端为大数据端使用的 avro 模式编写 json 验证器。我更喜欢的第二种方法是跳过并且不接收未针对请求的 avro 模式格式化的日志数据。但是在跳过损坏的数据后,如果 kafka 获得合适的数据,它应该将其下沉。
我觉得在flume或者kafka的配置文件中加一些参数是可以的。那么当发布的数据不适合请求的模式或请求的 morphline 时,我该如何跳过接收步骤呢?
我解决了吗啉端的问题,
像这样在 morphline 中添加了 try-catch 代码块
morphlines: [
{
id: convertJsonToAvro
importCommands: [ "org.kitesdk.**" ]
commands: [
{
tryRules {
catchExceptions : true
rules : [
{
commands : [
# save initial state
{ readJson {} }
# extract JSON objects into fields
{ extractJsonPaths {
flatten: true
paths: {
PROJECT_NAME: /PROJECT_NAME
WSDL_NAME: /WSDL_NAME
....
....
....
MESSAGE_OUT: /MESSAGE_OUT
}
} }
# convert the extracted fields to an avro object
# described by the schema in this field
{ toAvro {
schemaFile:/u0x/myPath/myAvroSchema.avsc
} }
# serialize the object as avro
{ writeAvroToByteArray: {
format: containerlessBinary
} }
]
}
{
commands : [
{ logWarn { format : "Ignoring record with unsupported input format in myLogService: {}", args : ["@{}"] } }
{ dropRecord {} }
]
}
]
}
}
]
}
]
在 tryRules
中,我强制代码捕获所有异常。
在rules:
中你可以随便写"command:"
块,如果其中一个抛出异常除了最后一个命令块,最后一个命令将运行。请记住,最后一个是 "catch"。我的意思是,以防万一,如果第一个命令块失败,最后(第二个)命令将 运行。如果第一个命令 运行 完美,最后一个命令将不起作用,因为最后一个命令块像 catch 块一样工作。
因此,当代码 readJson {}
在第一个命令块中失败时,它会抛出异常,最后一个命令(catch 块)会处理它,因此它不会尝试在 kafka 主题中接收当前数据,因为它会 运行 dropRecord {}
.
详细文档可以访问kitesdk.