gpt4 book ai didi

hadoop - 发布数据损坏时跳过 kafka 中的 sink 步骤

转载 作者:行者123 更新时间:2023-12-02 21:14:06 24 4
gpt4 key购买 nike

在java服务器端经过一些过程,我通过restful webservice将日志数据(json格式)从服务器发布到kafka。

在 hdfs 方面 我的水槽类型是 avro。因此,为了将 json(源)解析为 avro(目标),我使用的是 morphline 和 avro 模式。

如果发布的数据不适合 morphline 或 avro 模式,通常我会收到以下错误,

Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal unquoted character ((CTRL-CHAR, code 10)): has to be escaped using backslash to be included in string value



另外,如果我得到这个一次,偏移量就不再移动了。简而言之,如果 kafka 只收到此错误一次,它就不能再接收已发布的数据。

为了避免这个错误,我想有两种解决方案。第一个是在服务器端为大数据端使用的 avro 模式编写 json 验证器。我更喜欢的第二种方法是跳过并且不接收未格式化为请求的 avro 模式的日志数据。但是在跳过损坏的数据后,如果 kafka 获得合适的数据,它应该下沉它。

我认为如果我在flume或kafka配置文件中添加一些参数是可能的。那么,当发布的数据不适合请求的模式或请求的 morphline 时,如何跳过接收器步骤?

最佳答案

我解决了吗啉方面的问题,

像这样在 morphline 中添加了 try-catch 代码块

morphlines: [
{
id: convertJsonToAvro
importCommands: [ "org.kitesdk.**" ]
commands: [
{
tryRules {
catchExceptions : true
rules : [
{
commands : [
# save initial state
{ readJson {} }
# extract JSON objects into fields
{ extractJsonPaths {
flatten: true
paths: {
PROJECT_NAME: /PROJECT_NAME
WSDL_NAME: /WSDL_NAME
....
....
....
MESSAGE_OUT: /MESSAGE_OUT
}
} }
# convert the extracted fields to an avro object
# described by the schema in this field
{ toAvro {
schemaFile:/u0x/myPath/myAvroSchema.avsc
} }
# serialize the object as avro
{ writeAvroToByteArray: {
format: containerlessBinary
} }
]
}
{
commands : [
{ logWarn { format : "Ignoring record with unsupported input format in myLogService: {}", args : ["@{}"] } }
{ dropRecord {} }
]
}
]
}
}
]
}
]

tryRules我正在强制代码捕获所有异常。

rules:你可以写 "command:"阻止你想要的任何东西,如果其中一个抛出除了最后一个命令 block 之外的异常,最后一个命令将运行。请记住,最后一个是“catch”。我的意思是,如果第一个命令 block 失败,最后一个(第二个)命令将运行。如果第一个命令完美运行,最后一个命令将不起作用,因为最后一个命令 block 就像一个 catch block 一样工作。

所以当代码 readJson {}在第一个命令 block 中失败,它抛出一个异常,最后一个命令(catch block )处理它,所以它不会尝试在 kafka 主题中接收当前数据,因为它将运行 dropRecord {} .

详细文档可以访问 kitesdk .

关于hadoop - 发布数据损坏时跳过 kafka 中的 sink 步骤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39287178/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com