Простая Java Map/Reduce Framework

Может кто-нибудь указать мне на простой, с открытым исходным кодом, Map/Reduce Framework/API для Java? Кажется, не так уж много доказательств существования такой вещи, но кто-то может знать другое.

Лучшее, что я могу найти, это, конечно, Hadoop MapReduce, но это не соответствует "простым" критериям. Мне не нужна способность запускать распределенные задания, просто что-то, что позволило бы мне запускать задания отображения / сокращения в стиле на многоядерной машине, в одной JVM, используя стандартный параллелизм в стиле Java5.

Это не сложно написать самому, но я бы предпочел не делать этого.

10 ответов

Решение

Я думаю, что стоит упомянуть, что эти проблемы являются историей с Java 8. Пример:

int heaviestBlueBlock =
    blocks.filter(b -> b.getColor() == BLUE)
          .map(Block::getWeight)
          .reduce(0, Integer::max);

Другими словами: одноузловой MapReduce доступен в Java 8.

Для более подробной информации смотрите презентацию Брайана Гетца о проекте lambda.

Вы проверили Акку? В то время как akka действительно является распределенной средой параллелизма на основе модели Actor, вы можете реализовать множество вещей просто с помощью небольшого кода. С ним так просто разделить работу на части, и она автоматически использует все преимущества многоядерной машины, а также возможность использовать несколько машин для обработки работы. В отличие от использования потоков, это кажется мне более естественным.

У меня есть Java карта уменьшить пример с помощью Акка. Это не самый простой пример уменьшения карты, поскольку он использует фьючерсы; но это должно дать вам приблизительное представление о том, что происходит. Есть несколько основных вещей, которые демонстрирует мой пример сокращения карты:

  • Как разделить работу.
  • Как назначить работу: у akka действительно простая система обмена сообщениями, а также рабочий разделитель, расписание которого вы можете настроить. Как только я научился им пользоваться, я не мог остановиться. Это так просто и гибко. Я использовал все четыре ядра процессора в кратчайшие сроки. Это действительно здорово для реализации услуг.
  • Как узнать, когда работа выполнена и результат готов к обработке: на самом деле это та часть, которая может быть наиболее сложной и запутанной для понимания, если вы уже не знакомы с Futures. Вам не нужно использовать фьючерсы, так как есть другие варианты. Я просто использовал их, потому что я хотел что-то более короткое для людей, чтобы впасть.

Если у вас есть какие-либо вопросы, у Stackru действительно есть замечательный раздел QA.

Я использую следующую структуру

int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);

List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
    results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
    reduce(future);

Я понимаю, что это может быть немного позже, но вы можете взглянуть на классы JSR166y ForkJoin из JDK7.

Существует библиотека с обратным портированием, которая работает под JDK6 без каких-либо проблем, так что вам не нужно ждать следующего тысячелетия, чтобы продолжить. Он находится где-то между непосредственным исполнителем и hadoop, предоставляя основу для работы над сокращением карты в рамках текущей JVM.

Я создал для себя уникальную пару лет назад, когда приобрел 8-ядерный компьютер, но мне это не очень понравилось. Мне никогда не удавалось использовать его так просто, как я надеялся, а задачи, требующие большого объема памяти, плохо масштабировались.

Если вы не получите никаких реальных ответов, я могу поделиться еще, но суть этого:

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        while (inputIterator.hasNext()) {
            TMapInput m = inputIterator.next();
            Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
            futureSet.add(f);
            Thread.sleep(10);
        }
        while (!futureSet.isEmpty()) {
            Thread.sleep(5);
            for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
                Future<TMapOutput> f = fit.next();
                if (f.isDone()) {
                    fit.remove();
                    TMapOutput x = f.get();
                    m_reducer.reduce(x);
                }
            }
        }
        return m_reducer.getResult();
    }
}

РЕДАКТИРОВАТЬ: на основе комментария ниже версия без sleep, Хитрость заключается в использовании CompletionService который по существу обеспечивает блокировку очереди завершенных Futures.

 public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Collection<TMapInput> input) {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        CompletionService<TMapOutput> futurePool = 
                  new ExecutorCompletionService<TMapOutput>(pool);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        for (TMapInput m : input) {
            futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
        }
        pool.shutdown();
        int n = futureSet.size();
        for (int i = 0; i < n; i++) {
            m_reducer.reduce(futurePool.take().get());
        }
        return m_reducer.getResult();
    }

Я также отмечу, что это очень изощренный алгоритм сокращения карт, включающий в себя один рабочий редукции, который выполняет операции уменьшения и слияния.

Мне нравится использовать Skandium для параллелизма в Java. Платформа реализует определенные шаблоны параллелизма (а именно Master-Slave, Map/Reduce, Pipe, Fork и Divide & Conquer) для многоядерных машин с общей памятью. Эта техника называется "алгоритмические скелеты". Шаблоны могут быть вложенными.

В деталях есть скелеты и мышцы. Мышцы выполняют реальную работу (расщепление, слияние, выполнение и кондиционирование). Скелеты представляют шаблоны параллелизма, за исключением "Пока", "Для" и "Если", которые могут быть полезны при вложении шаблонов.

Примеры можно найти внутри фреймворка. Мне нужно было немного понять, как использовать мышцы и скелеты, но после преодоления этого препятствия мне действительно нравятся эти рамки.:)

Возможно, вы захотите взглянуть на веб-сайт проекта Functionals 4 Java: http://f4j.rethab.ch/ Он представляет фильтр, отображение и сокращение до версий Java до 8.

Вы смотрели на GridGain?

API MapReduce был введен в версии 3.2 Hazelcast (см. Раздел API MapReduce в документации). Хотя Hazelcast предназначен для использования в распределенной системе, он отлично работает в конфигурации с одним узлом и довольно легкий.

Вы можете попробовать LeoTask: параллельная задача и структура агрегации результатов

Это бесплатно и с открытым исходным кодом: https://github.com/mleoking/leotask

Вот краткое введение, показывающее его API: https://github.com/mleoking/leotask/blob/master/leotask/introduction.pdf?raw=true

Это легковесный фреймворк, работающий на одном компьютере с использованием всех доступных процессорных ядер.

Он имеет следующие особенности:

  • Автоматическое и параллельное исследование пространства параметров
  • Гибкая и основанная на конфигурации агрегация результатов
  • Модель программирования, ориентированная только на ключевую логику
  • Надежное и автоматическое восстановление прерываний

и коммунальные услуги:

  • Динамические и клонируемые сетевые структуры.
  • Интеграция с Gnuplot
  • Генерация сети по распространенным сетевым моделям
  • DelimitedReader: сложный читатель, который исследует файлы CSV (значения, разделенные запятыми), такие как база данных
  • Быстрый генератор случайных чисел на основе алгоритма Мерсенна Твистера
  • Интегрированный CurveFitter из проекта ImageJ
Другие вопросы по тегам