"Parallel.For" для Java?
Мне было интересно, если есть Parallel.For эквивалент.NET версии для Java?
Если есть кто-нибудь, пожалуйста, приведите пример? Спасибо!
10 ответов
Я думаю, что самая близкая вещь будет:
ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
try {
for (final Object o : list) {
exec.submit(new Runnable() {
@Override
public void run() {
// do stuff with o.
}
});
}
} finally {
exec.shutdown();
}
Основываясь на комментариях TheLQ, вы должны установить SUM_NUM_THREADS на Runtime.getRuntime().availableProcessors();
Изменить: Решено добавить базовую реализацию "Parallel.For"
public class Parallel {
private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For"));
public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
try {
// invokeAll blocks for us until all submitted tasks in the call complete
forPool.invokeAll(createCallables(elements, operation));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) {
List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
for (final T elem : elements) {
callables.add(new Callable<Void>() {
@Override
public Void call() {
operation.perform(elem);
return null;
}
});
}
return callables;
}
public static interface Operation<T> {
public void perform(T pParameter);
}
}
Пример использования Parallel.For
// Collection of items to process in parallel
Collection<Integer> elems = new LinkedList<Integer>();
for (int i = 0; i < 40; ++i) {
elems.add(i);
}
Parallel.For(elems,
// The operation to perform with each item
new Parallel.Operation<Integer>() {
public void perform(Integer param) {
System.out.println(param);
};
});
Я думаю, что эта реализация действительно больше похожа на Parallel.ForEach
Edit Я разместил это на GitHub, если кому-то интересно. Параллельно для на GitHub
Решение MLaw - это очень практичный Parallel.ForEach. Я добавил немного модификации, чтобы сделать Parallel.For.
public class Parallel
{
static final int iCPU = Runtime.getRuntime().availableProcessors();
public static <T> void ForEach(Iterable <T> parameters,
final LoopBody<T> loopBody)
{
ExecutorService executor = Executors.newFixedThreadPool(iCPU);
List<Future<?>> futures = new LinkedList<Future<?>>();
for (final T param : parameters)
{
Future<?> future = executor.submit(new Runnable()
{
public void run() { loopBody.run(param); }
});
futures.add(future);
}
for (Future<?> f : futures)
{
try { f.get(); }
catch (InterruptedException e) { }
catch (ExecutionException e) { }
}
executor.shutdown();
}
public static void For(int start,
int stop,
final LoopBody<Integer> loopBody)
{
ExecutorService executor = Executors.newFixedThreadPool(iCPU);
List<Future<?>> futures = new LinkedList<Future<?>>();
for (int i=start; i<stop; i++)
{
final Integer k = i;
Future<?> future = executor.submit(new Runnable()
{
public void run() { loopBody.run(k); }
});
futures.add(future);
}
for (Future<?> f : futures)
{
try { f.get(); }
catch (InterruptedException e) { }
catch (ExecutionException e) { }
}
executor.shutdown();
}
}
public interface LoopBody <T>
{
void run(T i);
}
public class ParallelTest
{
int k;
public ParallelTest()
{
k = 0;
Parallel.For(0, 10, new LoopBody <Integer>()
{
public void run(Integer i)
{
k += i;
System.out.println(i);
}
});
System.out.println("Sum = "+ k);
}
public static void main(String [] argv)
{
ParallelTest test = new ParallelTest();
}
}
Основанный на предложении mlaw, добавьте CountDownLatch. Добавьте chunksize, чтобы уменьшить submit().
При тестировании с массивом из 4 миллионов элементов этот показатель дает 5-кратное ускорение по сравнению с последовательностью for() на моем процессоре Core i7 2630QM.
public class Loop {
public interface Each {
void run(int i);
}
private static final int CPUs = Runtime.getRuntime().availableProcessors();
public static void withIndex(int start, int stop, final Each body) {
int chunksize = (stop - start + CPUs - 1) / CPUs;
int loops = (stop - start + chunksize - 1) / chunksize;
ExecutorService executor = Executors.newFixedThreadPool(CPUs);
final CountDownLatch latch = new CountDownLatch(loops);
for (int i=start; i<stop;) {
final int lo = i;
i += chunksize;
final int hi = (i<stop) ? i : stop;
executor.submit(new Runnable() {
public void run() {
for (int i=lo; i<hi; i++)
body.run(i);
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {}
executor.shutdown();
}
public static void main(String [] argv) {
Loop.withIndex(0, 9, new Loop.Each() {
public void run(int i) {
System.out.println(i*10);
}
});
}
}
Вот мой вклад в эту тему https://github.com/pablormier/parallel-loops. Использование очень просто:
Collection<String> upperCaseWords =
Parallel.ForEach(words, new Parallel.F<String, String>() {
public String apply(String s) {
return s.toUpperCase();
}
});
Также возможно изменить некоторые аспекты поведения, например количество потоков (по умолчанию используется пул кэшированных потоков):
Collection<String> upperCaseWords =
new Parallel.ForEach<String, String>(words)
.withFixedThreads(4)
.apply(new Parallel.F<String, String>() {
public String apply(String s) {
return s.toUpperCase();
}
}).values();
Весь код содержится в одном Java-классе и не имеет больше зависимостей, чем JDK. Я также призываю вас проверить новый способ распараллеливания в функциональном стиле с Java 8
Фреймворк Fork Join в Java 7 предназначен для поддержки параллелизма. Но я не знаю о точном эквиваленте для Parallel.For
,
Более простой вариант будет
// A thread pool which runs for the life of the application.
private static final ExecutorService EXEC =
Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
//later
EXEC.invokeAll(tasks); // you can optionally specify a timeout.
Существует аналог для Parallel.For, доступный как расширение Java. Он называется Ateji PX, у них есть бесплатная версия, с которой можно играть. http://www.ateji.com/px/index.html
Это точный эквивалент параллеля.for и похож на.
For ||
Больше примеров и объяснений в Википедии: http://en.wikipedia.org/wiki/Ateji_PX
Закрытая вещь в Java IMO
Синхронизация часто убивает ускорение параллельных циклов for. Следовательно, параллельные циклы for часто нуждаются в своих личных данных и в механизме сокращения, чтобы свести все потоки частных данных к единому результату.
Поэтому я расширил версию Parallel.For Weimin Xiao
с помощью механизма сокращения.
public class Parallel {
public static interface IntLoopBody {
void run(int i);
}
public static interface LoopBody<T> {
void run(T i);
}
public static interface RedDataCreator<T> {
T run();
}
public static interface RedLoopBody<T> {
void run(int i, T data);
}
public static interface Reducer<T> {
void run(T returnData, T addData);
}
private static class ReductionData<T> {
Future<?> future;
T data;
}
static final int nCPU = Runtime.getRuntime().availableProcessors();
public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) {
ExecutorService executor = Executors.newFixedThreadPool(nCPU);
List<Future<?>> futures = new LinkedList<Future<?>>();
for (final T param : parameters) {
futures.add(executor.submit(() -> loopBody.run(param) ));
}
for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println(e);
}
}
executor.shutdown();
}
public static void For(int start, int stop, final IntLoopBody loopBody) {
final int chunkSize = (stop - start + nCPU - 1)/nCPU;
final int loops = (stop - start + chunkSize - 1)/chunkSize;
ExecutorService executor = Executors.newFixedThreadPool(loops);
List<Future<?>> futures = new LinkedList<Future<?>>();
for (int i=start; i < stop; ) {
final int iStart = i;
i += chunkSize;
final int iStop = (i < stop) ? i : stop;
futures.add(executor.submit(() -> {
for (int j = iStart; j < iStop; j++)
loopBody.run(j);
}));
}
for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println(e);
}
}
executor.shutdown();
}
public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) {
final int chunkSize = (stop - start + nCPU - 1)/nCPU;
final int loops = (stop - start + chunkSize - 1)/chunkSize;
ExecutorService executor = Executors.newFixedThreadPool(loops);
List<ReductionData<T>> redData = new LinkedList<ReductionData<T>>();
for (int i = start; i < stop; ) {
final int iStart = i;
i += chunkSize;
final int iStop = (i < stop) ? i : stop;
final ReductionData<T> rd = new ReductionData<T>();
rd.data = creator.run();
rd.future = executor.submit(() -> {
for (int j = iStart; j < iStop; j++) {
loopBody.run(j, rd.data);
}
});
redData.add(rd);
}
for (ReductionData<T> rd : redData) {
try {
rd.future.get();
if (rd.data != null) {
reducer.run(result, rd.data);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
}
}
Вот простой тестовый пример: параллельный счетчик символов с использованием несинхронизированной карты.
import java.util.*;
public class ParallelTest {
static class Counter {
int cnt;
Counter() {
cnt = 1;
}
}
public static void main(String[] args) {
String text = "More formally, if this map contains a mapping from a key k to a " +
"value v such that key compares equal to k according to the map's ordering, then " +
"this method returns v; otherwise it returns null.";
Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();
// first sequentially
for(int i=0; i < text.length(); i++) {
char c = text.charAt(i);
Counter cnt = charCounter1.get(c);
if (cnt == null) {
charCounter1.put(c, new Counter());
} else {
cnt.cnt++;
}
}
for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue().cnt);
}
// now parallel without synchronization
Parallel.For(0, text.length(), charCounter2,
// Creator
() -> new TreeMap<Character, Counter>(),
// Loop Body
(i, map) -> {
char c = text.charAt(i);
Counter cnt = map.get(c);
if (cnt == null) {
map.put(c, new Counter());
} else {
cnt.cnt++;
}
},
// Reducer
(result, map) -> {
for(Map.Entry<Character, Counter> entry: map.entrySet()) {
Counter cntR = result.get(entry.getKey());
if (cntR == null) {
result.put(entry.getKey(), entry.getValue());
} else {
cntR.cnt += entry.getValue().cnt;
}
}
}
);
// compare results
assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size();
Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
Map.Entry<Character, Counter> entry2 = it2.next();
assert entry.getKey() == entry2.getKey() && entry.getValue().cnt == entry2.getValue().cnt : "wrong content";
}
System.out.println("Well done!");
}
}
Я обнаружил, что ForkJoinPool и IntStream очень полезны в моем случае (Parallel For с ограниченным числом потоков).
C#:
static void MathParallel(int threads)
{
Parallel.For(1, partitions, new ParallelOptions { MaxDegreeOfParallelism = threads }, (i) => {
partitionScores[i] = Math.Sin(3*i);
});
}
и эквивалент Java:
static void mathParallel(int threads) {
ForkJoinPool pool = new ForkJoinPool(threads);
pool.submit(()-> IntStream.range(0, partitions).parallel().forEach(i -> {
partitionScores[i] = Math.sin(3*i);
}));
pool.shutdown();
while (!pool.isTerminated()){
}
}
У меня есть обновленный класс Java Parallel, который может выполнять Parallel.For, Parallel.ForEach, Parallel.Tasks и секционированный параллельный цикл. Исходный код выглядит следующим образом:
Примеры использования этих параллельных циклов следующие:
public static void main(String [] argv)
{
//sample data
final ArrayList<String> ss = new ArrayList<String>();
String [] s = {"a", "b", "c", "d", "e", "f", "g"};
for (String z : s) ss.add(z);
int m = ss.size();
//parallel-for loop
System.out.println("Parallel.For loop:");
Parallel.For(0, m, new LoopBody<Integer>()
{
public void run(Integer i)
{
System.out.println(i +"\t"+ ss.get(i));
}
});
//parallel for-each loop
System.out.println("Parallel.ForEach loop:");
Parallel.ForEach(ss, new LoopBody<String>()
{
public void run(String p)
{
System.out.println(p);
}
});
//partitioned parallel loop
System.out.println("Partitioned Parallel loop:");
Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>()
{
public void run(Partition p)
{
for(int i=p.start; i<p.end; i++)
System.out.println(i +"\t"+ ss.get(i));
}
});
//parallel tasks
System.out.println("Parallel Tasks:");
Parallel.Tasks(new Task []
{
//task-1
new Task() {public void run()
{
for(int i=0; i<3; i++)
System.out.println(i +"\t"+ ss.get(i));
}},
//task-2
new Task() {public void run()
{
for (int i=3; i<6; i++)
System.out.println(i +"\t"+ ss.get(i));
}}
});
}
Это то, что я использую для Java 7 и менее.
Для Java 8 вы можете использовать forEach ()
[ОБНОВИТЬ ]
Параллельный класс:
private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
private static final int MAX_THREAD = NUM_CORES*2;
public static <T2 extends T, T> void For(final Iterable<T2> elements, final Operation<T> operation) {
if (elements != null) {
final Iterator<T2> iterator = elements.iterator();
if (iterator.hasNext()) {
final Throwable[] throwable = new Throwable[1];
final Callable<Void> callable = new Callable<Void>() {
boolean first = true;
@Override
public final Void call() throws Exception {
if ((first || operation.follow()) && iterator.hasNext()) {
T result;
result = iterator.next();
operation.perform(result);
if (first) {
synchronized (this) {
first = false;
}
}
}
return null;
}
};
final Runnable runnable = new Runnable() {
@Override
public final void run() {
while (iterator.hasNext()) {
try {
synchronized (callable) {
callable.call();
}
if (!operation.follow()) {
break;
}
} catch (Throwable t) {
t.printStackTrace();
synchronized (throwable) {
throwable[0] = t;
}
throw new RuntimeException(t);
}
}
}
};
final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD);
for (int threadIndex=0; threadIndex<MAX_THREAD && iterator.hasNext(); threadIndex++) {
executor.execute(runnable);
}
executor.shutdown();
while (!executor.isTerminated()) {
try {
Thread.sleep(0,1);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
if (throwable[0] != null) throw new RuntimeException(throwable[0]);
}
}
}
public interface Operation<T> {
void perform(T pParameter);
boolean follow();
}
пример
@Test
public void test() {
List<Long> longList = new ArrayList<Long>();
for (long i = 0; i < 1000000; i++) {
longList.add(i);
}
final List<Integer> integerList = new LinkedList<>();
Parallel.For((Iterable<? extends Number>) longList, new Parallel.Operation<Number>() {
@Override
public void perform(Number pParameter) {
System.out.println(pParameter);
integerList.add(pParameter.intValue());
}
@Override
public boolean follow() {
return true;
}
});
for (Number num : integerList) {
System.out.println(num);
}
}