gpt4 book ai didi

ruby和pig处理流式文件实例

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 25 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章ruby和pig处理流式文件实例由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

大数据操作中涉及到数据清洗步奏还是用脚本处理比较方便,下边介绍一下pig加载hdfs文件后调用ruby脚本处理数据,再返回数据流至pig中处理的一个简单案例.

注意:ruby的流式处理用到wukong这个gem包,相关下载: https://github.com/mrflip/wukong 。

pig中加载分布式文件调用ruby流式处理:

复制代码 代码如下:

log = load '$INFILE' using PigStorage('\t'),

  。

define tracking_parser `/usr/ruby parse_click.rb --map` SHIP('parse_click.rb', 'click_tracking.rb'),

strmo = stream log through tra_parser,

store strmo into '$OUTFILE' using PigStorage('\t'),

  。

  。

复制代码 代码如下:

require 'wukong' require 'json' require './click_tra.rb' 。

  。

module ParseClick   class Mapper < Wukong::Streamer::RecordStreamer     def before_stream       @bad_count = 0     end 。

    def after_stream       raise RuntimeError, "Exceeded bad records : #{@bad_count}" if @bad_count > 10     end 。

    def process *records       yield ClickTra.new(JSON.parse(records[2])).to_a     rescue => e       @bad_count += 1       warn "Bad record #{e}: #{records[2]}"     end   end end 。

Wukong.run ParseClick::Mapper, nil 。

  。

  。

复制代码 代码如下:

require 'date' require './models.rb' 。

  。

class ClickTra     output :ip   output :c_date   #output your other atrributes 。

  def c_date     click_date.strftime("%Y%m%d").to_i   end  。

  def ip     browser_ip.to_i   end 。

end 。

  。

其中 。

strmo = stream log through tra_parser;调用定义的外部程序tra_parser处理log对象。 Wukong.run ParseClick::Mapper, nil执行完后,将ruby执行结果回调pig接收。 store strmo into '$OUTFILE' using PigStorage('\t');做结果存储持久化.

最后此篇关于ruby和pig处理流式文件实例的文章就讲到这里了,如果你想了解更多关于ruby和pig处理流式文件实例的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com