8

Profiling suggests that this function here is a real bottle neck for my application:

static inline int countEqualChars(const char* string1, const char* string2, int size) {
    int r = 0;
    for (int j = 0; j < size; ++j) {
        if (string1[j] == string2[j]) {
            ++r;
        }
    }

    return r;
}

Even with -O3 and -march=native, G++ 4.7.2 does not vectorize this function (I checked the assembler output). Now, I'm not an expert with SSE and friends, but I think that comparing more than one character at once should be faster. Any ideas on how to speed things up? Target architecture is x86-64.

Peter Cordes
  • 286,368
  • 41
  • 520
  • 731
Milan
  • 3,312
  • 3
  • 29
  • 40
  • What are the inputs typically like? What size, and are they variable or literal strings? Also, what's the reason for needing this function--what is its "deeper meaning" in your system? – John Zwinck Mar 24 '13 at 13:27
  • did you try using -msse, etc flags? and measuring the performance before and after the fact? See [another example](http://stackoverflow.com/questions/7919304/gcc-sse-code-optimization) – Petesh Mar 24 '13 at 13:27
  • I tried -msse and did not measure any difference in runtime. Both strings are guaranteed to have identical lengths. Sizes vary wildly though. – Milan Mar 24 '13 at 13:33
  • @Petesh: the OP used `-march=native`, which implies whatever `-mfoo` flags his CPU supports. –  Mar 24 '13 at 13:40
  • @Fanael That's what the doc says, but TBH I don't actually trust `-march=native` to do the right thing (this is from experience on older variants of gcc, this may not actually be the case now) – Petesh Mar 24 '13 at 13:43
  • By the way, you can also do this faster without SSE2 (even though all x86_64 CPUs support that), using SWAR techniques. The simplest form of this is skipping the byte-by-byte comparisons if two 64-bit words are equal. A more sophisticated form can create a 64-bit word where each byte is 1 if the corresponding bytes in the strings are equal and 0 if not; these words can be processed as with SSE2. – jilles Mar 24 '13 at 14:17
  • `Now, I'm not an expert with SSE and friends` -> You provide a clue already. My hint: Grab some introductory material ;) This question seems basically like a plea for code, but expressed rather finely. – Sebastian Mach Apr 05 '13 at 19:27
  • [How to count character occurrences using SIMD](//stackoverflow.com/q/54541129) has an AVX2 version (using a single character instead of a second string, but the vectorization is done identically). – Peter Cordes May 02 '19 at 20:47

3 Answers3

9

Of course it can.

pcmpeqb compares two vectors of 16 bytes and produces a vector with zeros where they differed, and -1 where they match. Use this to compare 16 bytes at a time, adding the result to an accumulator vector (make sure to accumulate the results of at most 255 vector compares to avoid overflow). When you're done, there are 16 results in the accumulator. Sum them and negate to get the number of equal elements.

If the lengths are very short, it will be hard to get a significant speedup from this approach. If the lengths are long, then it will be worth pursuing.

Stephen Canon
  • 100,816
  • 18
  • 175
  • 263
8

Compiler flags for vectorization:

-ftree-vectorize

-ftree-vectorize -march=<your_architecture> (Use all instruction-set extensions available on your computer, not just baseline like SSE2 for x86-64). Use -march=native to optimize for the machine the compiler is running on.) -march=<foo> also sets -mtune=<foo>, which is also a good thing.

Using SSEx intrinsics:

  • Padd and align the buffer to 16 bytes (according to the vector size you're actually going to use)

  • Create an accumlator countU8 with _mm_set1_epi8(0)

  • For all n/16 input (sub) vectors, do:

    • Load 16 chars from both strings with _mm_load_si128 or _mm_loadu_si128 (for unaligned loads)

    • _mm_cmpeq_epi8 compare the octets in parallel. Each match yields 0xFF (-1), 0x00 otherwise.

    • Substract the above result vector from countU8 using _mm_sub_epi8 (minus -1 -> +1)

    • Always after 255 cycles, the 16 8bit counters must be extracted into a larger integer type to prevent overflows. See unpack and horizontal add in this nice answer for how to do that: https://stackoverflow.com/a/10930706/1175253

Code:

#include <iostream>
#include <vector>

#include <cassert>
#include <cstdint>
#include <climits>
#include <cstring>

#include <emmintrin.h>

#ifdef __SSE2__

#if !defined(UINTPTR_MAX) ||  !defined(UINT64_MAX) ||  !defined(UINT32_MAX)
#  error "Limit macros are not defined"
#endif

#if UINTPTR_MAX == UINT64_MAX
    #define PTR_64
#elif UINTPTR_MAX == UINT32_MAX
    #define PTR_32
#else
#  error "Current UINTPTR_MAX is not supported"
#endif

template<typename T>
void print_vector(std::ostream& out,const __m128i& vec)
{
    static_assert(sizeof(vec) % sizeof(T) == 0,"Invalid element size");
    std::cout << '{';
    const T* const end   = reinterpret_cast<const T*>(&vec)-1;
    const T* const upper = end+(sizeof(vec)/sizeof(T));
    for(const T* elem = upper;
        elem != end;
        --elem
    )
    {
        if(elem != upper)
            std::cout << ',';
        std::cout << +(*elem);
    }
    std::cout << '}' << std::endl;
}

#define PRINT_VECTOR(_TYPE,_VEC) do{  std::cout << #_VEC << " : "; print_vector<_TYPE>(std::cout,_VEC);    } while(0)

///@note SSE2 required (macro: __SSE2__)
///@warning Not tested!
size_t counteq_epi8(const __m128i* a_in,const __m128i* b_in,size_t count)
{
    assert(a_in != nullptr && (uintptr_t(a_in) % 16) == 0);
    assert(b_in != nullptr && (uintptr_t(b_in) % 16) == 0);
    //assert(count > 0);


/*
    //maybe not so good with all that branching and additional loop variables

    __m128i accumulatorU8 = _mm_set1_epi8(0);
    __m128i sum2xU64 = _mm_set1_epi8(0);
    for(size_t i = 0;i < count;++i)
    {

        //this operation could also be unrolled, where multiple result registers would be accumulated
        accumulatorU8 = _mm_sub_epi8(accumulatorU8,_mm_cmpeq_epi8(*a_in++,*b_in++));
        if(i % 255 == 0)
        {
            //before overflow of uint8, the counter will be extracted
            __m128i sum2xU16 = _mm_sad_epu8(accumulatorU8,_mm_set1_epi8(0));
            sum2xU64 = _mm_add_epi64(sum2xU64,sum2xU16);

            //reset accumulatorU8
            accumulatorU8 = _mm_set1_epi8(0);
        }
    }

    //blindly accumulate remaining values
    __m128i sum2xU16 = _mm_sad_epu8(accumulatorU8,_mm_set1_epi8(0));
    sum2xU64 = _mm_add_epi64(sum2xU64,sum2xU16);

    //do a horizontal addition of the two counter values
    sum2xU64 = _mm_add_epi64(sum2xU64,_mm_srli_si128(sum2xU64,64/8));

#if defined PTR_64
    return _mm_cvtsi128_si64(sum2xU64);
#elif defined PTR_32
    return _mm_cvtsi128_si32(sum2xU64);
#else
#  error "macro PTR_(32|64) is not set"
#endif

*/

    __m128i sum2xU64 = _mm_set1_epi32(0);
    while(count--)
    {
        __m128i matches     = _mm_sub_epi8(_mm_set1_epi32(0),_mm_cmpeq_epi8(*a_in++,*b_in++));
        __m128i sum2xU16    = _mm_sad_epu8(matches,_mm_set1_epi32(0));
                sum2xU64    = _mm_add_epi64(sum2xU64,sum2xU16);
#ifndef NDEBUG
        PRINT_VECTOR(uint16_t,sum2xU64);
#endif
    }

    //do a horizontal addition of the two counter values
    sum2xU64 = _mm_add_epi64(sum2xU64,_mm_srli_si128(sum2xU64,64/8));
#ifndef NDEBUG
    std::cout << "----------------------------------------" << std::endl;
    PRINT_VECTOR(uint16_t,sum2xU64);
#endif

#if !defined(UINTPTR_MAX) ||  !defined(UINT64_MAX) ||  !defined(UINT32_MAX)
#  error "Limit macros are not defined"
#endif

#if defined PTR_64
    return _mm_cvtsi128_si64(sum2xU64);
#elif defined PTR_32
    return _mm_cvtsi128_si32(sum2xU64);
#else
#  error "macro PTR_(32|64) is not set"
#endif

}

#endif

int main(int argc, char* argv[])
{

    std::vector<__m128i> a(64); // * 16 bytes
    std::vector<__m128i> b(a.size());
    const size_t nBytes = a.size() * sizeof(std::vector<__m128i>::value_type);

    char* const a_out = reinterpret_cast<char*>(a.data());
    char* const b_out = reinterpret_cast<char*>(b.data());

    memset(a_out,0,nBytes);
    memset(b_out,0,nBytes);

    a_out[1023] = 1;
    b_out[1023] = 1;

    size_t equalBytes = counteq_epi8(a.data(),b.data(),a.size());

    std::cout << "equalBytes = " << equalBytes << std::endl;

    return 0;
}

The fastest SSE implementation I got for large and small arrays:

size_t counteq_epi8(const __m128i* a_in,const __m128i* b_in,size_t count)
{
    assert((count > 0 ? a_in != nullptr : true) && (uintptr_t(a_in) % sizeof(__m128i)) == 0);
    assert((count > 0 ? b_in != nullptr : true) && (uintptr_t(b_in) % sizeof(__m128i)) == 0);
    //assert(count > 0);

    const size_t maxInnerLoops    = 255;
    const size_t nNestedLoops     = count / maxInnerLoops;
    const size_t nRemainderLoops  = count % maxInnerLoops;

    const __m128i zero  = _mm_setzero_si128();
    __m128i sum16xU8    = zero;
    __m128i sum2xU64    = zero;

    for(size_t i = 0;i < nNestedLoops;++i)
    {
        for(size_t j = 0;j < maxInnerLoops;++j)
        {
            sum16xU8 = _mm_sub_epi8(sum16xU8,_mm_cmpeq_epi8(*a_in++,*b_in++));
        }
        sum2xU64 = _mm_add_epi64(sum2xU64,_mm_sad_epu8(sum16xU8,zero));
        sum16xU8 = zero;
    }

    for(size_t j = 0;j < nRemainderLoops;++j)
    {
        sum16xU8 = _mm_sub_epi8(sum16xU8,_mm_cmpeq_epi8(*a_in++,*b_in++));
    }
    sum2xU64 = _mm_add_epi64(sum2xU64,_mm_sad_epu8(sum16xU8,zero));

    sum2xU64 = _mm_add_epi64(sum2xU64,_mm_srli_si128(sum2xU64,64/8));

#if UINTPTR_MAX == UINT64_MAX
    return _mm_cvtsi128_si64(sum2xU64);
#elif UINTPTR_MAX == UINT32_MAX
    return _mm_cvtsi128_si32(sum2xU64);
#else
#  error "macro PTR_(32|64) is not set"
#endif
}
Peter Cordes
  • 286,368
  • 41
  • 520
  • 731
Sam
  • 7,688
  • 1
  • 21
  • 47
  • 4
    Instead of ANDing the result from pcmpeqb with a mask and then adding it to an accumulator, you can also subtract the result from the accumulator, saving an instruction in the loop. – jilles Mar 24 '13 at 14:08
  • Thank you very much, that is really helpful :) – Milan Mar 24 '13 at 14:30
  • 3
    You can do the horizontal add more efficiently using `psadbw` with zero followed by moving high-to-low 64 bits and an add. – Stephen Canon Mar 24 '13 at 14:38
  • Shall we write this loop ? – hdante Mar 24 '13 at 14:43
  • Just added the code I have so far. I tried to implement all suggested improvements. – Sam Mar 24 '13 at 16:44
  • 1
    My implementation: https://gist.github.com/hdante/5232848 It uses a 255 iteration inner loop. I liked the final inner loop, it's only 7 instructions: https://gist.github.com/hdante/5232856 – hdante Mar 24 '13 at 17:59
  • On the other hand, I wasn't able to see improvement over the auto vectorized version. I tested both implementations and they are 1% faster than gcc version. I'm afraid the loop is stalling on memory loads. simfoo, do you see any improvement ? – hdante Mar 24 '13 at 19:32
  • I added the fastest implementation I currently have. @hdante: `assert(equalBytes == nBytes);` fails with your implementation in my code (seems, that the remaining `size % something` loops are missing). Regarding memory loads: we may try to `_mm_prefetch()` some cache lines, although I believe that these have considerable effects on older Pentium (II/III) processors only. – Sam Mar 24 '13 at 20:03
  • Yes, I only wrote the loop for multiples of 255 vectors for simplicity. Anyway, the final improvement over the original scalar code seems to be minimal. – hdante Mar 24 '13 at 21:31
  • [How to count character occurrences using SIMD](//stackoverflow.com/q/54541129) has an AVX2 version using PSADBW outside the inner loop, and multiple accumulators so it can run faster than 1 vector per clock without bottlenecking on 1 cycle `psubb` latency (or 2 cycles on AMD Bulldozer-family). For a simple version that has SAD inside the inner loop, see [Fast counting the number of equal bytes between two arrays](//stackoverflow.com/a/37936432). You can SAD against a vector of `0x7f` and subtract a correction outside the loop, instead of needing `0-cmp()` to make it positive. – Peter Cordes May 02 '19 at 20:53
3

Auto-vectorization in current gcc is a matter of helping the compiler to understand that's easy to vectorize the code. In your case: it will understand the vectorization request if you remove the conditional and rewrite the code in a more imperative way:

    static inline int count(const char* string1, const char* string2, int size) {
            int r = 0;
            bool b;

            for (int j = 0; j < size; ++j) {
                    b = (string1[j] == string2[j]);
                    r += b;
            }

            return r;
    }

In this case:

movdqa  16(%rsp), %xmm1
movl    $.LC2, %esi
pxor    %xmm2, %xmm2
movzbl  416(%rsp), %edx
movdqa  .LC1(%rip), %xmm3
pcmpeqb 224(%rsp), %xmm1
cmpb    %dl, 208(%rsp)
movzbl  417(%rsp), %eax
movl    $1, %edi
pand    %xmm3, %xmm1
movdqa  %xmm1, %xmm5
sete    %dl
movdqa  %xmm1, %xmm4
movzbl  %dl, %edx
punpcklbw   %xmm2, %xmm5
punpckhbw   %xmm2, %xmm4
pxor    %xmm1, %xmm1
movdqa  %xmm5, %xmm6
movdqa  %xmm5, %xmm0
movdqa  %xmm4, %xmm5
punpcklwd   %xmm1, %xmm6

(etc.)

hdante
  • 7,106
  • 2
  • 28
  • 34
  • 1
    I had a look at the rest of the disassembly, and, well, let's just say there's room for improvement. – harold Mar 24 '13 at 13:59
  • Only about 5% faster with a large data set :( Thanks for the suggestion though – Milan Mar 24 '13 at 14:03
  • simfoo, hand write a vectorized code with Stephen Canon's suggestion, where you separately accumulate 256 values before reducing to a separate final sum. This will factor out part of the code out of the inner loop. – hdante Mar 24 '13 at 14:23
  • 1
    GCC's effort here is really pathetic. You might be able to get it to avoid all the widening conversions if you use an unsigned char inner accumulator, but at that point you might as well write some intrinsics. – Stephen Canon Mar 24 '13 at 14:25