Array intersections - part 3

In part1 we demonstrated that a very short C++ standard library approach could be a very poor way of finding an array intersection and went on to show the simple but reasonably efficient method mostly used.

in part2 we showed how the special case of intersecting a short array with a long one would be better done by other means.

Now we will try to go further, consider this piece of code.

The github is here

#include <cstdio>
#include <xmmintrin.h>
#include <stdint.h>
#include <cstring>

// bits set in the range of numbers 0 - 15
static uint32_t mask_count[16] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4};

int main(int args, char **argc)
{
uint32_t x[] = {0, 1, 3, 4};
uint32_t y[] = {1, 2, 3, 6};

// the functions that use this need it as a constant known at compile time
#define rotate _MM_SHUFFLE(0, 3, 2, 1)

// load the array into a 128 bit variable
__m128i a1 = _mm_loadu_si128((__m128i *)x);
__m128i a2 = _mm_loadu_si128((__m128i *)y);

// compare 4 32 bit values
__m128i match1 = _mm_cmpeq_epi32(a1, a2);

// rotate the 32 bit values within the 128 variable
a1 = _mm_shuffle_epi32(a1, rotate);
__m128i match2 = _mm_cmpeq_epi32(a1, a2);

a1 = _mm_shuffle_epi32(a1, rotate);
__m128i match3 = _mm_cmpeq_epi32(a1, a2);

a1 = _mm_shuffle_epi32(a1, rotate);
__m128i match4 = _mm_cmpeq_epi32(a1, a2);

__m128i cmp_mask = _mm_or_si128(_mm_or_si128(match1, match2), _mm_or_si128(match3, match4));

uint32_t mask = _mm_movemask_ps((__m128)cmp_mask);

printf("x{%d,%d,%d,%d} intersect y{%d,%d,%d,%d}\n", x[0], x[1], x[2], x[3], y[0], y[1], y[2], y[3]);
printf("mask %d%d%d%d\n",(mask&1)>0?1:0,(mask&2)>0?1:0,(mask&4)>0?1:0,(mask&8)>0?1:0);

if ((mask & 1)>0) printf("Found %d\n",y[0]);
if ((mask & 2)>0) printf("Found %d\n",y[1]);
if ((mask & 4)>0) printf("Found %d\n",y[2]);
if ((mask & 8)>0) printf("Found %d\n",y[3]);

// how many populated bits?
uint32_t matched = mask_count[mask];
printf("%d matches\n",matched);
}




It outputs:


x{0,1,3,4} intersect y{1,2,3,6}

mask 1010

Found 1

Found 3


We have used some very low level simd instructions to perform an intersection of arrays of 4 integers sort of all at once.


We do 4 compare operation rotating the numbers in one of the array each time until each number has occupied all 4 positions. The rotation looks a little magical using:

a1 = _mm_shuffle_epi32(a1, _MM_SHUFFLE(0, 3, 2, 1));

But with a bit of thinking we can see that it is saying move the last integer into first place, the first into second place, the second into third place and the first into fourth place. Nothing is lost it is a rotation.

The compare operation produces a mask (albeit 4 32 bit masks each time) that we can "or" together and convert back to a 4 bit mask telling us which elements matched.


To utilise this to intersect longer array, we are left with the puzzle that we need to be able to increment to the next elements to compare.


The array with the lowest maximum value will always need to move on 4, the other array according to the highest bit. e.g. if the mask value is 8 or above we can move on 4 as well.


So lets take what we have seen here and actually try and code a proper function.

 

 
#include <cstdio>
#include <xmmintrin.h>
#include <stdint.h>
// bits set in the range of numbers 0 - 15
static uint32_t mask_count[16] = {0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4};
// number of elements to move forward if we know the array positions
static uint32_t move_forward[16] = {0, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4};


int simd_scan(uint32_t x[], int long_length, uint32_t y[], int short_length, uint32_t array3[])
{
uint32_t count = 0;
int posx = 0;
int posy = 0;
// trim lengths to be a multiple of 4 that won't overstep the end
int x_length = (long_length / 4) * 4 - 4;
int y_length = (short_length / 4) * 4 - 4;

// the functions that use this need it as a constant known at compile time
#define rotate _MM_SHUFFLE(0, 3, 2, 1)
while (posx < x_length && posy < y_length)
{
// load the array into a 128 bit variable
__m128i a1 = _mm_loadu_si128((__m128i *)&x[posx]);
__m128i a2 = _mm_loadu_si128((__m128i *)&y[posy]);

// compare 4 32 bit values
__m128i match1 = _mm_cmpeq_epi32(a1, a2);

// rotate the 32 bit values within the 128 variable
a1 = _mm_shuffle_epi32(a1, rotate);
__m128i match2 = _mm_cmpeq_epi32(a1, a2);

a1 = _mm_shuffle_epi32(a1, rotate);
__m128i match3 = _mm_cmpeq_epi32(a1, a2);

a1 = _mm_shuffle_epi32(a1, rotate);
__m128i match4 = _mm_cmpeq_epi32(a1, a2);

__m128i cmp_mask = _mm_or_si128(_mm_or_si128(match1, match2), _mm_or_si128(match3, match4));

uint32_t mask = _mm_movemask_ps((__m128)cmp_mask);
if ((mask & 1) > 0)
{
array3[count++] = y[posy];
}
if ((mask & 2) > 0)
{
array3[count++] = y[posy + 1];
}
if ((mask & 4) > 0)
{
array3[count++] = y[posy + 2];
}
if ((mask & 8) > 0)
{
array3[count++] = y[posy + 3];
}
// the array with the lowest end element can move forward 4
if (y[posy + 3] > x[posx + 3])
{
posx += 4;
// the y array elements were not shuffled
// so we know the position of the last match
posy += move_forward[mask];
}
else
{
posy += 4;
// we do notknow the position of the last matched element in the
// x array but it we had a certain number of matches
// me must be able to move forward by at least the count of
// the bits in the mask
posx += mask_count[mask];
}
}

// remainder

count += linear_scan(&x[posx], long_length - posx, &y[posy], short_length - posy, &array3[count]);
return count;
}
 
The good news is that this works. The program in the repository
simd_intersect.cpp
Benchmarks and checks values against a linear_scan.
The bads news is that compiled with optimisation it is about half the speed of the linear scan.
 
We will look into this in part 4

 




Comments

Popular posts from this blog

A reliable tuya authentication in node-red

node-red a full home automation example

Arduino boards - what I wish I'd known when starting