gpt4 book ai didi

Hadoop Cascading - 创建一个源,两个汇的流

转载 作者:可可西里 更新时间:2023-11-01 14:20:49 26 4
gpt4 key购买 nike

我正在使用 Cascading 2 创建 Hadoop 作业,并尝试创建一个从单一来源开始的流程。在对数据应用几个函数后,我需要拆分流,以便使用此数据创建两个单独的报告(在两个单独的接收器中)。

    //SOURCE
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, input );

//REPORT1 SINK
Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );

//REPORT2 SINK
Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );

//INITIAL FUNCTIONS
Pipe firstPipe = new Pipe("firstPipe");
firstPipe = new Each(firstPipe, new Fields("line"), functionA);
firstPipe = new Each(firstPipe, functionB, Fields.ALL);

//REPORT1 FUNCTION
report1Pipe = new Each(firstPipe, Fields.ALL, function1, Fields.RESULTS);

//REPORT2 FUNCTION
report2Pipe = new Each(firstPipe, Fields.ALL, function2, Fields.RESULTS);

//CONNECT FLOW PARTS
FlowDef flowDef = new FlowDef()
.setName("report-flow")
.addSource(firstPipe, source)
.addSink(report1Pipe, report1Sink)
.addSink(report2Pipe, report2Sink);

new HadoopFlowConnector( properties ).connect( flowDef ).complete();

目前这给我错误“java.lang.IllegalArgumentException: cannot add duplicate sink: firstPipe”但即使在弄乱它一段时间后我也遇到了与流程设置有关的各种其他问题。

是否有人可以解释如何构建这种形式的流(一个源,两个汇)?我需要创建一个级联吗?或者在拆分之前我是否需要一个中间接收器来保存数据?

请帮忙!

最佳答案

您可以使用级联文档中提到的拆分模式。这是一个例子:

public static void main(String[] args) {
// source and sink
Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new FileTap(sourceScheme, args[0]);

Fields sinkFields = new Fields("word", "count");
Scheme sinkScheme = new TextLine(sinkFields, sinkFields);
Tap sink_one = new FileTap(sinkScheme, "out-one.txt");
Tap sink_two = new FileTap(sinkScheme, "out-two.txt");

// the pipe assembly
Pipe assembly = new Pipe("wordcount");

String regex = "\\w+";
Function function = new RegexGenerator(new Fields("word"), regex);
assembly = new Each(assembly, new Fields("line"), function);

Aggregator count = new Count(new Fields("count"));

// ...split into two pipes
Pipe countOne = new Pipe("count-one", assembly);
countOne = new GroupBy(countOne, new Fields("word"));
countOne = new Every(countOne, count);

Pipe countTwo = new Pipe("count-two", assembly);
countTwo = new GroupBy(countTwo, new Fields("word"));
countTwo = new Every(countTwo, count);

// create the flow
final List<Pipe> pipes = new ArrayList<Pipe>(2);
pipes.add(countOne);
pipes.add(countTwo);

final Map<String, Tap> sinks = new HashMap<String, Tap>();
sinks.put("count-one", sink_one);
sinks.put("count-two", sink_two);

FlowConnector flowConnector = new LocalFlowConnector();
Flow flow = flowConnector.connect(source, sinks, pipes);

flow.complete();
}

关于Hadoop Cascading - 创建一个源,两个汇的流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12250352/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com