Как разбить массив на блоки

У меня есть массив, который представляет точки в кубоиде. Это одномерный массив, который использует следующую функцию индексации для реализации трех измерений:

int getCellIndex(int ix, int iy, int iz) {
    return ix + (iy * numCellsX) + (iz * numCellsX * numCellsY);
}

Количество ячеек в домене составляет:

numCells = (numX + 2) * (numY + 2) * (numZ + 2)

Где numX/numY/numZ - количество ячеек в направлении X/Y/Z. Значение +2 в каждом направлении должно создавать ячейки заполнения вокруг внешней части домена. Количество ячеек в каждом направлении определяется как:

numX = 5 * numY
numZ = numY/2
numY = userInput

Для каждой ячейки я хочу вычислить новое значение для этой ячейки на основе значения ее соседей (то есть трафарета), где ее соседи расположены выше, ниже, слева, справа, спереди и сзади. Тем не менее, я хочу сделать этот расчет только для не плохих ячеек. У меня есть логический массив, который отслеживает, если ячейка плохая. Вот как выглядит вычисление в данный момент:

for(int z = 1; z < numZ+1; z++) {
    for(int y = 1; y < numY+1; y++) {
        for(int x = 1; x < numX+1; x++) {
            if(!isBadCell[ getCellIndex(x,y,z) ] {
                // Do stencil Computation
            }
        }
    }
}

Это не очень хорошая производительность. Я хочу иметь возможность векторизовать цикл для повышения производительности, однако я не могу из-за оператора if. Я знаю, если клетки заранее плохие, и это не меняется на протяжении всего вычисления. Я хотел бы разделить домен на блоки, предпочтительно блоки 4x4x4, чтобы я мог рассчитать априори на блок, если он содержит плохие ячейки, и, если это так, обрабатывать его как обычно, или, если нет, использовать оптимизированную функцию, которая может принимать преимущество векторизации, например

for(block : blocks) {
    if(isBadBlock[block]) {
        slowProcessBlock(block) // As above
    } else {
        fastVectorizedProcessBlock(block)
    }
}

ПРИМЕЧАНИЕ: не требуется, чтобы блоки существовали физически, т. Е. Этого можно достичь, изменив функцию индексации и используя различные индексы для циклического перемещения по массиву. Я открыт для всего, что работает лучше всего.

Функция fastVectorizedProcessBlock() будет выглядеть аналогично функции slowProcessBlock(), но с оператором if remove (поскольку мы знаем, что он не содержит плохих ячеек) и прагмой векторизации.

Как я могу разделить свой домен на блоки, чтобы я мог сделать это? Это кажется сложным, потому что а) количество ячеек в каждом направлении не равно, б) нам нужно учитывать ячейки заполнения, так как мы никогда не должны пытаться вычислить их значение, так как это приведет к отсутствию доступа к памяти границ.

Как я могу затем обработать блоки, которые не содержат поврежденных ячеек, без использования оператора if?

РЕДАКТИРОВАТЬ:

Это идея, которая у меня изначально была:

for(int i = 0; i < numBlocks; i++) { // use blocks of 4x4x4 = 64
    if(!isBadBlock[i]) {
        // vectorization pragma here
        for(int z = 0; z < 4; z++) {
            for(int y = 0; y < 4; y++) {
                for(int x = 0; x < 4; x++) {
                    // calculate stencil using getCellIndex(x,y,z)*i
                }
             }
         }
     } else {
         for(int z = 0; z < 4; z++) {
            for(int y = 0; y < 4; y++) {
                for(int x = 0; x < 4; x++) {
                    if(!isBadCell[i*getCellIndex(x,y,z)]) {    
                    // calculate stencil using getCellIndex(x,y,z)*i
                }
             }
         }
     }
 }

Теперь ячейки будут храниться в блоках, то есть все ячейки в первом блоке 4x4x4 будут храниться в позициях 0-63, затем все ячейки во втором блоке будут храниться в позициях 64-127 и т. Д.

Тем не менее, я не думаю, что будет работать, если значения numX/numY/numZ не являются добрыми. Например, что если numY = 2, numZ = 1 и numX = 10? Циклы for ожидают, что направление z должно быть не менее 4 ячеек. Есть ли хороший способ обойти это?

ОБНОВЛЕНИЕ 2 - Вот как выглядит трафарет:

if ( isBadCell[ getCellIndex(x,y,z) ] ) {
  double temp = someOtherArray[ getCellIndex(x,y,z) ] +
                    1.0/CONSTANT/CONSTANT*
                    (
                      - 1.0 * cells[ getCellIndex(x-1,y,z) ]
                      - 1.0 * cells[ getCellIndex(x+1,y,z) ]
                      - 1.0 * cells[ getCellIndex(x,y-1,z) ]
                      - 1.0 * cells[ getCellIndex(x,y+1,z) ]
                      - 1.0 * cells[ getCellIndex(x,y,z-1) ]
                      - 1.0 * cells[ getCellIndex(x,y,z+1) ]
                      + 6.0 * cells[ getCellIndex(x,y,z) ]
                      );
  globalTemp += temp * temp;
  cells[ getCellIndex(x,y,z) ] += -omega * temp / 6.0 * CONSTANT * CONSTANT;
}

4 ответа

Решение

Где же getCellIndex() получить значения numCellX а также numCellY? Было бы лучше передать их в качестве аргументов, а не полагаться на глобальные переменные, и сделать эту функцию static inline чтобы позволить компилятору оптимизировать.

static line int getCellIndex(int ix, int iy, int iz, int numCellsX, numCellsY) {
    return ix + (iy * numCellsX) + (iz * numCellsX * numCellsY);
}

for (int z = 1; z <= numZ; z++) {
    for (int y = 1; y <= numY; y++) {
        for (int x = 1; x <= numX; x++) {
            if (!isBadCell[getCellIndex(x, y, z, numX + 2, numY + 2)] {
                // Do stencil Computation
            }
        }
    }
}

Вы также можете удалить все умножения с некоторыми локальными переменными:

int index = (numY + 2) * (numX + 2);  // skip top padding plane
for (int z = 1; z <= numZ; z++) {
    index += numX + 2;  // skip first padding row
    for (int y = 1; y <= numY; y++) {
        index += 1;   // skip first padding col
        for (int x = 1; x <= numX; x++, index++) {
            if (!isBadCell[index] {
                // Do stencil Computation
            }
        }
        index += 1;   // skip last padding col
    }
    index += numX + 2;   // skip last padding row
}

Являются ли эти направления многообещающими или нет, во многом зависит от фактических вычислений, выполненных для получения значения шаблона. Вы должны опубликовать это тоже.

Если вы можете изменить формат логического массива для плохих ячеек, было бы полезно заполнить строки кратным 8 и использовать горизонтальное заполнение 8 столбцов для улучшения выравнивания. Создание логического массива в виде массива битов позволяет проверять 8, 16, 32 или даже 64 ячейки одновременно с помощью одного теста.

Вы можете настроить указатель массива, чтобы использовать 0 основанные координаты.

Вот как это будет работать:

int numCellsX = 8 + ((numX + 7) & ~7) + 8;
int numCellsY = 1 + numY + 1;
int numCellsXY = numCellsX * numCellsY;
// adjusted array_pointer
array_pointer = allocated_pointer + 8 + numCellsX + numCellsXY;
// assuming the isBadCell array is 0 based too.
for (int z = 0, indexZ = 0; z < numZ; z++, indexZ += numCellsXY) {
    for (int y = 0, indexY = indexZ; y < numY; y++, indexY += numCellsX) {
        for (int x = 0, index = indexY; x <= numX - 8; x += 8, index += 8) {
            int mask = isBadCell[index >> 3];
            if (mask == 0) {
                // let the compiler unroll computation for 8 pixels with
                for (int i = 0; i < 8; i++) {
                   // compute stencil value for x+i,y,z at index+i
                }
            } else {
                for (int i = 0; i < 8; i++, mask >>= 1) {
                    if (!(mask & 1)) {
                       // compute stencil value for x+i,y,z at index+i
                    }
                }
            }
        }
        int mask = isBadCell[index >> 3];
        for (; x < numX; x++, index++, mask >>= 1) {
            if (!(mask & 1)) {
                // compute stencil value for x,y,z at index
            }
        }
    }
}

РЕДАКТИРОВАТЬ:

Функция трафарета использует слишком много вызовов getCellIndex. Вот как оптимизировать его, используя значение индекса, вычисленное в приведенном выше коде:

// index is the offset of cell x,y,z
// numCellsX, numCellsY are the dimensions of the plane
// numCellsXY is the offset between planes: numCellsX * numCellsY

if (isBadCell[index]) {
    double temp = someOtherArray[index] +
                1.0 / CONSTANT / CONSTANT *
                ( - 1.0 * cells[index - 1]
                  - 1.0 * cells[index + 1]
                  - 1.0 * cells[index - numCellsX]
                  - 1.0 * cells[index + numCellsX]
                  - 1.0 * cells[index - numCellsXY]
                  - 1.0 * cells[index + numCellsXY]
                  + 6.0 * cells[index]
                );
    cells[index] += -omega * temp / 6.0 * CONSTANT * CONSTANT;
    globalTemp += temp * temp;
}

предварительно рассчитав &cells[index] так как указатель может улучшить код, но компиляция должна быть способна обнаруживать это общее подвыражение и уже генерировать эффективный код.

EDIT2:

Вот мозаичный подход: вы можете добавить отсутствующие аргументы, большинство размеров предполагается глобальными, но вам, вероятно, следует передать указатель на структуру контекста со всеми этими значениями. Оно использует isBadTile[] а также isGoodTile[]: массивы с логическим значением, указывающим, имеет ли данная ячейка все ячейки, плохие и все ячейки, соответственно.

void handle_tile(int x, int y, int z, int nx, int ny, int nz) {
    int index0 = x + y * numCellsX + z * numCellsXY;
    // skipping a tile with all cells bad.
    if (isBadTile[index0] && nx == 4 && ny == 4 && nz == 4)
        return;
    // handling a 4x4x4 tile with all cells OK.
    if (isGoodTile[index0] && nx == 4 && ny == 4 && nz == 4) {
        for (int iz = 0; iz < 4; iz++) {
            for (int iy = 0; iy < 4; iy++) {
                for (int ix = 0; ix < 4; ix++) {
                    int index = index0 + ix + iy * numCellsX + iz + numCellsXY;
                    // Do stencil computation using `index`
                }
            }
        }
    } else {
        for (int iz = 0; iz < nz; iz++) {
            for (int iy = 0; iy < ny; iy++) {
                for (int ix = 0; ix < nx; ix++) {
                    int index = index0 + ix + iy * numCellsX + iz + numCellsXY;
                    if (!isBadCell[index] {
                        // Do stencil computation using `index`
                }
            }
        }
    }
}

void handle_cells() {
    int x, y, z;
    for (z = 1; z <= numZ; z += 4) {
        int nz = min(numZ + 1 - z, 4);
        for (y = 1; y <= numY; y += 4) {
            int ny = min(numY + 1 - y, 4);
            for (x = 1; x <= numX; x += 4) {
                int nx = min(numX + 1 - x, 4);
                handle_tile(x, y, z, nx, ny, nz);
            }
        }
    }
}

Вот функция для вычисления isGoodTile[] массив. Только правильно рассчитанные смещения соответствуют значениям x, кратным 4 + 1, y и z меньше 3 от их максимальных значений.

Эта реализация является неоптимальной, поскольку может быть вычислено меньше элементов. Неполные граничные плитки (менее 4 от края) могут быть помечены как плохие, чтобы пропустить хороший случай с одним случаем. Тест на плохие плитки может работать для этих краевых плиток, если isBadTile массив был правильно вычислен для краевых плиток, что в настоящее время не так.

void computeGoodTiles() {
    int start = 1 + numCellsX + numCellsXY;
    int stop = numCellsXY * numCellsZ - 1 - numCellsX - numCellsXY;

    memset(isGoodTile, 0, sizeof(*isGoodTile) * numCellsXY * numCellsZ);
    for (int i = start; i < stop; i += 4) {
        isGoodTile[i] = (isBadCell[i + 0] | isBadCell[i + 1] |
                         isBadCell[i + 2] | isBadCell[i + 3]) ^ 1;
    }
    for (int i = start; i < stop - 3 * numCellsX; i += 4) {
        isGoodTile[i] = isGoodTile[i + 0 * numCellsX] &
                        isGoodTile[i + 1 * numCellsX] &
                        isGoodTile[i + 2 * numCellsX] &
                        isGoodTile[i + 3 * numCellsX];
    }
    for (int i = start; i < stop - 3 * numCellsXY; i += 4) {
        isGoodTile[i] = isGoodTile[i + 0 * numCellsXY] &
                        isGoodTile[i + 1 * numCellsXY] &
                        isGoodTile[i + 2 * numCellsXY] &
                        isGoodTile[i + 3 * numCellsXY];
    }
}

void computeBadTiles() {
    int start = 1 + numCellsX + numCellsXY;
    int stop = numCellsXY * numCellsZ - 1 - numCellsX - numCellsXY;

    memset(isBadTile, 0, sizeof(*isBadTile) * numCellsXY * numCellsZ);
    for (int i = start; i < stop; i += 4) {
        isBadTile[i] = isBadCell[i + 0] & isBadCell[i + 1] &
                       isBadCell[i + 2] & isBadCell[i + 3];
    }
    for (int i = start; i < stop - 3 * numCellsX; i += 4) {
        isBadTile[i] = isBadTile[i + 0 * numCellsX] &
                       isBadTile[i + 1 * numCellsX] &
                       isBadTile[i + 2 * numCellsX] &
                       isBadTile[i + 3 * numCellsX];
    }
    for (int i = start; i < stop - 3 * numCellsXY; i += 4) {
        isBadTile[i] = isBadTile[i + 0 * numCellsXY] &
                       isBadTile[i + 1 * numCellsXY] &
                       isBadTile[i + 2 * numCellsXY] &
                       isBadTile[i + 3 * numCellsXY];
    }
}

Хотя ОП требует подхода с использованием блокировки, я бы посоветовал против этого.

Видите ли, каждая последовательная последовательность ячеек (ячейки 1D вдоль оси X) уже является таким блоком. Вместо того, чтобы упростить задачу, блокировка просто заменяет исходную проблему меньшими копиями фиксированного размера, повторяющимися снова и снова.

Проще говоря, блокировка не помогает с реальной проблемой под рукой. Это не должно быть обязательной особенностью решения вообще.

Вместо этого я бы предложил полностью избежать основной проблемы - просто по-другому.

Вы видите, вместо того, чтобы иметь флаг "плохая ячейка" для каждой ячейки, которую вы хотите проверить (один раз для каждой ячейки, не меньше), вы можете сохранить (отсортированный) список индексов плохих ячеек. Затем вы можете обработать весь набор данных за один раз, а затем выполнить цикл исправления для ячеек, перечисленных в списке индексов плохих ячеек.

Также обратите внимание, что если вы не работаете с копией значений ячеек, порядок, в котором вы будете вычислять новые значения ячеек, будет влиять на результат. Это почти наверняка не то, что вы хотите.

Итак, вот мое предложение:

#include <stdlib.h>
#include <errno.h>

typedef struct {
    /* Core cells in the state, excludes border cells */
    size_t   xsize;
    size_t   ysize;
    size_t   zsize;

    /* Index calculation: x + y * ystride + z * zstride */
    /* x is always linear in memory; xstride = 1 */
    size_t   ystride; /* = xsize + 2 */
    size_t   zstride; /* = ystride * (ysize + 2) */

    /* Cell data, points to cell (0,0,0) */
    double  *current;
    double  *previous;

    /* Bad cells */
    size_t   fixup_cells;  /* Number of bad cells */
    size_t  *fixup_index;  /* Array of bad cells' indexes */

    /* Dynamically allocated memory */
    void    *mem[3];
} lattice;

void lattice_free(lattice *const ref)
{
    if (ref) {
        /* Free dynamically allocated memory, */
        free(ref->mem[0]);
        free(ref->mem[1]);
        free(ref->mem[2]);
        /* then initialize/poison the contents. */
        ref->xsize = 0;
        ref->ysize = 0;
        ref->zsize = 0;
        ref->ystride = 0;
        ref->zstride = 0;
        ref->previous = NULL;
        ref->current = NULL;
        ref->fixup_cells = 0;
        ref->fixup_index = NULL;
        ref->mem[0] = NULL;
        ref->mem[1] = NULL;
        ref->mem[2] = NULL;
    }
}


int lattice_init(lattice *const ref, const size_t xsize, const size_t ysize, const size_t zsize)
{
    const size_t  xtotal = xsize + 2;
    const size_t  ytotal = ysize + 2;
    const size_t  ztotal = zsize + 2;
    const size_t  ntotal = xtotal * ytotal * ztotal;
    const size_t  double_bytes = ntotal * sizeof (double);
    const size_t  size_bytes = xsize * ysize * zsize * sizeof (size_t);

    /* NULL reference to the variable to initialize? */
    if (!ref)
        return EINVAL;

    /* Initialize/poison the lattice variable. */
    ref->xsize = 0;
    ref->ysize = 0;
    ref->zsize = 0;
    ref->ystride = 0;
    ref->zstride = 0;
    ref->previous = NULL;
    ref->current = NULL;
    ref->fixup_cells = 0;
    ref->fixup_index = NULL;
    ref->mem[0] = NULL;
    ref->mem[1] = NULL;
    ref->mem[2] = NULL;

    /* Verify size is nonzero */
    if (xsize < 1 || ysize < 1 || zsize < 1)
        return EINVAL;        

    /* Verify size is not too large */
    if (xtotal <= xsize || ytotal <= ysize || ztotal <= zsize ||
        ntotal / xtotal / ytotal != ztotal ||
        ntotal / xtotal / ztotal != ytotal ||
        ntotal / ytotal / ztotal != xtotal ||
        double_bytes / ntotal != sizeof (double) ||
        size_bytes / ntotal != sizeof (size_t))
        return ENOMEM;

    /* Allocate the dynamic memory needed. */
    ref->mem[0] = malloc(double_bytes);
    ref->mem[1] = malloc(double_bytes);
    ref->mem[2] = malloc(size_bytes);
    if (!ref->mem[0] || !ref->mem[1] || !ref->mem[2]) {
        free(ref->mem[2]);
        ref->mem[2] = NULL;
        free(ref->mem[1]);
        ref->mem[1] = NULL;
        free(ref->mem[0]);
        ref->mem[0] = NULL;
        return ENOMEM;
    }

    ref->xsize = xsize;
    ref->ysize = ysize;
    ref->zsize = zsize;

    ref->ystride = xtotal;
    ref->zstride = xtotal * ytotal;

    ref->current = (double *)ref->mem[0] + 1 + xtotal;
    ref->previous = (double *)ref->mem[1] + 1 + xtotal;

    ref->fixup_cells = 0;
    ref->fixup_index = (size_t *)ref->mem[2];

    return 0;
}

Обратите внимание, что я предпочитаю x + ystride * y + zstride * z Форма расчета индекса над x + xtotal * (y + ytotal * z)потому что два умножения в первом могут выполняться параллельно (в суперскалярном конвейере, на архитектурах, которые могут одновременно выполнять два несвязанных умножения на одном ядре ЦП), тогда как во втором умножения должны быть последовательными,

Обратите внимание, что ref->current[-1 - ystride - zstride] относится к текущему значению ячейки в ячейке (-1, -1, -1), т.е. к диагонали граничной ячейки от исходной ячейки (0, 0, 0). Другими словами, если у вас есть ячейка (x, y, z) по индексу i, затем
i-1 это ячейка в (х-1, у, г)
i+1 это ячейка в (x+1, y, z)
i-ystride является ячейкой в ​​(х, у-1, г)
i+ystride это ячейка в точке (x, y+1, z)
i-zstride это ячейка в (х, у, г-1)
i+zstride это ячейка в (х, у, г-1)
i-ystride является ячейкой в ​​(х, у-1, г)
i-1-ystride-zstride является ячейкой в ​​(х-1, у-1, z-1)
i+1+ystride+zstride это ячейка в (x+1, y+1, z+1)
и так далее.

ref->fixup_index массив достаточно велик, чтобы перечислить все ячейки, кроме граничных. Хорошей идеей будет сохранять сортировку (или сортировку после сборки), потому что это помогает с локальностью кэша.

Если ваша решетка имеет периодические граничные условия, вы можете использовать шесть двумерных циклов, двенадцать одномерных циклов и восемь копий, чтобы скопировать первую и последнюю действительные ячейки на границу перед началом нового обновления.

Ваш цикл обновления, следовательно, по сути:

  1. Вычислить или заполнить границы в ->current,

  2. Своп ->current а также ->previous,

  3. Вычислить все ячейки для ->current используя данные из ->previous,

  4. Зацикливаться на ->fixup_cells индексы в ->fixup_indexи пересчитать соответствующий ->current клетки.

Обратите внимание, что на шаге 3 вы можете сделать это линейно для всех индексов между 0 а также xsize-1 + (ysize-1)*ystride + (zsize-1)*zstrideвключительно; в том числе около 67% пограничных ячеек. Их относительно немного по сравнению со всем томом, и наличие одного линейного цикла, вероятно, быстрее, чем пропуск через граничные ячейки, особенно если вы можете векторизовать вычисления. (Что в данном случае нетривиально.)

Вы даже можете разделить работу на несколько потоков, дав каждому потоку непрерывный набор индексов для работы. Потому что вы читаете из ->previous и написать ->currentпотоки не будут растоптывать друг друга, хотя может быть некоторый пинг-понг в кешировании, если поток достигает конца своей области, в то время как другой находится в начале своей области; из-за того, как данные ориентированы (а размер строк кэша составляет всего несколько (обычно 2, 4 или 8) ячеек), этот пинг-понг не должен быть проблемой на практике. (Очевидно, что замки не нужны.)

Эта конкретная проблема не является чем-то новым. Моделирование игры жизни Конвея или модели Изинга с квадратной или кубической решеткой, а также реализация многих других решеточных моделей сопряжены с той же проблемой (но часто с булевыми данными, а не с двойными, и без "плохих ячеек").

Я думаю, что вы можете вложить пару одинаковых наборов петель. Что-то вроде этого:

for(int z = 1; z < numZ+1; z+=4) {
    for(int y = 1; y < numY+1; y+=4) {
        for(int x = 1; x < numX+1; x+=4) {
            if(!isBadBlock[ getBlockIndex(x>>2,y>>2,z>>2) ]) {
                for(int zz = z; zz < z + 4 && zz < numZ+1; zz++) {
                   for(int yy = y; yy < y + 4 && yy < numY+1; yy++) {
                      for(int xx = z; xx < x + 4 && xx < numX+1; xx++) {
                         if(!isBadCell[ getCellIndex(xx,yy,zz) ]) {
                             // Do stencil Computation
                            }
                        }
                    }
                }
            }
        }
    }
}

Как вы сейчас настроили, вы можете просто получить индекс, используя 3d массив следующим образом:

#include <sys/types.h>
#define numX 256
#define numY 128
#define numZ 64
//Note the use of powers of 2 - it will simplify things a lot

int cells[numX][numY][numZ];

size_t getindex(size_t x, size_t y,size_t z){
  return (int*)&cells[x][y][z]-(int*)&cells[0][0][0];
}

Это выложит клетки как:

[0,0,0][0,0,1][0,0,2]...[0,0,numZ-1]
[0,1,0][0,1,1][0,1,2]...[0,1,numZ-1]
...
[0,numY-1,0][0,numY-1,1]...[0,1,numZ-1]
...
[1,0,0][1,0,1][0,0,2]...[1,0,numZ-1]
[1,1,0][1,1,1][1,1,2]...[1,1,numZ-1]
...
[numX-1,numY-1,0][numX-1,numY-1,1]...[numX-1,numY-1,numZ-1]

So efficient loops would look like:

for(size_t x=0;x<numX;x++)
  for(size_t y=0;y<numY;y++)
    for(size_t z=0;z<numZ;z++)
      //vector operations on z values

Но если вы хотите разделить его на блоки 4x4x4, вы можете просто использовать трехмерный массив блоков 4x4x4, например:

#include <sys/types.h>
#define numX 256 
#define numY 128
#define numZ 64

typedef int block[4][4][4];
block blocks[numX][numY][numZ];
//add a compiler specific 64 byte alignment to  help with cache misses?

size_t getblockindex(size_t x, size_t y,size_t z){
  return (block *)&blocks[x][y][z]-(block *)&blocks[0][0][0];
}

Я переставил индексы на x,y,z, чтобы я мог держать их прямо в голове, но убедитесь, что вы упорядочиваете их так, чтобы последним был тот, с которым вы работаете в серии ваших внутренних циклов.

Другие вопросы по тегам