gpt4 book ai didi

hadoop - 在发送到 channel 之前删除空的 Flume 事件

转载 作者:可可西里 更新时间:2023-11-01 14:22:33 26 4
gpt4 key购买 nike

我在Flume的一本书上看到,如果在拦截器的intercept方法中有一个事件返回为null,这个事件就会被丢弃。因此,我创建了一个自定义拦截器,它根据条件将事件返回为 null,例如:

public Event intercept(Event event) {
// TODO Auto-generated method stub
Event finalEvent = event;
check = new String(event.getBody(),Charsets.UTF_8);

if(check.matches("([0-9]-.+?-.+?-[0-9][0-9]+)")){

try {
fileWriter.append(new String(event.getBody(),Charsets.UTF_8)+ "\n");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finalEvent = null;
}
System.out.println("Event is : " + finalEvent);
return finalEvent;
}

拦截器发出空事件,但文件 channel 仍将其作为空传递给 HDFS 接收器。为什么事件没有被删除?我正在使用假脱机目录作为源。

最佳答案

让我们看看会发生什么。您使用 Spooling 目录作为源,源调用函数 processEventBatch(events),在 processEventBatch() 中:

events = interceptorChain.intercept(events);//use your custom interceptor
...
eventQueue.add(event); // add user event to queue,even the event == null

如果假脱机目录源使用 processEvent(),那么您的拦截器将起作用:在 processEvent() 中:

event = interceptorChain.intercept(event);
if (event == null) {
//null event then return !! intercept works!!
return;
}

所以你应该修改 processEventBatch(),然后做:

if (event == null){
//dont add to eventQueue
}

关于hadoop - 在发送到 channel 之前删除空的 Flume 事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21164514/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com