Простая Java Map/Reduce Framework
Может кто-нибудь указать мне на простой, с открытым исходным кодом, Map/Reduce Framework/API для Java? Кажется, не так уж много доказательств существования такой вещи, но кто-то может знать другое.
Лучшее, что я могу найти, это, конечно, Hadoop MapReduce, но это не соответствует "простым" критериям. Мне не нужна способность запускать распределенные задания, просто что-то, что позволило бы мне запускать задания отображения / сокращения в стиле на многоядерной машине, в одной JVM, используя стандартный параллелизм в стиле Java5.
Это не сложно написать самому, но я бы предпочел не делать этого.
10 ответов
Я думаю, что стоит упомянуть, что эти проблемы являются историей с Java 8. Пример:
int heaviestBlueBlock =
blocks.filter(b -> b.getColor() == BLUE)
.map(Block::getWeight)
.reduce(0, Integer::max);
Другими словами: одноузловой MapReduce доступен в Java 8.
Для более подробной информации смотрите презентацию Брайана Гетца о проекте lambda.
Вы проверили Акку? В то время как akka действительно является распределенной средой параллелизма на основе модели Actor, вы можете реализовать множество вещей просто с помощью небольшого кода. С ним так просто разделить работу на части, и она автоматически использует все преимущества многоядерной машины, а также возможность использовать несколько машин для обработки работы. В отличие от использования потоков, это кажется мне более естественным.
У меня есть Java карта уменьшить пример с помощью Акка. Это не самый простой пример уменьшения карты, поскольку он использует фьючерсы; но это должно дать вам приблизительное представление о том, что происходит. Есть несколько основных вещей, которые демонстрирует мой пример сокращения карты:
- Как разделить работу.
- Как назначить работу: у akka действительно простая система обмена сообщениями, а также рабочий разделитель, расписание которого вы можете настроить. Как только я научился им пользоваться, я не мог остановиться. Это так просто и гибко. Я использовал все четыре ядра процессора в кратчайшие сроки. Это действительно здорово для реализации услуг.
- Как узнать, когда работа выполнена и результат готов к обработке: на самом деле это та часть, которая может быть наиболее сложной и запутанной для понимания, если вы уже не знакомы с Futures. Вам не нужно использовать фьючерсы, так как есть другие варианты. Я просто использовал их, потому что я хотел что-то более короткое для людей, чтобы впасть.
Если у вас есть какие-либо вопросы, у Stackru действительно есть замечательный раздел QA.
Я использую следующую структуру
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);
List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
reduce(future);
Я понимаю, что это может быть немного позже, но вы можете взглянуть на классы JSR166y ForkJoin из JDK7.
Существует библиотека с обратным портированием, которая работает под JDK6 без каких-либо проблем, так что вам не нужно ждать следующего тысячелетия, чтобы продолжить. Он находится где-то между непосредственным исполнителем и hadoop, предоставляя основу для работы над сокращением карты в рамках текущей JVM.
Я создал для себя уникальную пару лет назад, когда приобрел 8-ядерный компьютер, но мне это не очень понравилось. Мне никогда не удавалось использовать его так просто, как я надеялся, а задачи, требующие большого объема памяти, плохо масштабировались.
Если вы не получите никаких реальных ответов, я могу поделиться еще, но суть этого:
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
while (inputIterator.hasNext()) {
TMapInput m = inputIterator.next();
Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
futureSet.add(f);
Thread.sleep(10);
}
while (!futureSet.isEmpty()) {
Thread.sleep(5);
for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
Future<TMapOutput> f = fit.next();
if (f.isDone()) {
fit.remove();
TMapOutput x = f.get();
m_reducer.reduce(x);
}
}
}
return m_reducer.getResult();
}
}
РЕДАКТИРОВАТЬ: на основе комментария ниже версия без sleep
, Хитрость заключается в использовании CompletionService
который по существу обеспечивает блокировку очереди завершенных Future
s.
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Collection<TMapInput> input) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
CompletionService<TMapOutput> futurePool =
new ExecutorCompletionService<TMapOutput>(pool);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
for (TMapInput m : input) {
futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
}
pool.shutdown();
int n = futureSet.size();
for (int i = 0; i < n; i++) {
m_reducer.reduce(futurePool.take().get());
}
return m_reducer.getResult();
}
Я также отмечу, что это очень изощренный алгоритм сокращения карт, включающий в себя один рабочий редукции, который выполняет операции уменьшения и слияния.
Мне нравится использовать Skandium для параллелизма в Java. Платформа реализует определенные шаблоны параллелизма (а именно Master-Slave, Map/Reduce, Pipe, Fork и Divide & Conquer) для многоядерных машин с общей памятью. Эта техника называется "алгоритмические скелеты". Шаблоны могут быть вложенными.
В деталях есть скелеты и мышцы. Мышцы выполняют реальную работу (расщепление, слияние, выполнение и кондиционирование). Скелеты представляют шаблоны параллелизма, за исключением "Пока", "Для" и "Если", которые могут быть полезны при вложении шаблонов.
Примеры можно найти внутри фреймворка. Мне нужно было немного понять, как использовать мышцы и скелеты, но после преодоления этого препятствия мне действительно нравятся эти рамки.:)
Возможно, вы захотите взглянуть на веб-сайт проекта Functionals 4 Java: http://f4j.rethab.ch/ Он представляет фильтр, отображение и сокращение до версий Java до 8.
API MapReduce был введен в версии 3.2 Hazelcast (см. Раздел API MapReduce в документации). Хотя Hazelcast предназначен для использования в распределенной системе, он отлично работает в конфигурации с одним узлом и довольно легкий.
Вы можете попробовать LeoTask: параллельная задача и структура агрегации результатов
Это бесплатно и с открытым исходным кодом: https://github.com/mleoking/leotask
Вот краткое введение, показывающее его API: https://github.com/mleoking/leotask/blob/master/leotask/introduction.pdf?raw=true
Это легковесный фреймворк, работающий на одном компьютере с использованием всех доступных процессорных ядер.
Он имеет следующие особенности:
- Автоматическое и параллельное исследование пространства параметров
- Гибкая и основанная на конфигурации агрегация результатов
- Модель программирования, ориентированная только на ключевую логику
- Надежное и автоматическое восстановление прерываний
и коммунальные услуги:
- Динамические и клонируемые сетевые структуры.
- Интеграция с Gnuplot
- Генерация сети по распространенным сетевым моделям
- DelimitedReader: сложный читатель, который исследует файлы CSV (значения, разделенные запятыми), такие как база данных
- Быстрый генератор случайных чисел на основе алгоритма Мерсенна Твистера
- Интегрированный CurveFitter из проекта ImageJ