
java - The try block is executed again and again for every tuple in the DataBag

Reposted. Author: 行者123. Updated: 2023-12-02 21:02:37

Here is the code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class databag extends EvalFunc<DataBag> {
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    BagFactory mBagFactory = BagFactory.getInstance();

    private DataBag result;
    private String delimiterType = ": Src / dest :";

    public DataBag exec(Tuple input) throws IOException {
        try {
            result = mBagFactory.newDefaultBag(); // change here
            result.add(input);

            getLogger().info("::::::: Entered try block ::::::::::::");

            // create indexing for source and destination . ::: (Arraylist<Object[]>)
            ConcurrentHashMap<Object, ArrayList<Integer>> srcIndexMap = new ConcurrentHashMap<Object, ArrayList<Integer>>();
            ConcurrentHashMap<Object, ArrayList<Integer>> destIndexMap = new ConcurrentHashMap<Object, ArrayList<Integer>>();

            // store the rows to Arraylist(Object[]) collection by converting .
            ArrayList<Object[]> source = new ArrayList<Object[]>();
            ArrayList<Object[]> destination = new ArrayList<Object[]>();

            int srcCounter = 0;
            int destCounter = 0;

            ArrayList<Integer> Sourcearray = new ArrayList<Integer>();
            ArrayList<Integer> Destinationarray = new ArrayList<Integer>();

            for (Iterator<Tuple> iter = result.iterator(); iter.hasNext();) {
                //some code here
            }
            // ... remainder of the method not shown in the question ...
        } catch (Exception e) {
            // the original post does not show the catch block
            throw new IOException(e);
        }
        return result;
    }
}

I am trying to iterate over the tuples in the DataBag with a for loop, but for every tuple all of the collections get re-initialized; in other words, the code in the try block is executed again for each tuple.

Output:
INFO  PigUDFpck.databag - ::::::: Entered try block ::::::::::::
PigUDFpck.databag - srcIndexMap={}
PigUDFpck.databag - inside main if loop skey=4
PigUDFpck.databag - destIndexMap.contains(skey)=false
PigUDFpck.databag - into else loop of main method
PigUDFpck.databag - ::::::: Entered try block ::::::::::::
PigUDFpck.databag - srcIndexMap={}
PigUDFpck.databag - inside main if loop skey=4
PigUDFpck.databag - destIndexMap.contains(skey)=false
PigUDFpck.databag - into else loop of main method

Update

Pig script:
REGISTER /usr/local/pig/UDF/UDFBAG.jar;

sourcenew = LOAD 'hdfs://HADOOPMASTER:54310/DVTTest/Source1.txt' USING PigStorage(',') as (ID:int,Name:chararray,FirstName:chararray ,LastName:chararray,Vertical_Name:chararray ,Vertical_ID:chararray,Gender:chararray,DOB:chararray,Degree_Percentage:chararray ,Salary:chararray,StateName:chararray);

destnew = LOAD 'hdfs://HADOOPMASTER:54310/DVTTest/Destination1.txt' USING PigStorage(',') as (ID:int,Name:chararray,FirstName:chararray ,LastName:chararray,Vertical_Name:chararray ,Vertical_ID:chararray,Gender:chararray,DOB:chararray,Degree_Percentage:chararray ,Salary:chararray,StateName:chararray);

cogroupnew = COGROUP sourcenew BY ID inner, destnew BY ID inner;

diff_data = FOREACH cogroupnew GENERATE DIFF(sourcenew,destnew);

ids = FOREACH diff_data GENERATE FLATTEN($0);

id1 = DISTINCT( FOREACH ids GENERATE $0);

src = FILTER sourcenew BY ID == id1.$0;

finalsrc = FOREACH src GENERATE *, 'Source' as Source:chararray;

dest = FILTER destnew BY ID == id1.$0;

finaldest = FOREACH dest GENERATE *, 'Destination' as Destination:chararray;

final = UNION finalsrc,finaldest ;

A = FOREACH final GENERATE PigUDFpck.databag(*);

DUMP A;

And the input to the UDF is as follows:
(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,ArkansasSrc1,Source)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,ArkansaSrc2,Source)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,Arkansasdest1,Destination)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,Arkanssdest2,Destination)

Any help is much appreciated!
Thanks in advance!

Best Answer

Please understand that Pig is a DAG generator and produces MapReduce jobs from that DAG.

Higher-level Pig constructs (such as LOAD, FOREACH, JOIN) boil down to lower-level MapReduce constructs; a rough sketch of what this means for a UDF call follows the list below.

> LOAD     => Mapper in MR
> GENERATE => a function call in the Mapper or Reducer
> JOIN     => Shuffle (join in MapReduce)
> FILTER   => a filter function in the Map or Reduce phase
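
To make the GENERATE line concrete: when a FOREACH ... GENERATE calls a UDF, the generated map (or reduce) task hands the UDF one tuple at a time. The sketch below is illustrative only, not Pig's actual internals; the drive method and the rows list are hypothetical stand-ins for Pig's record pipeline. Its only point is that exec() is entered once per input row, so anything allocated inside exec() starts from scratch on every call.

import java.io.IOException;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class PerRowCallSketch {

    // Hypothetical driver: roughly how the task feeds rows to the UDF.
    static void drive(EvalFunc<DataBag> udf, List<Tuple> rows) throws IOException {
        for (Tuple row : rows) {
            // One call per row: the "Entered try block" log line from the
            // question is printed on every iteration, and the maps/lists
            // created inside exec() are brand new each time.
            DataBag out = udf.exec(row);
            // Pig would then pass 'out' on to the next operator in the plan.
        }
    }
}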

Since the call to the databag function happens inside a Mapper or a Reducer, it is not invoked just once; it is invoked many times.

The databag UDF is executed once for every input row (depending on the plan, the UDF becomes part of either the mapper or the reducer).
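
Because exec() only ever sees the single row it was called with, result never holds more than one tuple. One common workaround (a suggestion here, not something this answer prescribes) is to group the rows into a bag first, for example B = GROUP final ALL; followed by A = FOREACH B GENERATE PigUDFpck.databag(final);, so the UDF receives the whole relation in one call. A minimal sketch under that assumption, with illustrative class and field names rather than the asker's exact UDF:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Sketch of a bag-consuming UDF: assumes the script passes a pre-grouped
// bag (e.g. via GROUP ... ALL), so exec() runs once per bag and the index
// maps are built a single time instead of once per row.
public class DatabagOverBag extends EvalFunc<DataBag> {
    private final BagFactory mBagFactory = BagFactory.getInstance();

    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return null;
        }
        // The whole relation arrives as a single bag in field 0.
        DataBag rows = (DataBag) input.get(0);

        // Initialized once per bag, not once per row.
        Map<Object, ArrayList<Integer>> srcIndexMap = new HashMap<Object, ArrayList<Integer>>();
        Map<Object, ArrayList<Integer>> destIndexMap = new HashMap<Object, ArrayList<Integer>>();

        DataBag result = mBagFactory.newDefaultBag();
        for (Iterator<Tuple> it = rows.iterator(); it.hasNext();) {
            Tuple row = it.next();
            // ... build srcIndexMap / destIndexMap and compare source vs
            // destination rows here, as in the question's loop ...
            result.add(row);
        }
        return result;
    }
}

Note that GROUP ... ALL funnels every row through a single reducer, so this only scales to small diff sets like the one shown above; it is simply the easiest way to get the whole relation into a single exec() call.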

Run the EXPLAIN command in Pig; it shows how the Pig script is translated, so you can trace the underlying MR jobs.

For more details, see:

  1. http://bytepadding.com/big-data/map-reduce/pig-to-map-and-reduce/

  2. http://bytepadding.com/big-data/map-reduce/understanding-map-reduce-the-missing-guide/

Regarding "java - The try block is executed again and again for every tuple in the DataBag", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42786346/
