Многочисленные краткосрочные задачи медленнее, когда многопоточные, даже с пулом потоков
Фон
В настоящее время у меня есть линейный физический движок (но этот вопрос не требует знаний физического движка), части которого я экспериментирую с многопоточностью в надежде на повышение эффективности.
Одним из таких разделов является широкая фаза, в данном случае это включает перемещение по всем объектам вдоль всех трех осей, чтобы проверить, какое перекрытие (любое, что происходит по всем осям, считается столкнувшимся в широкой фазе). 3-осевые развертки, помимо использования обычных объектов, полностью независимы и, таким образом, кажутся хорошим местом для многопоточности.
Чтобы избежать возможности блокировки между потоками, каждый из этих трех процессов берет локальную копию всех данных, которые он хочет использовать, прежде чем (если применимо) перейти к многопоточному
В то время как эти развертки представляют собой значительную бутылочную горловину, они очень недолговечны, развертка обычно продолжается 1-4 мс. Это приложение реального времени, где код запускается 60 раз в секунду, поэтому общее время такта составляет максимум 17 мс, поэтому 1-4 мс - это для меня много времени. Поскольку эти циклы недолговечны, я использовал пул потоков. Конкретно Executors.newFixedThreadPool(3)
, 3 для 3 осей.
Мой тестовый компьютер был двухъядерным с гиперпоточностью, поэтому должно быть комфортно до 4 потоков. Проверено с помощью Runtime.getRuntime().availableProcessors();
Вопрос
При выполнении следующего тестового кода, в котором многочисленные краткосрочные задачи выполнялись либо однопоточными, либо многопоточными с использованием пула потоков, многопоточная версия была намного медленнее; посмотреть данные профиля. Это имело место даже тогда, когда многопоточные детали не имели общих объектов. Почему это так и есть ли способ запустить много кратковременных (1-4 мс) задач одновременно?
Даже если сделать задачи намного больше, то только многопоточная версия приближается к однопоточному по производительности, а не превышает его, как я ожидал, что заставляет меня думать, что я делаю что-то серьезно неправильно.
Тестовый код
public class BroadPhaseAxisSweep implements Callable<Set<PotentialCollisionPrecursor>> {
static final int XAXIS=0;
static final int YAXIS=1;
static final int ZAXIS=2;
int axis;
int[] axisIndicies;
boolean[] isStatic;
boolean[] isLightWeight;
boolean[] isCollidable;
//orders the same as axisIndicies
double[] starts;
double[] ends;
private static ExecutorService sweepPool = Executors.newFixedThreadPool(3);
public BroadPhaseAxisSweep(int axis, List<TestObject> allObjects) {
//all data that will be used by the thread is cached internally to avoid
//any concurrent access issues
this.axis = axis;
//allObjects is in reality unsorted, axisIndicies holds sorted indices
//in this case allObjects just "happens" to be already sorted
this.axisIndicies =new int[allObjects.size()];
for(int i=0;i<allObjects.size();i++){
axisIndicies[i]=i;
}
isStatic=new boolean[allObjects.size()];
for(int i=0;i<allObjects.size();i++){
isStatic[i]=allObjects.get(i).isStatic();
}
isLightWeight=new boolean[allObjects.size()];
for(int i=0;i<allObjects.size();i++){
isLightWeight[i]=allObjects.get(i).isLightWeightPhysicsObject();
}
isCollidable=new boolean[allObjects.size()];
for(int i=0;i<allObjects.size();i++){
isCollidable[i]=allObjects.get(i).isCollidable();
}
starts=new double[allObjects.size()];
for(int i=0;i<allObjects.size();i++){
starts[i]=allObjects.get(i).getStartPoint();
}
ends=new double[allObjects.size()];
for(int i=0;i<allObjects.size();i++){
ends[i]=allObjects.get(i).getEndPoint();
}
}
@Override
public Set<PotentialCollisionPrecursor> call() throws Exception {
return axisSweep_simple(axisIndicies);
}
private Set<PotentialCollisionPrecursor> axisSweep_simple(int[] axisIndicies){
Set<PotentialCollisionPrecursor> thisSweep =new HashSet();
for(int i=0;i<starts.length;i++){
if (isCollidable[axisIndicies[i]]){
double activeObjectEnd=ends[i];
//sweep forwards until an objects start is before out end
for(int j=i+1;j<starts.length;j++){
//j<startXsIndicies.length is the bare mininmum contrain, most js wont get that far
if ((isStatic[axisIndicies[i]]&& isStatic[axisIndicies[j]]) || ((isLightWeight[axisIndicies[i]]&& isLightWeight[axisIndicies[j]]))){
//if both objects are static or both are light weight then they cannot by definition collide, we can skip
continue;
}
if (activeObjectEnd>starts[j]){
PotentialCollisionPrecursor potentialCollision=new PotentialCollisionPrecursor(getObjectNumberFromAxisNumber(i),getObjectNumberFromAxisNumber(j));
thisSweep.add(potentialCollision);
}else{
break; //this is as far as this active object goes
}
}
}
}
return thisSweep;
}
private int getObjectNumberFromAxisNumber(int number){
return axisIndicies[number];
}
public static void main(String[] args){
int noOfObjectsUnderTest=250;
List<TestObject> testObjects=new ArrayList<>();
Random rnd=new Random();
double runningStartPosition=0;
for(int i=0;i<noOfObjectsUnderTest;i++){
runningStartPosition+=rnd.nextDouble()*0.01;
testObjects.add(new TestObject(runningStartPosition));
}
while(true){
runSingleTreaded(testObjects);
runMultiThreadedTreaded(testObjects);
}
}
private static void runSingleTreaded(List<TestObject> testObjects) {
try {
//XAXIS used over and over again just for test
Set<PotentialCollisionPrecursor> xSweep=(new BroadPhaseAxisSweep(XAXIS,testObjects)).call();
Set<PotentialCollisionPrecursor> ySweep=(new BroadPhaseAxisSweep(XAXIS,testObjects)).call();
Set<PotentialCollisionPrecursor> zSweep=(new BroadPhaseAxisSweep(XAXIS,testObjects)).call();
System.out.println(xSweep.size()); //just so JIT can't possibly optimise out
System.out.println(ySweep.size()); //just so JIT can't possibly optimise out
System.out.println(zSweep.size()); //just so JIT can't possibly optimise out
} catch (Exception ex) {
//bad practice, example only
Logger.getLogger(BroadPhaseAxisSweep.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static void runMultiThreadedTreaded(List<TestObject> testObjects) {
try {
//XAXIS used over and over again just for test
Future<Set<PotentialCollisionPrecursor>> futureX=sweepPool.submit(new BroadPhaseAxisSweep(XAXIS,testObjects));
Future<Set<PotentialCollisionPrecursor>> futureY=sweepPool.submit(new BroadPhaseAxisSweep(XAXIS,testObjects));
Future<Set<PotentialCollisionPrecursor>> futureZ=sweepPool.submit(new BroadPhaseAxisSweep(XAXIS,testObjects));
Set<PotentialCollisionPrecursor> xSweep=futureX.get();
Set<PotentialCollisionPrecursor> ySweep=futureY.get();
Set<PotentialCollisionPrecursor> zSweep=futureZ.get();
System.out.println(xSweep.size()); //just so JIT can't possibly optimise out
System.out.println(ySweep.size()); //just so JIT can't possibly optimise out
System.out.println(zSweep.size()); //just so JIT can't possibly optimise out
} catch (Exception ex) {
//bad practice, example only
Logger.getLogger(BroadPhaseAxisSweep.class.getName()).log(Level.SEVERE, null, ex);
}
}
public static class TestObject{
final boolean isStatic;
final boolean isLightWeight;
final boolean isCollidable;
final double startPointOnAxis;
final double endPointOnAxis;
public TestObject(double startPointOnAxis) {
Random rnd=new Random();
this.isStatic = rnd.nextBoolean();
this.isLightWeight = rnd.nextBoolean();
this.isCollidable = rnd.nextBoolean();
this.startPointOnAxis = startPointOnAxis;
this.endPointOnAxis =startPointOnAxis+0.2*rnd.nextDouble();
}
public boolean isStatic() {
return isStatic;
}
public boolean isLightWeightPhysicsObject() {
return isLightWeight;
}
public boolean isCollidable() {
return isCollidable;
}
public double getStartPoint() {
return startPointOnAxis;
}
public double getEndPoint() {
return endPointOnAxis;
}
}
}
public class PotentialCollisionPrecursor {
//holds the object numbers of a potential collision, can be converted to a real PotentialCollision using a list of those objects
private final int rigidBodyNumber1;
private final int rigidBodyNumber2;
public PotentialCollisionPrecursor(int rigidBodyNumber1, int rigidBodyNumber2) {
if (rigidBodyNumber1<rigidBodyNumber2){
this.rigidBodyNumber1 = rigidBodyNumber1;
this.rigidBodyNumber2 = rigidBodyNumber2;
}else{
this.rigidBodyNumber1 = rigidBodyNumber2;
this.rigidBodyNumber2 = rigidBodyNumber1;
}
}
public int getRigidBodyNumber1() {
return rigidBodyNumber1;
}
public int getRigidBodyNumber2() {
return rigidBodyNumber2;
}
@Override
public int hashCode() {
int hash = 7;
hash = 67 * hash + this.rigidBodyNumber1;
hash = 67 * hash + this.rigidBodyNumber2;
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final PotentialCollisionPrecursor other = (PotentialCollisionPrecursor) obj;
if (this.rigidBodyNumber1 != other.rigidBodyNumber1) {
return false;
}
if (this.rigidBodyNumber2 != other.rigidBodyNumber2) {
return false;
}
return true;
}
}
ThreadPools разных размеров
После однопоточной обработки следующим самым быстрым является пул из 2/3 потоков, затем самым медленным из всех является один поток в пуле потоков (что неудивительно, поскольку он имеет все накладные расходы и не имеет усиления)
Неестественно большой размер задач
Чтобы проверить, была ли проблема просто в том, что задачи слишком малы для потоков, я увеличил размер задачи до 100 мс. Это дало еще более запутанные результаты; любое число нитей от 1 до 3 примерно одинаковой скорости и медленнее, чем однопоточная
2 ответа
Я обещал обобщить все результаты здесь... это довольно запутанно, так как главный виновник - профилировщик, который непропорционально замедляет неосновные потоки по отношению к основному потоку. Возможно, он использует атомные счетчики для отслеживания своих данных, и, возможно, их служебные данные настолько велики, что это приводит к таким неочевидным результатам.
Измерение времени вручную дает лучшие результаты, а именно ускорение многопоточности на 30-40 %. Это имеет смысл, поскольку из-за копирования данных возникают большие последовательные издержки.
Это копирование не является ни необходимым, ни полезным. Это не нужно, так как все потоки читают только общие данные. Это бесполезно, поскольку чтение общих переменных не медленнее, чем чтение их собственной копии:
- ядра способны довольно быстро получать данные из кеша друг друга
- они помещают их копию в свои локальные кэши L1 и L2 ("общее" состояние протокола MESI)
- кэш-память третьего уровня является общей, что означает, что из-за ненужного копирования данных требуется больше промахов третьего уровня из-за увеличения объема используемой памяти
Если ваши широкие развертки занимают всего несколько миллисекунд, вам, вероятно, лучше делать все синхронно. Усилие, необходимое для резервирования потока пула потоков (в Windows), занимает более 5 мс. Не говоря уже о том, что вам все еще нужно перемещать данные между потоками и ждать переключения контекста, а затем, наконец, вернуть потоки туда, где вы их нашли.
Весь этот процесс может привести к снижению производительности, особенно если вы берете локальные копии данных. Если бы каждый цикл был, естественно, независимым и занимал более 500 мс, вы, вероятно, могли бы извлечь выгоду из некоторой модели параллелизма, которую вы реализовали.
Интересно отметить, что в наши дни графические процессоры поставляются со встроенными сопроцессорами, предназначенными для физических вычислений. Причина, по которой они так хороши в подобных вещах, заключается в том, что иногда они имеют тысячи процессорных ядер, все они работают с относительно низкой тактовой частотой. Это означает, что они очень хороши для одновременного выполнения множества небольших задач. Возможно, вы захотите попробовать взаимодействовать с графическим процессором напрямую, чтобы перенести физическую обработку в такую среду, а не работать с ней на обычном процессоре.