- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在尝试学习将 Yelp 的 Python API 用于 MapReduce、MRJob。他们的简单单词计数器示例很有意义,但我很好奇人们将如何处理涉及多个输入的应用程序。例如,不是简单地计算文档中的单词,而是将向量乘以矩阵。我想出了这个解决方案,它起作用了,但感觉很傻:
class MatrixVectMultiplyTast(MRJob):
def multiply(self,key,line):
line = map(float,line.split(" "))
v,col = line[-1],line[:-1]
for i in xrange(len(col)):
yield i,col[i]*v
def sum(self,i,occurrences):
yield i,sum(occurrences)
def steps(self):
return [self.mr (self.multiply,self.sum),]
if __name__=="__main__":
MatrixVectMultiplyTast.run()
此代码运行./matrix.py < input.txt
它起作用的原因是矩阵按列存储在 input.txt 中,相应的向量值位于行尾。
所以,下面的矩阵和向量:
表示为 input.txt 为:
简而言之,我将如何更自然地将矩阵和向量存储在单独的文件中并将它们都传递到 MRJob 中?
最佳答案
如果您需要针对另一个(或相同的 row_i、row_j)数据集处理原始数据,您可以:
1) 创建一个 S3 存储桶来存储您的数据副本。将此副本的位置传递给您的任务类,例如下面代码中的 self.options.bucket 和 self.options.my_datafile_copy_location。警告:不幸的是,似乎整个文件必须在处理之前“下载”到任务机器。如果连接不稳定或加载时间过长,则此作业可能会失败。这是执行此操作的一些 Python/MRJob 代码。
将其放入您的映射器函数中:
d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket) # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
d2 = line2.split('\t', 1)
v2, col2 = d2[0], d2[1]
## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
yield <your output key, value pairs>
conn.close()
2) 创建一个 SimpleDB 域,并将所有数据存储在其中。在 boto 和 SimpleDB 上阅读这里: http://code.google.com/p/boto/wiki/SimpleDbIntro
您的映射器代码如下所示:
dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
v2, c2 = item.name, item['column']
## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
yield <your output key, value pairs>
sdb.close()
如果您有大量数据,第二个选项可能会执行得更好,因为它可以对每一行数据而不是一次请求全部数据。请记住,SimpleDB 值最多只能有 1024 个字符长,因此如果您的数据值超过此长度,您可能需要通过某种方法进行压缩/解压缩。
关于python - MRJob 的多输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9302580/
我尝试从 mrjob 中的映射器输出 python 集。我相应地更改了组合器和 reducer 的函数签名。 但是,我收到此错误: Counters From Step 1 Unencodable o
默认情况下,mrJob 以 key[tab] 输出格式存储输出中的键和值。 即使键(或值)为空、null 或其他不感兴趣的情况,也会发生这种情况。假设我的键值对是 None, {"a":1", "b"
我正在尝试更好地理解 mrjob 的示例 from mrjob.job import MRJob class MRWordFrequencyCount(MRJob): def mapper(
我正在尝试预处理 XML 文件以在放入 mapreduce 之前提取某些节点。我有以下代码: from mrjob.compat import jobconf_from_env from mrjob.
我正在使用 cloudera 虚拟机。这是我的文件结构: [cloudera@quickstart pydoop]$ hdfs dfs -ls -R /input drwxr-xr-x - clo
显示多步 map reduce 作业执行时间的最佳方式是什么? 我试图在工作的step1 的mapper init 中设置一个self 变量 def mapper_init_timer(sel
我正在尝试学习将 Yelp 的 Python API 用于 MapReduce、MRJob。他们的简单单词计数器示例很有意义,但我很好奇人们将如何处理涉及多个输入的应用程序。例如,不是简单地计算文档中
如何在 mrjob 的 reducer 或映射器中放置调试语句(如打印)。如果我尝试使用 print 或 sys.stderr.write(),我会收到一个错误 TypeError: a bytes-
我使用 hadoop 流式传输的 mrjob 失败。我在 oracle vm 上有一个带有 python 模块 mrjob 的 hadoop 沙箱。 需要按照 Hadoop Error: Error
我是 map reduce 的新手,我正在尝试使用 mrjob 运行 map reduce 作业python包。但是,我遇到了这个错误: ERROR:mrjob.launch:Step 1 of 1
如果我对 MRJob 的理解正确,你可以通过运行 MRJob 来模拟 hadoop 的多进程运行 python mrfile.py -r local input.txt 我正在运行 Windows(现
我一直在尝试修改给定的 mapper_pre_filter 示例 here .现在,如果我不直接在步骤中指定命令,而是编写一个返回该命令的方法,如下所示: from mrjob.job import
我正在尝试在 EMR 集群中运行示例 mrjob。我已在 AWS 仪表板中手动创建 EMR 集群并启动 mrjob,如下所示 python keywords.py -r emr s3://common
我正在尝试通过三个步骤来实现映射缩减作业,并且在每个步骤之后我都需要迄今为止所有步骤的数据。有谁有关于如何在 mrjob 中将映射器或 reducer 的结果保存到磁盘的示例/想法? 最佳答案 您可以
我对 Map/Reduce 原理和 python mrjob 框架还很陌生,我写了这个示例代码,它工作正常,但我想知道我可以改变它什么以使其“完美”/更高效. from mrjob.job impor
我正在使用 MrJob 编写 hadoop 应用程序。我需要使用分布式缓存来访问一些文件。我知道 hadoop 流中有一个选项 -files 但不知道如何在程序中访问它。 感谢您的帮助。 最佳答案 我
我正在使用 mrjob 处理一批文件并获取一些统计信息。我知道我可以在单个文件上运行 mapreduce 作业,例如 python count.py output 但是我怎样才能将文件目录提供给脚本
有没有办法使用mrjob对reducer函数的输出进行排序? 我认为 reducer 函数的输入是按键排序的,我尝试利用此功能使用另一个 reducer 对输出进行排序,如下所示,我知道值具有数值,我
场景是我需要处理一个文件(输入),对于每条记录,我需要检查输入文件中的某些字段是否与存储在 Hadoop 集群中的字段匹配。 我们正在考虑使用 MRJob 处理输入文件并使用 HIVE 从 hadoo
我想这样索引化简的结果: 1 "EZmocAborM6z66rTzeZxzQ" 2 "FIk4lQQu1eTe2EpzQ4xhBA" 3 "myql3o3x22_ygECb8gVo7A"
我是一名优秀的程序员,十分优秀!