gpt4 book ai didi

使用 Hadoop Streaming 进行 avro 转换的 python 脚本

转载 作者:可可西里 更新时间:2023-11-01 14:49:52 30 4
gpt4 key购买 nike

我有 10 GB 的输入文件,我正在尝试使用 python hadoop 流将其转换为 avro,工作成功但我无法使用 avro 阅读器读取输出。

它给出“utf8”编解码器无法解码位置 13924 中的字节 0xb4:无效的起始字节。

这里的问题是我将标准输出用于 hadoop 流的映射器输出,如果我使用文件名并在本地使用脚本,则 avro 输出是可读的。

任何想法,如何解决这个问题?我认为问题在于处理流媒体中的键/值....

hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
-input "xxx.txt" \
-mapper "/opt/anaconda/anaconda21/bin/python mapper.py x.avsc" \
-reducer NONE \
-output "xxxxx" -file "mapper.py" \
-lazyOutput \
-file "x.avsc"

映射器脚本是

import sys
import re
import os
from avro import schema, datafile
import avro.io as io
import StringIO

schema_str = open("xxxxx.avsc", 'r').read()
SCHEMA = schema.parse(schema_str)
rec_writer = io.DatumWriter(SCHEMA)
df_writer = datafile.DataFileWriter(sys.stdout, rec_writer, SCHEMA,)
header = []
for field in SCHEMA.fields:
header.append(field.name)

for line in sys.stdin:
fields = line.rstrip().split("\x01")
data = dict(zip(header, fields))
try:
df_writer.append(data)
except Exception, e:
print "failed with data: %s" % str(data)
print str(e)
df_writer.close()

最佳答案

终于可以解决这个问题了。使用输出格式类,并将 avro 二进制转换留给它。在流式映射器中,只需发出 json 记录。

hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
-libjars avro-json-1.2.jar \
-jobconf output.schema.url=hdfs:///x.avsc \
-input "xxxxx" \
-mapper "/opt/anaconda/anaconda21/bin/python mapper.py x.avsc" \
-reducer NONE \
-output "/xxxxx" \
-outputformat com.cloudera.science.avro.streaming.AvroAsJSONOutputFormat \
-lazyOutput \
-file "mapper.py" \
-file "x.avsc"

这里是 mapper.py

import sys
from avro import schema
import json

schema_str = open("xxxxx.avsc", 'r').read()
SCHEMA = schema.parse(schema_str)

header = []
for field in SCHEMA.fields:
header.append(field.name)

for line in sys.stdin:
fields = line.rstrip().split("\x01")
data = dict(zip(header, fields))
try:
print >> sys.stdout, json.dumps(data, encoding='ISO-8859-1')
except Exception, e:
print "failed with data: %s" % str(data)
print str(e)

关于使用 Hadoop Streaming 进行 avro 转换的 python 脚本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25269179/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com