gpt4 book ai didi

mapreduce - 如何进行更快的 Riak MapReduce 查询?

转载 作者:行者123 更新时间:2023-12-04 23:59:05 33 4
gpt4 key购买 nike

我们怎样才能使我们的 MapReduce 查询更快?

我们使用五节点 Riak 数据库集群构建了一个应用程序。
我们的数据模型由三个部分组成:比赛、联赛和球队。

比赛包含联赛和球队的链接:

型号

var match = {
id: matchId,
leagueId: meta.leagueId,
homeTeamId: meta.homeTeamId,
awayTeamId: meta.awayTeamId,
startTime: m.match.startTime,
firstHalfStartTime: m.match.firstHalfStartTime,
secondHalfStartTime: m.match.secondHalfStartTime,
score: {
goals: {
a: 1*safeGet(m.match, 'score.goals.a'),
b: 1*safeGet(m.match, 'score.goals.b')
},
corners: {
a: 1*safeGet(m.match, 'score.corners.a'),
b: 1*safeGet(m.match, 'score.corners.b')
}
}
};

var options = {
index: {
leagueId: match.leagueId,
teamId: [match.homeTeamId, match.awayTeamId],
startTime: match.startTime || match.firstHalfStartTime || match.secondHalfStartTime
},
links: [
{ bucket: 'leagues', key: match.leagueId, tag: 'league' },
{ bucket: 'teams', key: match.homeTeamId, tag: 'home' },
{ bucket: 'teams', key: match.awayTeamId, tag: 'away' }
]
};
match.model = 'match';
modelCache.save('matches', match.id, match, options, callback);

查询

我们编写了一个查询,从多个桶中返回结果,一种方法是分别查询每个桶。另一种方法是使用链接来组合来自单个查询的结果。

无论我们的存储桶大小有多小,我们尝试的两个版本的查询都会占用一秒钟的时间。
第一个版本使用两个 map 阶段,我们在这篇文章 ( Practical Map-Reduce: Forwarding and Collecting ) 之后建模。
#!/bin/bash
curl -X POST \
-H "content-type: application/json" \
-d @- \
http://localhost:8091/mapred \
<<EOF
{
"inputs":{
"bucket":"matches",
"index":"startTime_bin",
"start":"2012-10-22T23:00:00",
"end":"2012-10-24T23:35:00"
},
"query": [
{"map":{"language": "javascript", "source":"
function(value, keydata, arg){
var match = Riak.mapValuesJson(value)[0];
var links = value.values[0].metadata.Links;
var result = links.map(function(l) {
return [l[0], l[1], match];
});
return result;
}
"}
},
{"map":{"language": "javascript", "source": "
function(value, keydata, arg) {
var doc = Riak.mapValuesJson(value)[0];
return [doc, keydata];
}
"}
},
{"reduce":{
"language": "javascript",
"source":"
function(values) {
var merged = {};
values.forEach(function(v) {
if(!merged[v.id]) {
merged[v.id] = v;
}
});
var results = [];
for(key in merged) {
results.push(merged[key]);
}
return results;
}
"
}
}
]
}
EOF

在第二个版本中,我们执行四个单独的 Map-Reduce 查询以从三个存储桶中获取对象:
async.series([
//First get all matches
function(callback) {
db.mapreduce
.add(inputs)
.map(function (val, key, arg) {
var data = Riak.mapValuesJson(val)[0];
if(arg.leagueId && arg.leagueId != data.leagueId) {
return [];
}
var d = new Date();
var date = data.startTime || data.firstHalfStartTime || data.secondHalfStartTime;
d.setFullYear(date.substring(0, 4));
d.setMonth(date.substring(5, 7) - 1);
d.setDate(date.substring(8, 10));
d.setHours(date.substring(11, 13));
d.setMinutes(date.substring(14, 16));
d.setSeconds(date.substring(17, 19));
d.setMilliseconds(0);
startTimestamp = d.getTime();
var short = {
id: data.id,
l: data.leagueId,
h: data.homeTeamId,
a: data.awayTeamId,
t: startTimestamp,
s: data.score,
c: startTimestamp
};
return [short];
}, {leagueId: query.leagueId, page: query.page}).reduce(function (val, key) {
return val;
}).run(function (err, matches) {
matches.forEach(function(match) {
result.match[match.id] = match; //Should maybe filter this
leagueIds.push(match.l);
teamIds.push(match.h);
teamIds.push(match.a);
});
callback();
});
},
//Then get all leagues, teams and lines in parallel
function(callback) {
async.parallel([
//Leagues
function(callback) {
db.getMany('leagues', leagueIds, function(err, leagues) {
if (err) { callback(err); return; }
leagues.forEach(function(league) {
visibleLeagueIds[league.id] = true;
result.league[league.id] = {
r: league.regionId,
n: league.name,
s: league.name
};
});
callback();
});
},
//Teams
function(callback) {
db.getMany('teams', teamIds, function(err, teams) {
if (err) { callback(err); return; }
teams.forEach(function(team) {
result.team[team.id] = {
n: team.name,
h: team.name,
s: team.stats
};
});
callback();
});
}
], callback);
}
], function(err) {
if (err) { callback(err); return; }
_.each(regionModel.getAll(), function(region) {
result.region[region.id] = {
id: region.id,
c: 'https://d1goqbu19rcwi8.cloudfront.net/icons/silk-flags/' + region.icon + '.png',
n: region.name
};
});
var response = {
success: true,
result: {
modelRecords: result,
paging: {
page: query.page,
pageSize: 50,
total: result.match.length
},
time: moment().diff(a)/1000.00,
visibleLeagueIds: visibleLeagueIds
}
};
callback(null, JSON.stringify(response, null, '\t'));
});

我们如何使这些查询更快?

附加信息:

我们使用 riak-js 和 node.js 来运行我们的查询。

最佳答案

使其至少快一点的一种方法是将 JavaScript mapreduce 函数部署到服务器,而不是将它们作为作业的一部分传递。 (参见 js_source_dir 参数 here 的描述)。如果您有重复运行的 JavaScript 函数,通常建议这样做。

由于与在 Erlang 中实现的原生函数相比,运行 JavaScript mapreduce 函数会产生一些开销,因此在可能的情况下使用非 JavaScript 函数也可能有所帮助。

第一个查询中的两个映射阶段函数似乎旨在解决正常链接阶段(我认为效率更高)不会传递正在处理的记录(匹配记录)的限制。第一个函数包含所有链接,并将匹配数据作为 JSON 格式的附加数据传递,而第二个函数以 JSON 格式传递匹配数据和链接记录。

我编写了一个简单的 Erlang 函数,其中包含所有链接以及传入记录的 ID。这可以与原生 Erlang 函数 一起使用。 riak_kv_mapreduce:map_object_value 替换第一个示例中的两个 map 阶段函数,删除一些 JavaScript 用法。与现有解决方案一样,我希望您会收到许多重复项,因为多场比赛可能会链接到同一个联赛/球队。

-module(riak_mapreduce_example).

-export([map_link/3]).

%% @spec map_link(riak_object:riak_object(), term(), term()) ->
%% [{{Bucket :: binary(), Key :: binary()}, Props :: term()}]
%% @doc map phase function for adding linked records to result set
map_link({error, notfound}, _, _) ->
[];
map_link(RiakObject, Props, _) ->
Bucket = riak_object:bucket(RiakObject),
Key = riak_object:key(RiakObject),
Meta = riak_object:get_metadata(RiakObject),
Current = [{{Bucket, Key}, Props}],
Links = case dict:find(<<"Links">>, Meta) of
{ok, List} ->
[{{B, K}, Props} || {{B, K}, _Tag} <- List];
error ->
[]
end,
lists:append([Current, Links]).

这些结果可以发送回客户端进行聚合,也可以传递到您提供的示例中的 reduce 阶段函数。

示例函数需要在所有节点上编译和安装,并且可能需要重新启动。

另一种提高性能的方法(这可能不是您的选择)可能是改变数据模型,以避免对性能关键查询完全使用 mapreduce 查询。

关于mapreduce - 如何进行更快的 Riak MapReduce 查询?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13059953/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com