
configuration - Cascading: how to define every map-reduce job in configuration?


My code is below. It is Cascading code, and it produces 8 map-reduce jobs. I don't know how to configure each job individually; the code below configures all 8 jobs with the same settings. What I want is to change the reduce settings of just the last job. How can I identify these 8 jobs and configure each one separately? Thanks.

private static void Demo(String[] args) {
    /* Tap sourceTap = new Hfs(new TextLine(), "D:/test/file");
       Tap finalResultTap = new Hfs(new TextLine(), "D:/test/result", true);
    */
    Tap sourceTap = new Hfs(new TextLine(), args[0], SinkMode.KEEP);
    Tap finalResultTap = new Hfs(new TextLine(), args[1], SinkMode.REPLACE);
    Tap trap = new Hfs(new TextLine(), args[2], SinkMode.REPLACE);

    Pipe sourcePipe = new Pipe("sourcePipe");
    sourcePipe = getFilterPipe(sourcePipe);

    Pipe vvResultPipe = new Pipe("vvResultPipe", sourcePipe);
    vvResultPipe = getVVResultPipe(sourcePipe);

    Pipe clickResultPipe = new Pipe("clickResultPipe", sourcePipe);
    clickResultPipe = getClickResultPipe(clickResultPipe);

    Pipe stClickResultPipe = new Pipe("stClickResultPipe", sourcePipe);
    stClickResultPipe = getStClickResultPipe(sourcePipe);

    // join the results of the three pipes
    Pipe resultPipe = new Pipe("resultPipe");
    resultPipe = new CoGroup(vvResultPipe, new Fields("vid"), clickResultPipe, new Fields("referVid"),
            new Fields("vid", "totalVV", "referVid", "totalClick"), new LeftJoin());
    resultPipe = new CoGroup(resultPipe, new Fields("vid"), stClickResultPipe, new Fields("referVid"),
            new Fields("vid", "totalVV", "referVid", "totalClick", "referVid2", "st1", "st2", "st3", "st4", "st6", "st8"), new LeftJoin());
    resultPipe = new Each(resultPipe, new Fields("vid", "totalVV", "totalClick", "st1", "st2", "st3", "st4", "st6", "st8"),
            new Identity(Fields.ARGS));

    Fields sortClickFields = new Fields("totalVV");
    resultPipe = new GroupBy(resultPipe, Fields.NONE, sortClickFields);
    sortClickFields.setComparators(Collections.reverseOrder());

    /* Limit limit = new Limit(200);
       resultPipe = new Each(resultPipe, limit);
    */

    JobConf conf = new JobConf();
    conf.setJarByClass(Main.class);

    //Properties properties = new Properties();
    Properties properties = AppProps.appProps().buildProperties(conf);
    properties.setProperty("user.group", "d_sdo_data");
    properties.setProperty("mapred.job.queue.name", "cug_d_sdo_data");
    properties.setProperty("mapred.fairscheduler.pool", "cug_d_sdo_data");
    properties.setProperty("cascading.tmp.dir", "/home/hdfs/cluster-data/tmp/mapred/staging/recommend_user/tmp");
    properties.setProperty("mapreduce.job.complete.cancel.delegation.tokens", "false");
    properties.setProperty("mapred.reduce.tasks", "30");
    properties.setProperty("mapred.map.tasks", "200");

    //AppProps.setApplicationJarClass(properties, Main.class);
    FlowConnector flowConnector = new HadoopFlowConnector(properties);
    FlowDef flowDef = FlowDef.flowDef()
            .setName("tfidf")
            .addSource(sourcePipe, sourceTap)
            .addTailSink(resultPipe, finalResultTap)
            .addTrap("assertions", trap);
    Flow flow = flowConnector.connect(flowDef);
    flow.complete();
}

Best Answer

This question is three years old, but I ran into it while looking for the same solution. Here is what I ended up with:
pipe.getStepConfigDef().setProperty("mapreduce.job.reduces", "1");
This goes after you have defined the step (pipe) you want to configure.
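
Applied to the flow in the question, that would look roughly like the sketch below. It continues from the question's resultPipe assembly and assumes the final GroupBy is planned as its own map-reduce step (which Cascading normally does for a GroupBy); the rest of the assembly stays as posted.

    Fields sortClickFields = new Fields("totalVV");
    resultPipe = new GroupBy(resultPipe, Fields.NONE, sortClickFields);
    sortClickFields.setComparators(Collections.reverseOrder());

    // Step-level override: only the map-reduce job Cascading plans for this step
    // gets a single reducer; the flow-wide "mapred.reduce.tasks=30" passed to
    // HadoopFlowConnector still applies to the other jobs in the flow.
    resultPipe.getStepConfigDef().setProperty("mapreduce.job.reduces", "1");

The point of the step-level ConfigDef is exactly this separation: properties given to HadoopFlowConnector apply to every job the planner creates, while getStepConfigDef() lets one step carry its own overrides.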

Note that this is for Hadoop 2.6.4; on older Hadoop versions you would use the (now deprecated) mapred.reduce.tasks property instead. See here:

https://hadoop.apache.org/docs/r2.6.4/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
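
For a Hadoop 1.x cluster, the same step-level call would presumably use the legacy key from that table (an assumption; check the table for the version you run):

    // Legacy property name, deprecated in Hadoop 2.x in favor of mapreduce.job.reduces
    resultPipe.getStepConfigDef().setProperty("mapred.reduce.tasks", "1");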

Regarding "configuration - Cascading: how to define every map-reduce job in configuration?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/15108196/
