Как сделать более быстрый Riak MapReduce Query?

Как мы можем сделать наши запросы MapReduce быстрее?

Мы создали приложение с использованием пятиузлового кластера Riak DB. Наша модель данных состоит из трех сегментов: матчей, лиг и команд.

Матчи содержат ссылки на лиги и команды:

модель

var match = {
        id: matchId,
        leagueId: meta.leagueId,
        homeTeamId: meta.homeTeamId,
        awayTeamId: meta.awayTeamId,
        startTime: m.match.startTime,
        firstHalfStartTime: m.match.firstHalfStartTime,
        secondHalfStartTime: m.match.secondHalfStartTime,
        score: {
            goals: {
                a: 1*safeGet(m.match, 'score.goals.a'),
                b: 1*safeGet(m.match, 'score.goals.b')
            },
            corners: {
                a: 1*safeGet(m.match, 'score.corners.a'),
                b: 1*safeGet(m.match, 'score.corners.b')
            }
        }
    };

var options = {
        index: {
            leagueId: match.leagueId,
            teamId: [match.homeTeamId, match.awayTeamId],
            startTime: match.startTime || match.firstHalfStartTime || match.secondHalfStartTime
        },
        links: [
            { bucket: 'leagues', key: match.leagueId, tag: 'league' },
            { bucket: 'teams', key: match.homeTeamId, tag: 'home' },
            { bucket: 'teams', key: match.awayTeamId, tag: 'away' }
        ]
    };
    match.model = 'match';
    modelCache.save('matches', match.id, match, options, callback);

Запросы

Мы пишем запрос, который возвращает результаты из нескольких сегментов, один из способов - запросить каждый сегмент отдельно. Другой способ - использовать ссылки для объединения результатов из одного запроса.

Две версии запроса, которые мы попробовали, занимают по две секунды, независимо от того, насколько мал размер нашего сегмента. Первая версия использует две фазы карты, которые мы смоделировали после этого поста ( Практическая карта-уменьшение: пересылка и сбор).

#!/bin/bash
curl -X POST \
-H "content-type: application/json" \
-d @- \
http://localhost:8091/mapred \
<<EOF
{
    "inputs":{
        "bucket":"matches",
        "index":"startTime_bin",
        "start":"2012-10-22T23:00:00",
        "end":"2012-10-24T23:35:00"
    },
    "query": [
        {"map":{"language": "javascript", "source":"
                function(value, keydata, arg){
                    var match = Riak.mapValuesJson(value)[0];
                    var links = value.values[0].metadata.Links;
                    var result = links.map(function(l) {
                        return [l[0], l[1], match];
                    });
                    return result;
                }
            "}
        },
        {"map":{"language": "javascript", "source": "
                function(value, keydata, arg) {
                    var doc = Riak.mapValuesJson(value)[0];
                    return [doc, keydata];
                }
            "}
        },
        {"reduce":{
            "language": "javascript",
                "source":"
                    function(values) {
                        var merged = {};
                        values.forEach(function(v) {
                            if(!merged[v.id]) {
                                merged[v.id] = v;
                            }
                        });
                        var results = [];
                        for(key in merged) {
                            results.push(merged[key]);
                        }
                        return results;
                    }
                "
            }
        }
    ]
}
EOF

Во второй версии мы выполняем четыре отдельных запроса Map-Reduce, чтобы получить объекты из трех сегментов:

async.series([
        //First get all matches
        function(callback) {
            db.mapreduce
                .add(inputs)
                .map(function (val, key, arg) {
                    var data = Riak.mapValuesJson(val)[0];
                    if(arg.leagueId && arg.leagueId != data.leagueId) {
                        return [];
                    }
                    var d = new Date();
                    var date = data.startTime || data.firstHalfStartTime || data.secondHalfStartTime;
                    d.setFullYear(date.substring(0, 4));
                    d.setMonth(date.substring(5, 7) - 1);
                    d.setDate(date.substring(8, 10));
                    d.setHours(date.substring(11, 13));
                    d.setMinutes(date.substring(14, 16));
                    d.setSeconds(date.substring(17, 19));
                    d.setMilliseconds(0);
                    startTimestamp = d.getTime();
                    var short = {
                        id: data.id,
                        l: data.leagueId,
                        h: data.homeTeamId,
                        a: data.awayTeamId,
                        t: startTimestamp,
                        s: data.score,
                        c: startTimestamp
                    };
                    return [short];
                }, {leagueId: query.leagueId, page: query.page}).reduce(function (val, key) {
                    return val;
                }).run(function (err, matches) {
                    matches.forEach(function(match) {
                        result.match[match.id] = match; //Should maybe filter this
                        leagueIds.push(match.l);
                        teamIds.push(match.h);
                        teamIds.push(match.a);
                    });
                    callback();
                });
        },
        //Then get all leagues, teams and lines in parallel
        function(callback) {
            async.parallel([
                //Leagues
                function(callback) {
                    db.getMany('leagues', leagueIds, function(err, leagues) {
                        if (err) { callback(err); return; }
                        leagues.forEach(function(league) {
                            visibleLeagueIds[league.id] = true;
                            result.league[league.id] = {
                                r: league.regionId,
                                n: league.name,
                                s: league.name
                            };
                        });
                        callback();
                    });
                },
                //Teams
                function(callback) {
                    db.getMany('teams', teamIds, function(err, teams) {
                        if (err) { callback(err); return; }
                        teams.forEach(function(team) {
                            result.team[team.id] = {
                                n: team.name,
                                h: team.name,
                                s: team.stats
                            };
                        });
                        callback();
                    });
                }
            ], callback);
        }
    ], function(err) {
        if (err) { callback(err); return; }
        _.each(regionModel.getAll(), function(region) {
           result.region[region.id] = {
               id: region.id,
               c: 'https://d1goqbu19rcwi8.cloudfront.net/icons/silk-flags/' + region.icon + '.png',
               n: region.name
           };
        });
        var response = {
            success: true,
            result: {
                modelRecords: result,
                paging: {
                    page: query.page,
                    pageSize: 50,
                    total: result.match.length
                },
                time: moment().diff(a)/1000.00,
                visibleLeagueIds: visibleLeagueIds
            }
        };
        callback(null, JSON.stringify(response, null, '\t'));
    });

Как мы делаем эти запросы быстрее?

Дополнительная информация:

Мы используем riak-js и node.js для запуска наших запросов.

1 ответ

Решение

Один из способов сделать это, по крайней мере, немного быстрее, - это развернуть функции JavaScript mapreduce на сервере, а не передавать их как часть работы. (см. описание параметра js_source_dir здесь). Обычно это рекомендуется, если у вас есть функции JavaScript, которые вы запускаете повторно.

Поскольку есть некоторые накладные расходы, связанные с выполнением функций JavaScript mapreduce по сравнению с нативными, реализованными в Erlang, использование функций не-JavaScript, где это возможно, также может помочь.

Похоже, что две функции фазы карты в вашем первом запросе предназначены для обхода ограничения, заключающегося в том, что обычная фаза связывания (которая, на мой взгляд, более эффективна) не передает обрабатываемую запись (запись совпадений). Первая функция включает все ссылки и передает данные о совпадении в качестве дополнительных данных в форме JSON, а вторая передает данные о совпадении, а также связанную запись в форме JSON.

Я написал простую функцию Erlang, которая включает в себя все ссылки, а также идентификатор передаваемой записи. Это можно использовать вместе с встроенной функцией Erlang riak_kv_mapreduce:map_object_value для замены двух функций фазы карты в первом примере, удалив некоторые из использование JavaScript. Как и в существующем решении, я ожидаю, что вы получите несколько дубликатов, поскольку несколько матчей могут быть связаны с одной лигой / командой.

-module(riak_mapreduce_example).

-export([map_link/3]).

%% @spec map_link(riak_object:riak_object(), term(), term()) ->
%%                   [{{Bucket :: binary(), Key :: binary()}, Props :: term()}]
%% @doc map phase function for adding linked records to result set
map_link({error, notfound}, _, _) ->
    [];
map_link(RiakObject, Props, _) ->
    Bucket = riak_object:bucket(RiakObject),
    Key = riak_object:key(RiakObject),
    Meta = riak_object:get_metadata(RiakObject),
    Current = [{{Bucket, Key}, Props}],
    Links = case dict:find(<<"Links">>, Meta) of
        {ok, List} ->
            [{{B, K}, Props} || {{B, K}, _Tag} <- List];
        error ->
            []
    end,
    lists:append([Current, Links]).

Результаты этого могут быть либо отправлены обратно клиенту для агрегирования, либо переданы в функцию сокращения фазы, как в приведенном вами примере.

Пример функции должен быть скомпилирован и установлен на всех узлах и может потребовать перезапуска.

Другим способом повышения производительности (который вполне может не подойти вам), возможно, было бы изменение модели данных, чтобы избежать необходимости использовать запросы mapreduce для запросов, критичных к производительности.

Другие вопросы по тегам