hadoop - PIG Join includes a bag that must be filtered using a value outside the bag

I'm trying to get a Pig job going that combines data from two sources: web log data and stock pricing history. The dates/times are normalized to timestamps, and a join is performed on the stock symbol. The timestamps do not match exactly.

jnd = JOIN web_time BY w_sym, stock_sort BY group;

The group contains a bag of stock data specific to that symbol. Here is the schema of the joined relation:

jnd: {web_time::ip: chararray, web_time::user: chararray, web_time::w_time: long, web_time::url: chararray, stock_sort::sort: {(sym: chararray, time: long, price: double)}}
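
The question does not show how stock_sort was produced; a minimal sketch of one way to arrive at this nested-bag schema (the file name and the stock_raw alias here are assumptions) is to group the raw quotes by symbol and order each group's bag by time:

stock_raw = LOAD 'stock_history.csv' USING PigStorage(',')
    AS (sym:chararray, time:long, price:double);

-- hypothetical reconstruction: grouping by symbol yields one row per symbol
-- carrying a bag of (sym, time, price) tuples; the nested ORDER names it 'sort'
stock_sort = FOREACH (GROUP stock_raw BY sym) {
    s = ORDER stock_raw BY time DESC;
    GENERATE group, s AS sort;
};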

I need to filter the stock_sort bag by comparing time against web_time::w_time, which is not an exact match. Sample jnd data looks like this:

(14.192.253.226,voraciouszing,1213201721000,"GET /VLCCF.html HTTP/1.0",{(VLCCF,1265361975000,13.84),(VLCCF,1265262560000,14.16),(VLCCF,1265192740000,14.44),(VLCCF,1265099390000,14.48),(VLCCF,1265028034000,14.5),(VLCCF,1262678148000,13.76),(VLCCF,1262607761000,13.82),(VLCCF,1233832497000,16.9),(VLCCF,1233740569000,16.96),...,(VLCCF,884004754000,23.99),(VLCCF,883720431000,23.57)})

Using the value in $2 (w_time), I eventually need to filter out all but one entry, but for now I'm just trying to remove the tuples with smaller timestamps:
flake = FOREACH jnd {
    fits = FILTER jnd BY (w_time > time);
    GENERATE ip, user, w_time, url, fits;
}

The above doesn't work. That's just step 1, removing all bag tuples whose timestamp is less than the desired time (w_time); w_time is not part of the group. Does this really require a UDF, or am I missing something simple? I'm stuck.

Development environment:

Apache Pig version 0.15.0.2.4.0.0-169 (rexported)
compiled Feb 10 2016, 07:50:04
Hadoop 2.7.1.2.4.0.0-169
Subversion git@github.com:hortonworks/hadoop.git -r 26104d8ac833884c8776473823007f17
4-node Hortonworks cluster

Any input is appreciated.

Best answer

I think that in your foreach you need to filter stock_sort::sort, not jnd, and the filter should be jnd.w_time > time. I managed to write the whole flow without a UDF. See below.

I took two files:

xact.txt:

VLCCF,1265361975000,13.84
VLCCF,1265262560000,14.16
VLCCF,1265192740000,14.44
VLCCF,1265099390000,14.48
VLCCF,1265028034000,14.5
VLCCF,1262678148000,13.76
VLCCF,1262607761000,13.82
VLCCF,1233832497000,16.9
VLCCF,1233740569000,16.96
VLCCF,884004754000,23.99
VLCCF,883720431000,23.5

stock.txt:

14.192.253.226,voraciouszing,1213201721000,"GET /VLCCF.html HTTP/1.0",VLCCF

stock = load 'stock.txt' using PigStorage(',') as (
    ip:chararray,
    user:chararray,
    w_time:long,
    url:chararray,
    symbol:chararray
);

xact = load 'xact.txt' using PigStorage(',') as (
    symbol:chararray,
    time:long,
    price:double
);

-- group the quotes by symbol so each row carries a bag of (symbol, time, price)
xact_grouped = foreach (group xact by symbol) generate
    group, xact;

joined = join stock by symbol, xact_grouped by group;

-- filter the inner bag (xact), not the outer relation, using the scalar w_time
filtered = foreach joined {
    grp = filter xact by time < joined.w_time;
    generate ip, grp;
};

dump filtered;

which gives me:

(14.192.253.226,{(VLCCF,884004754000,23.99),(VLCCF,883720431000,23.5)})
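
The question's ultimate goal is to keep just one entry. A hedged extension of the first script above (not in the original answer) can do that with a nested ORDER and LIMIT, keeping only the newest quote that precedes w_time:

-- assumption: "one entry" means the most recent quote before w_time
latest = foreach joined {
    before = filter xact by time < joined.w_time;
    newest = order before by time desc;
    top1 = limit newest 1;
    generate ip, top1;
};

dump latest;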

Edit: alternatively:

stock = load 'stock.txt' using PigStorage(',') as (
    ip:chararray,
    user:chararray,
    w_time:long,
    url:chararray,
    symbol:chararray
);

xact = load 'xact.txt' using PigStorage(',') as (
    symbol:chararray,
    time:long,
    price:double
);

-- flat join on the symbol, so each row pairs one web hit with one quote
joined = join stock by symbol, xact by symbol;

-- filter at the row level instead of inside a bag
joined_filtered = foreach (filter joined by time < w_time) generate
    ip as ip,
    user as user,
    w_time as w_time,
    stock::symbol as symbol,
    time as time,
    price as price;

-- regroup to rebuild one record per web hit with a bag of surviving quotes
grouped = foreach (group joined_filtered by (ip, user, w_time)) generate
    flatten(group),
    joined_filtered;
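
The original answer stops here; a final dump (my addition) should show one record per (ip, user, w_time) group, with the surviving quote rows collected back into a bag:

dump grouped;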

Regarding "hadoop - PIG Join includes a bag that must be filtered using a value outside the bag", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36883576/
