Оптимизировать без выравнивания SSE2/AVX2 XOR

В моем коде я должен обрабатывать "разоблачение" пакетов веб-сокетов, что по сути означает, что XOR не выравнивает данные произвольной длины. Благодаря SO ( разоблачение данных Websocket / многобайтовый xor) я уже выяснил, как (надеюсь) ускорить это с помощью расширений SSE2/AVX2, но, глядя на это сейчас, мне кажется, что моя обработка невыровненных данных совершенно неэффективна. -оптимальный. Есть ли способ оптимизировать мой код или, по крайней мере, сделать его проще с той же производительностью, или мой код уже работает лучше?

Вот важная часть кода (для вопроса, который я предполагаю, что данных всегда будет, по крайней мере, достаточно для запуска цикла AVX2 один раз, но в то же самое время он будет в основном выполняться только несколько раз):

// circular shift left for uint32
int cshiftl_u32(uint32_t num, uint8_t shift) {
   return (num << shift) | (num >> (32 - shift));                                                                       
}                                                                                                                     

// circular shift right for uint32
int cshiftr_u32(uint32_t num, uint8_t shift) {
   return (num >> shift) | (num << (32 - shift));                                                                       
}                                                                                                                     

void optimized_xor_32( uint32_t mask, uint8_t *ds, uint8_t *de ) {
   if (ds == de) return; // zero data len -> nothing to do

   uint8_t maskOffset = 0;

// process single bytes till 4 byte alignment ( <= 3 )
   for (; ds < de && ( (uint64_t)ds & (uint64_t)3 ); ds++) {
      *ds ^= *((uint8_t *)(&mask) + maskOffset);
      maskOffset = (maskOffset + 1) & (uint8_t)3;
   }

   if (ds == de) return; // done, return

   if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions
      mask = cshiftl_u32(mask, maskOffset);

      maskOffset = 0;
   }

// process 4 byte block till 8 byte alignment ( <= 1 )
   uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31));

   if ( ds < de32 && ( (uint64_t)de & (uint64_t)7 ) ) {
      *(uint32_t *)ds ^= mask; // mask is uint32_t

      if (++ds == de) return;
   }

// process 8 byte block till 16 byte alignment ( <= 1 )
   uint64_t mask64 = mask | (mask << 4);
   uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63));

   if ( ds < de64 && ( (uint64_t)ds & (uint64_t)15 ) ) {
      *(uint64_t *)ds ^= mask64;

      if (++ds == de) return; // done, return
   }


// process 16 byte block till 32 byte alignment ( <= 1) (if supported)
#ifdef CPU_SSE2 
   __m128i v128, v128_mask;
   v128_mask = _mm_set1_epi32(mask);

   uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127));

   if ( ds < de128 && ( (uint64_t)ds & (uint64_t)31 ) ) {
      v128 = _mm_load_si128((__m128i *)ds);
      v128 = _mm_xor_si128(v128, v128_mask);
      _mm_store_si128((__m128i *)ds, v128);

      if (++ds == de) return; // done, return
   }

#endif
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards)
   __m256i v256, v256_mask;
   v256_mask = _mm256_set1_epi32(mask);

   uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255));

   for (; ds < de256; ds+=32) {
      v256 = _mm256_load_si256((__m256i *)ds);
      v256 = _mm256_xor_si256(v256, v256_mask);
      _mm256_store_si256((__m256i *)ds, v256);
   }

   if (ds == de) return; // done, return
#endif
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported)
   for (; ds < de128; ds+=16) {
      v128 = _mm_load_si128((__m128i *)ds);
      v128 = _mm_xor_si128(v128, v128_mask);
      _mm_store_si128((__m128i *)ds, v128);
   }

   if (ds == de) return; // done, return

#endif
   // process remaining 8 byte blocks 
   // this should always be supported, so remaining can be assumed to be executed <= 1 times
   for (; ds < de64; ds += 8) {
      *(uint64_t *)ds ^= mask64;
   }

   if (ds == de) return; // done, return

   // process remaining 4 byte blocks ( <= 1)
   if (ds < de32) {
      *(uint32_t *)ds ^= mask;

      if (++ds == de) return; // done, return
   }


   // process remaining bytes ( <= 3)

   for (; ds < de; ds ++) {
      *ds ^= *((uint8_t *)(&mask) + maskOffset);
      maskOffset = (maskOffset + 1) & (uint8_t)3;
   }

}

PS: Пожалуйста, игнорируйте использование #ifdef вместо cpuid и т.п. для определения флага процессора.

1 ответ

В отличие от того, что говорится в руководстве, большинство процессоров Intel на самом деле довольно хорошо справляются с невыровненными данными. Поскольку вы используете встроенные компиляторы Intel для обработки векторов, я предполагаю, что у вас есть доступ к достаточно свежей версии icc,

Если вы не можете выровнять свои данные естественным образом, то я боюсь, что то, что вы делаете, максимально близко к максимальной производительности. С точки зрения того, чтобы сделать код более читабельным и развертываемым на Xeon Phi(64-байтных векторных регистрах)/ будущих более длинных векторных процессорах, я бы посоветовал вам начать использовать Intel Cilk Plus.

Пример:

void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) {
    while (length & 0x3) {
        *(d++) ^= mask;
        asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
        length--;
    }

    // switch to 4 bytes per block
    uint32_t _d = d;
    length >>= 2;

    // Intel Cilk Plus Array Notation
    // Should expand automatically to the best possible SIMD instructions
    // you are compiling for
    _d[0:length] ^= mask;
}

Обратите внимание, что я не тестировал этот код, поскольку у меня нет доступа к компилятору Intel прямо сейчас. Если у вас возникнут проблемы, тогда я смогу пройти через это, когда вернусь в свой офис на следующей неделе.

Если вы предпочитаете встроенные функции, то правильное использование макросов препроцессора может значительно облегчить вашу жизнь:

#if defined(__MIC__)
// intel Xeon Phi
#define VECTOR_BLOCKSIZE 64
// I do not remember the correct types/instructions right now
#error "TODO: MIC handling"
#elif defined(CPU_AVX2)
#define VECTOR_BLOCKSIZE 32
typedef __m256i my_vector_t;
#define VECTOR_LOAD_MASK _mm256_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask))
#elif defined(CPU_SSE2) 
#define VECTOR_BLOCKSIZE 16
typedef __m128i my_vector_t;
#define VECTOR_LOAD_MASK _mm128_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask))
#else
#define VECTOR_BLOCKSIZE 8
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask))
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask)
typedef uint64_t my_vector_t;
#fi

void optimized_xor_32( uint32_t mask, uint8_t *d, size_t length ) {
    size_t i;

    // there really is no point in having extra
    // branches for different vector lengths if they are
    // executed at most once
    // branch prediction is your friend here
    // so we do one byte at a time until the block size
    // is reached

    while (length && (d & (VECTOR_BLOCKSIZE - 1))) {
        *(d++) ^= mask;
        asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
        length--;
    }

    my_vector_t * d_vector = (my_vector_t *)d;
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask);

    size_t vector_legth = length / VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift
    length &= VECTOR_BLOCKSIZE -1; // remaining length

    for (i = 0; i < vector_legth; i++) {
      VECTOR_XOR(d_vector + i, vector_mask);
    }

    // process the tail
    d = (uint8_t*)(d_vector + i);
    for (i = 0; i < length; i++) {
      d[i] ^= mask;
      asm ("rold $8, %0" : "+g" (mask) :: "cc");
    }

}

Еще одно примечание: вы можете использовать команду вращения x86 вместо сдвигов битов для вращения mask:

#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc")
Другие вопросы по тегам