Оптимизировать без выравнивания SSE2/AVX2 XOR
В моем коде я должен обрабатывать "разоблачение" пакетов веб-сокетов, что по сути означает, что XOR не выравнивает данные произвольной длины. Благодаря SO ( разоблачение данных Websocket / многобайтовый xor) я уже выяснил, как (надеюсь) ускорить это с помощью расширений SSE2/AVX2, но, глядя на это сейчас, мне кажется, что моя обработка невыровненных данных совершенно неэффективна. -оптимальный. Есть ли способ оптимизировать мой код или, по крайней мере, сделать его проще с той же производительностью, или мой код уже работает лучше?
Вот важная часть кода (для вопроса, который я предполагаю, что данных всегда будет, по крайней мере, достаточно для запуска цикла AVX2 один раз, но в то же самое время он будет в основном выполняться только несколько раз):
// circular shift left for uint32
int cshiftl_u32(uint32_t num, uint8_t shift) {
return (num << shift) | (num >> (32 - shift));
}
// circular shift right for uint32
int cshiftr_u32(uint32_t num, uint8_t shift) {
return (num >> shift) | (num << (32 - shift));
}
void optimized_xor_32( uint32_t mask, uint8_t *ds, uint8_t *de ) {
if (ds == de) return; // zero data len -> nothing to do
uint8_t maskOffset = 0;
// process single bytes till 4 byte alignment ( <= 3 )
for (; ds < de && ( (uint64_t)ds & (uint64_t)3 ); ds++) {
*ds ^= *((uint8_t *)(&mask) + maskOffset);
maskOffset = (maskOffset + 1) & (uint8_t)3;
}
if (ds == de) return; // done, return
if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions
mask = cshiftl_u32(mask, maskOffset);
maskOffset = 0;
}
// process 4 byte block till 8 byte alignment ( <= 1 )
uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31));
if ( ds < de32 && ( (uint64_t)de & (uint64_t)7 ) ) {
*(uint32_t *)ds ^= mask; // mask is uint32_t
if (++ds == de) return;
}
// process 8 byte block till 16 byte alignment ( <= 1 )
uint64_t mask64 = mask | (mask << 4);
uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63));
if ( ds < de64 && ( (uint64_t)ds & (uint64_t)15 ) ) {
*(uint64_t *)ds ^= mask64;
if (++ds == de) return; // done, return
}
// process 16 byte block till 32 byte alignment ( <= 1) (if supported)
#ifdef CPU_SSE2
__m128i v128, v128_mask;
v128_mask = _mm_set1_epi32(mask);
uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127));
if ( ds < de128 && ( (uint64_t)ds & (uint64_t)31 ) ) {
v128 = _mm_load_si128((__m128i *)ds);
v128 = _mm_xor_si128(v128, v128_mask);
_mm_store_si128((__m128i *)ds, v128);
if (++ds == de) return; // done, return
}
#endif
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards)
__m256i v256, v256_mask;
v256_mask = _mm256_set1_epi32(mask);
uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255));
for (; ds < de256; ds+=32) {
v256 = _mm256_load_si256((__m256i *)ds);
v256 = _mm256_xor_si256(v256, v256_mask);
_mm256_store_si256((__m256i *)ds, v256);
}
if (ds == de) return; // done, return
#endif
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported)
for (; ds < de128; ds+=16) {
v128 = _mm_load_si128((__m128i *)ds);
v128 = _mm_xor_si128(v128, v128_mask);
_mm_store_si128((__m128i *)ds, v128);
}
if (ds == de) return; // done, return
#endif
// process remaining 8 byte blocks
// this should always be supported, so remaining can be assumed to be executed <= 1 times
for (; ds < de64; ds += 8) {
*(uint64_t *)ds ^= mask64;
}
if (ds == de) return; // done, return
// process remaining 4 byte blocks ( <= 1)
if (ds < de32) {
*(uint32_t *)ds ^= mask;
if (++ds == de) return; // done, return
}
// process remaining bytes ( <= 3)
for (; ds < de; ds ++) {
*ds ^= *((uint8_t *)(&mask) + maskOffset);
maskOffset = (maskOffset + 1) & (uint8_t)3;
}
}
PS: Пожалуйста, игнорируйте использование #ifdef вместо cpuid и т.п. для определения флага процессора.
1 ответ
В отличие от того, что говорится в руководстве, большинство процессоров Intel на самом деле довольно хорошо справляются с невыровненными данными. Поскольку вы используете встроенные компиляторы Intel для обработки векторов, я предполагаю, что у вас есть доступ к достаточно свежей версии icc
,
Если вы не можете выровнять свои данные естественным образом, то я боюсь, что то, что вы делаете, максимально близко к максимальной производительности. С точки зрения того, чтобы сделать код более читабельным и развертываемым на Xeon Phi(64-байтных векторных регистрах)/ будущих более длинных векторных процессорах, я бы посоветовал вам начать использовать Intel Cilk Plus.
Пример:
void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) {
while (length & 0x3) {
*(d++) ^= mask;
asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
length--;
}
// switch to 4 bytes per block
uint32_t _d = d;
length >>= 2;
// Intel Cilk Plus Array Notation
// Should expand automatically to the best possible SIMD instructions
// you are compiling for
_d[0:length] ^= mask;
}
Обратите внимание, что я не тестировал этот код, поскольку у меня нет доступа к компилятору Intel прямо сейчас. Если у вас возникнут проблемы, тогда я смогу пройти через это, когда вернусь в свой офис на следующей неделе.
Если вы предпочитаете встроенные функции, то правильное использование макросов препроцессора может значительно облегчить вашу жизнь:
#if defined(__MIC__)
// intel Xeon Phi
#define VECTOR_BLOCKSIZE 64
// I do not remember the correct types/instructions right now
#error "TODO: MIC handling"
#elif defined(CPU_AVX2)
#define VECTOR_BLOCKSIZE 32
typedef __m256i my_vector_t;
#define VECTOR_LOAD_MASK _mm256_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask))
#elif defined(CPU_SSE2)
#define VECTOR_BLOCKSIZE 16
typedef __m128i my_vector_t;
#define VECTOR_LOAD_MASK _mm128_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask))
#else
#define VECTOR_BLOCKSIZE 8
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask))
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask)
typedef uint64_t my_vector_t;
#fi
void optimized_xor_32( uint32_t mask, uint8_t *d, size_t length ) {
size_t i;
// there really is no point in having extra
// branches for different vector lengths if they are
// executed at most once
// branch prediction is your friend here
// so we do one byte at a time until the block size
// is reached
while (length && (d & (VECTOR_BLOCKSIZE - 1))) {
*(d++) ^= mask;
asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
length--;
}
my_vector_t * d_vector = (my_vector_t *)d;
my_vector_t vector_mask = VECTOR_LOAD_MASK(mask);
size_t vector_legth = length / VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift
length &= VECTOR_BLOCKSIZE -1; // remaining length
for (i = 0; i < vector_legth; i++) {
VECTOR_XOR(d_vector + i, vector_mask);
}
// process the tail
d = (uint8_t*)(d_vector + i);
for (i = 0; i < length; i++) {
d[i] ^= mask;
asm ("rold $8, %0" : "+g" (mask) :: "cc");
}
}
Еще одно примечание: вы можете использовать команду вращения x86 вместо сдвигов битов для вращения mask
:
#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc")