I have an encoding scheme that works by encoding each block of 16 x int64_t into a block of 16 flag nibbles, packed into 8 bytes, followed by a payload of 1 or more bytes per input int64_t:

- use a nibble (4 bits) to represent the flag
- reserve 1 byte per pair of flags (1 byte stores the flags of 2 values)
- store sign(value) in the flag (the high bit of the nibble)
- store abs(value) in the buffer, using a variable number of bytes
- store the number of bytes needed in the flag (the 3 low bits of the nibble)

The example benchmark is simplified. I have tested with real data: this is faster than LEB128 (a.k.a. varint) and LZ4, and it also compresses better. So let's focus on this encoding scheme or something similar.

How can I improve this code? I mainly care about decompression speed. Some changes to the encode/decode format (or a different format) are acceptable if they make it faster. I'd like to know whether SIMD can be used in this situation.

Notes:

- Most abs(value) are <= 2 bytes long.
- 99% of the time, 4 consecutive abs(value) can be represented in <= 7 bytes. Likewise, 99% of the time 16 consecutive abs(value) fit in <= 31 bytes.
- The two important functions are encode and decode_original.
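For concreteness, here is a worked example of the format (values chosen purely for illustration):

// first value = -300:
//   sign = 1, abs(value) = 300 = 0x012C -> 2 payload bytes: 0x2C 0x01 (little endian)
//   flag nibble = (sign << 3) | byte_cnt = 0b1000 | 2 = 0b1010
// second value = 5:
//   sign = 0, abs(value) = 5 -> 1 payload byte: 0x05
//   flag nibble = 0b0001
// the pair packs into one header byte: (high_flag << 4) | low_flag = 0x1A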
// ##### from @aqrit's answer, tweaked by @petercordes #######
#include <cstddef> // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy
unsigned char* encode_buf_bmi(size_t n, const int64_t* src, unsigned char* dst) {
    unsigned char* key_ptr = dst;
    size_t num_chunks = n >> 3;
    unsigned char* data_ptr = &dst[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    while (num_chunks--) {
        uint64_t key = 0;
        for (size_t i = 0; i < 8; i++) {
            uint64_t v = (uint64_t)*src++;
            memcpy(data_ptr, &v, sizeof(v)); // assumes little endian.
            v = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
            v = (v << 8) | v; // combine lengths of 7 and 8
            v = (_lzcnt_u64(v | 1) ^ 63) >> 3; // use BSR on intel...
            data_ptr += v + ((v + 1) >> 3); // key of 7 is length of 8
            key |= v << (i * 3);
        }
        memcpy(key_ptr, &key, 3); // assumes little endian. Probably faster with overlapping 4-byte stores, except for the last chunk to avoid stepping on data
        key_ptr += 3;
    }
    // TODO: tail loop...
    return data_ptr; // end of used part of buffer
}
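#include <cassert> // for the illustrative checks below
// A hedged standalone sketch of the length trick above, isolated for illustration
// (assumes x86-64 with LZCNT; compile with -mlzcnt or -march=native):
static size_t signed_len_key_demo(uint64_t v) {
    v = (v + v) ^ v;  // bit i set iff bit i of v differs from bit i-1: finds the sign-bit boundary
    v = (v << 8) | v; // fold, so byte lengths 7 and 8 both produce key 7
    return (_lzcnt_u64(v | 1) ^ 63) >> 3;
}
static void demo_signed_len_key() {
    assert(signed_len_key_demo(0) == 0);                 // 0 significant bytes
    assert(signed_len_key_demo(127) == 1);               // fits in int8_t
    assert(signed_len_key_demo(128) == 2);               // needs 2 bytes
    assert(signed_len_key_demo((uint64_t)-1) == 1);      // -1 also fits in 1 byte
    assert(signed_len_key_demo((uint64_t)1 << 63) == 7); // 8-byte value -> key 7
}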
void decode_buf_bmi(size_t n, const unsigned char* src, int64_t* dst) {
    const unsigned char* key_ptr = src;
    size_t num_chunks = n >> 3;
    const unsigned char* data_ptr = &src[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    if (n >= 19) { // if has at least 8 bytes of padding before data_ptr
        data_ptr -= sizeof(uint64_t); // left aligned the truncated val in register (little endian)
        while (num_chunks--) {
            unsigned keys;
            //memcpy(&keys, key_ptr, 3); // big slowdown with GCC13.2: like 258 ms per buffer on Skylake 3.9GHz, n_msg = 100'000'000
            memcpy(&keys, key_ptr, 4); // 138 ms per buffer with 4-byte loads. (Clang 16: 138 ms per buffer either way.)
            key_ptr += 3;
            for (size_t i = 0; i < 8; i++) {
                uint64_t v;
                size_t k = keys & 0x07;
                size_t len = k + ((k + 1) >> 3); // k==7 is len=8
                uint64_t mask = (uint64_t)0 - ((k + 7) >> 3); // k ? -1 : 0
                size_t shift = (64 - (len * 8)) & (size_t)mask;
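                // Illustration: k=0 -> len=0, mask=0  (v cleared to 0, shift=0, output 0)
                //               k=3 -> len=3, mask=~0, shift=40: the 3 payload bytes sit in
                //                      the top of v; the arithmetic shift sign-extends them down
                //               k=7 -> len=8, mask=~0, shift=0  (full 8-byte value)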
                keys >>= 3;
                data_ptr += len;
                memcpy(&v, data_ptr, sizeof(v));
                v &= mask;
                *dst++ = (int64_t)v >> shift;
            }
        }
        data_ptr += sizeof(uint64_t);
    }
    // TODO: tail loop...
}
// ############## From @Soonts' answer ##############
#include <immintrin.h>
#include <stdint.h>
// Exclusive prefix sum of unsigned bytes
// When the sum of all bytes exceeds 0xFF, the output is garbage
// Which is fine here because our bytes are in [0..8] interval
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
    __m128i v = src;
    // https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
    v = _mm_add_epi8( v, _mm_slli_si128( v, 1 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 2 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 4 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 8 ) );
    // So far we have computed inclusive prefix sum
    // Keep the last byte which is total sum of bytes in the vector
    uint16_t tmp = _mm_extract_epi16( v, 7 );
    tmp >>= 8;
    sum = tmp;
    // Subtract original numbers to make exclusive sum = byte offsets to load
    return _mm_sub_epi8( v, src );
}
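// Illustration with hypothetical lengths {2,1,3,0, 8,1,2,1, 0,0,1,2, 3,4,1,2}:
// the exclusive prefix sums are        {0,2,3,6, 6,14,15,17, 18,18,18,19, 21,24,28,29},
// i.e. the byte offset of each element's payload, and `sum` receives the total, 31.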
// Load 4 uint64 numbers from ( rsi + offsets[ i ] ), keep the lowest bits[ i ] bits in the numbers
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
    // Load 4 uint64_t numbers from the correct locations, without AVX2 gathers
    const int64_t* s0 = (const int64_t*)( rsi + (uint8_t)offsets );
    const int64_t* s1 = (const int64_t*)( rsi + (uint8_t)( offsets >> 8 ) );
    const int64_t* s2 = (const int64_t*)( rsi + (uint8_t)( offsets >> 16 ) );
    const int64_t* s3 = (const int64_t*)( rsi + ( offsets >> 24 ) );
    __m256i r = _mm256_setr_epi64x( *s0, *s1, *s2, *s3 );
    // Mask away the higher pieces in the loaded numbers
    __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i ones = _mm256_set1_epi32( -1 );
    __m256i mask = _mm256_sllv_epi64( ones, shift );
    return _mm256_andnot_si256( mask, r );
}

inline __m256i applySigns( __m256i v, __m128i signs )
{
    // Sign extend the masks from bytes to int64
    __m256i mask = _mm256_cvtepi8_epi64( signs );
    // Conditionally negate the numbers
    __m256i neg = _mm256_sub_epi64( _mm256_setzero_si256(), v );
    return _mm256_blendv_epi8( v, neg, mask );
}
class BlockDecoder
{
    // Load offsets, in bytes
    __m128i offsetBytes;
    // Payload bits per element, the bytes are in [ 0 .. 64 ] interval
    __m128i payloadBits;
    // 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
    __m128i signs;
    // Count of payload bytes in the complete block
    size_t payloadBytes;

    // Decode the block header
    inline void loadHeader( const uint8_t* rsi )
    {
        // Load 8 bytes, and zero extend them into uint16_t
        const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );
        // Unpack lengths
        const __m128i seven = _mm_set1_epi8( 7 );
        const __m128i l4 = _mm_slli_epi16( v, 4 );
        __m128i lengths = _mm_or_si128( v, l4 );
        lengths = _mm_and_si128( lengths, seven );
        // Transform 7 into 8
        __m128i tmp = _mm_cmpeq_epi8( lengths, seven );
        lengths = _mm_sub_epi8( lengths, tmp );
        // Byte offsets to load 16 numbers, and count of payload bytes in the complete block
        offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
        // Count of payload bits in the loaded numbers, lengths * 8
        payloadBits = _mm_slli_epi16( lengths, 3 );
        // Signs vector, we only use the highest 0x80 bit in these bytes
        signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
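        // Illustration: header byte 0xA2 = (high flag 0xA) << 4 | (low flag 0x2) produces
        // lengths bytes { 2, 2 } and sign bytes { 0x20 (positive), 0xA2 (0x80 set: negative) }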
    }

    // Decode and store 4 numbers
    template<int slice>
    inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
    {
        uint32_t off;
        __m128i bits, s;
        if constexpr( slice != 0 )
        {
            off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
            constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
            bits = _mm_shuffle_epi32( payloadBits, imm );
            s = _mm_shuffle_epi32( signs, imm );
        }
        else
        {
            off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
            // For the first slice of the block, the 4 lowest bytes are in the correct locations already
            bits = payloadBits;
            s = signs;
        }
        __m256i v = loadAndMask( payload, off, bits );
        v = applySigns( v, s );
        _mm256_storeu_si256( ( __m256i* )rdi, v );
    }

public:
    // Decode and store a block of 16 numbers, return pointer to the next block
    const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
    {
        loadHeader( rsi );
        rsi += 8;
        decodeSlice<0>( rdi, rsi );
        decodeSlice<1>( rdi + 4, rsi );
        decodeSlice<2>( rdi + 8, rsi );
        decodeSlice<3>( rdi + 12, rsi );
        return rsi + payloadBytes;
    }
};
// ############ reference version and updated benchmark framework ###########
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>   // exit, rand, and the std::abs overloads used by encode
#include <ctime>     // time, for seeding the PRNG in main
#include <algorithm>
#include <vector>
#include <chrono>
#include <iomanip>
using namespace std;
// https://en.wikipedia.org/wiki/Xorshift IIRC, wikipedia used to have different constants for some PRNG, maybe some of these, vs. the upstream site; I just copied wikipedia
#include <stdint.h>
uint64_t rol64(uint64_t x, int k)
{
    return (x << k) | (x >> (64 - k));
}

struct xoshiro256ss_state {
    uint64_t s[4];
};
static xoshiro256ss_state rng_state; // init by main

uint64_t xoshiro256ss(struct xoshiro256ss_state *state)
{
    uint64_t *s = state->s;
    uint64_t const result = rol64(s[1] * 5, 7) * 9;
    uint64_t const t = s[1] << 17;
    s[2] ^= s[0];
    s[3] ^= s[1];
    s[1] ^= s[2];
    s[0] ^= s[3];
    s[2] ^= t;
    s[3] = rol64(s[3], 45);
    return result;
}

namespace {
class MyTimer {
    std::chrono::time_point<std::chrono::system_clock> start;

public:
    void startCounter() {
        start = std::chrono::system_clock::now();
    }
    int64_t getCounterNs() {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count();
    }
    int64_t getCounterMs() {
        return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count();
    }
    double getCounterMsPrecise() {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count()
               / 1000000.0;
    }
};
}
template <typename T>
void leb128_encode(uint8_t* data, T value, size_t &idx)
{
    bool more = true;
    bool negative = value < 0;
    constexpr int size = sizeof(value) * 8;
    while (more) {
        uint8_t byte = value & 0x7F;
        value >>= 7;
        if (negative) value |= (~0LL << (size - 7));
        if ((value == 0 && (byte & 0x40) == 0) || (value == -1 && (byte & 0x40) != 0)) {
            more = false;
        } else {
            byte |= 0x80;
        }
        data[idx++] = byte;
    }
}

template <typename T>
void leb128_decode(uint8_t* data, T &value, size_t &idx)
{
    value = 0;
    int shift = 0;
    constexpr int size = sizeof(value) * 8;
    uint8_t byte;
    do {
        byte = data[idx++];
        value |= int64_t(byte & 0x7F) << shift;
        shift += 7;
    } while (byte & 0x80);
    if (shift < size && (byte & 0x40)) {
        value |= (~0LL << shift);
    }
}
int64_t n_msg = 100'000'000;
constexpr int BLK_SIZE = 16;
uint8_t* buffer;
int64_t *original;
int64_t *decoded;
int64_t *dummy_array;
// encode 1 value using variable length encoding,
// storing the result in a flag
void encode(int64_t value, uint8_t &flag, size_t &idx)
{
    bool negative = value < 0;
    value = abs(value);
    uint8_t byte_cnt = 0;
    while (value > 0) {
        buffer[idx] = value & 0xFF; // lowest byte
        idx++;
        value >>= 8;
        byte_cnt++;
    }
    // special cases. Since we only have 3 bits to represent 9 values (0->8),
    // we choose to group case 7-8 together because they're rarest.
    if (byte_cnt == 7) {
        buffer[idx] = 0;
        idx++;
    } else if (byte_cnt == 8) {
        // since we only have 3 bits, we use 7 to represent 8 bytes
        byte_cnt = 7;
    }
    flag = byte_cnt; // bit 0-2 represent length
    if (negative) flag |= 0b1000; // bit 3 represent sign (1 means negative)
}
// returns compression ratio
double __attribute__ ((noinline)) encode_all(int type = 0)
{
    if (type == 0) {
        // Soonts version uses the same format as this reference version
        size_t idx = 0;
        // encode in blocks of 16 values
        for (size_t i = 0; i < n_msg; i += 16) {
            size_t flag_idx = idx;
            idx += 8; // first 8 bytes of a block are for sign/length flags
            for (int t = 0; t < BLK_SIZE; t += 2) {
                uint8_t low_flag, high_flag;
                encode(original[i + t], low_flag, idx);
                encode(original[i + t + 1], high_flag, idx);
                buffer[flag_idx + t / 2] = (high_flag << 4) | low_flag;
            }
        }
        return (double(idx) / (n_msg * 8));
    } else if (type == 1) {
        const unsigned char *end = encode_buf_bmi(n_msg, original, buffer);
        return (double(end - buffer) / (n_msg * 8));
    } else if (type == 2) {
        size_t idx = 0;
        for (int i = 0; i < n_msg; i++) leb128_encode(buffer, original[i], idx);
        return (double(idx) / (n_msg * 8));
    }
    cout << "encode_all unknown type " << type << std::endl;
    exit(1);
}

template <typename T>
void extract_flag(uint8_t flag, T &low_sign, T &low_length, T &high_sign, T &high_length)
{
    low_sign = flag & 0b1000;
    low_length = flag & 0b0111;
    high_sign = (flag >> 4) & 0b1000;
    high_length = (flag >> 4) & 0b0111;
}
void __attribute__ ((noinline)) decode_original()
{
    static constexpr int64_t mult[] = {
        1, 1LL << 8, 1LL << 16, 1LL << 24, 1LL << 32, 1LL << 40, 1LL << 48, 1LL << 56
    };
    size_t idx = 0, num_idx = 0;
    for (size_t i = 0; i < n_msg; i += 16) {
        // first 8 bytes of a block are flags
        int signs[BLK_SIZE], lens[BLK_SIZE];
        for (int t = 0; t < BLK_SIZE; t += 2) {
            extract_flag(buffer[idx], signs[t], lens[t], signs[t + 1], lens[t + 1]);
            idx++;
        }
        for (int t = 0; t < BLK_SIZE; t++) if (lens[t] == 7) lens[t] = 8; // special case
        // decode BLK_SIZE values
        for (int t = 0; t < BLK_SIZE; t++) {
            int64_t value = 0;
            for (int i = 0; i < lens[t]; i++) value += mult[i] * buffer[idx + i];
            if (signs[t]) value = -value;
            decoded[num_idx + t] = value;
            idx += lens[t];
        }
        num_idx += BLK_SIZE;
    }
}

void __attribute__ ((noinline)) decode_soonts()
{
    BlockDecoder dec;
    const uint8_t* rsi = buffer;
    int64_t* rdi = decoded;
    for( size_t i = 0; i < n_msg; i += 16 )
    {
        rsi = dec.decode( rdi, rsi );
        rdi += 16;
    }
}

void __attribute__ ((noinline)) decode_aqrit()
{
    decode_buf_bmi(n_msg, buffer, decoded);
}

void __attribute__ ((noinline)) decode_leb()
{
    size_t idx = 0;
    for (int i = 0; i < n_msg; i++) {
        leb128_decode(buffer, decoded[i], idx);
    }
}
//------------------
//------------------
//------------------ MAIN
void __attribute__ ((noinline)) gen_data()
{
    for (size_t i = 0; i < n_msg; i++) {
        uint64_t modifier = xoshiro256ss(&rng_state); // low 32 bits decide magnitude range, high bits decide sign, to avoid correlation with %100 over the full thing.
        if ( uint32_t(modifier) % 100 == 0) {
            original[i] = int64_t(xoshiro256ss(&rng_state)) * 1'000'000'000;
        } else {
            original[i] = xoshiro256ss(&rng_state) % 70'000;
        }
        if ((modifier>>62) == 0)
            original[i] *= -1;
    }
}

void __attribute__ ((noinline)) check(const string &name)
{
    for (size_t i = 0; i < n_msg; i++) if (original[i] != decoded[i]) {
        cout << name << " wrong at " << i << " " << original[i] << " " << decoded[i] << "\n";
        exit(1);
    }
}

int64_t volatile dummy = 42;
constexpr int N_DUMMY = 8'000'000;
void __attribute__ ((noinline)) clear_cache()
{
#if 1
    for (int i = 0; i < N_DUMMY; i++) dummy_array[i] = dummy;
    std::sort(dummy_array, dummy_array + N_DUMMY);
    dummy = dummy_array[rand() % N_DUMMY];
#else
    asm("" ::: "memory");
#endif
}

using FuncPtr = void (*)();
int64_t __attribute__ ((noinline)) test(FuncPtr func, const string &name)
{
    clear_cache();
    MyTimer timer;
    timer.startCounter();
    func();
    int64_t cost = timer.getCounterNs();
    check(name);
    return cost;
}

void printTableRow(const std::vector<std::string>& columns, int columnWidth) {
    for (const std::string& column : columns) {
        std::cout << std::left << std::setw(columnWidth) << column << " | ";
    }
    std::cout << std::endl;
}
int main()
{
    // srand(time(NULL));
    rng_state.s[0] = time(nullptr); // other elements left zero. This is terrible but sufficient for our purposes
    MyTimer timer;
    buffer = new uint8_t[n_msg * 10];
    original = new int64_t[n_msg];
    decoded = new int64_t[n_msg];
    dummy_array = new int64_t[N_DUMMY]; // enough to flush cache
    memset(buffer, 0, n_msg * 10);
    memset(original, 0, n_msg * 8);
    memset(decoded, 0, n_msg * 8);
    memset(dummy_array, 0, N_DUMMY * 8);

    int n_test = 10;
    constexpr int L = 4;
    string names[L] = {"original", "soonts", "aqrit", "leb"};
    int64_t total_costs[L] = {0, 0, 0, 0};
    for (int t = 0; t < n_test; t++) {
        gen_data();
        double ratio_original = encode_all(0);
        int64_t costs[L];
        costs[0] = test(decode_original, names[0]);
        costs[1] = test(decode_soonts, names[1]);
        double ratio_aqrit = encode_all(1);
        costs[2] = test(decode_aqrit, names[2]);
        double ratio_leb = encode_all(2);
        costs[3] = test(decode_leb, names[3]);
        if (t == 0) {
            cout << "Compression ratios: " << ratio_original << " " << ratio_aqrit << " " << ratio_leb << "\n";
        }
        cout << t << ": ";
        for (int i = 0; i < L; i++) {
            cout << (double(costs[i]) / 1'000'000) << " ";
            total_costs[i] += costs[i];
        }
        cout << "\n";
    }

    vector<string> headers = {"name", "cost (ms)", "cost/int64 (ns)", "cost/byte (ns)"};
    printTableRow(headers, 15);
    for (int i = 0; i < L; i++) {
        double average_cost_ms = double(total_costs[i]) / n_test / 1'000'000;
        double cost_int64 = double(total_costs[i]) / (n_msg * n_test);
        double cost_byte = cost_int64 / 8;
        vector<string> row;
        row.push_back(names[i]);
        row.push_back(to_string(average_cost_ms));
        row.push_back(to_string(cost_int64));
        row.push_back(to_string(cost_byte));
        printTableRow(row, 15);
    }
    return 0;
}
**EDIT:** I just found that someone came up with an almost identical encode/decode format back in 2017 (but for int32). A funny coincidence that two people independently "invented" the same thing at two different times :D
The second version of Soonts' answer (func, inl func, vpgather) is in the code here. It looks cleaner and does not read past the end of the input buffer, but it is slower, so I kept the first version.

- func: this version does not need a decoder object. force_inline = 0, use_gather = 0
- inl func: force_inline = 1, use_gather = 0
- vpgather: force_inline = 0, use_gather = 1
Compile command: g++ -o main test.cpp -std=c++17 -O3 -mavx2 -mlzcnt -march=native. Benchmarking with -march=native gives a measurable performance gain (about 8% in the best case).
Timings (average of 10 runs):
// AMD EPYC 75F3 (Zen 3 x86-64), 2950MHz
Compression ratios: 0.327437 0.369956
0: 447.651 65.0414 110.171
1: 442.853 64.5952 105.635
2: 434.715 64.1977 108.984
3: 430.09 63.0572 104.074
4: 424.451 64.6397 103.604
5: 436.631 65.0076 104.04
6: 429.59 64.1896 102.936
7: 434.184 64.0522 104.035
8: 430.223 69.3877 105.922
9: 420.659 63.7519 105.563
// AMD EPYC 75F3 -march=native
name | cost (ms) | cost/int64 (ns) | cost/byte (ns) |
original | 433.104635 | 4.331046 | 0.541381 |
soonts | 64.792034 | 0.647920 | 0.080990 |
aqrit | 105.496544 | 1.054965 | 0.131871 |
leb | 438.236469 | 4.382365 | 0.547796 |
soonts (func) | 68.403465 | 0.684035 | 0.085504 |
soon (inl func) | 71.202335 | 0.712023 | 0.089003 |
soon (vpgather) | 78.308508 | 0.783085 | 0.097886 |
// AMD EPYC 75F3 without -march=native
name | cost (ms) | cost/int64 (ns) | cost/byte (ns) |
original | 424.631426 | 4.246314 | 0.530789 |
soonts | 69.623498 | 0.696235 | 0.087029 |
aqrit | 109.895916 | 1.098959 | 0.137370 |
// Intel(R) Xeon(R) Gold 6252N CPU (supposedly 2.3 GHz, I can't change/lock frequency on this borrowed machine)
name | cost (ms) | cost/int64 (ns) | cost/byte (ns) |
original | 549.280441 | 5.492804 | 0.686601 |
soonts | 118.401718 | 1.184017 | 0.148002 |
aqrit | 151.811314 | 1.518113 | 0.189764 |
soon (func) | 122.440794 | 1.224408 | 0.153051 |
soon (inl func) | 120.559447 | 1.205594 | 0.150699 |
soon (vpgather) | 115.704298 | 1.157043 | 0.144630 |
2 Answers

Answer 1 (from @Soonts):
It is relatively tricky, but it is still possible to vectorize your decoder. Here is an implementation that, on my computer with an AMD Zen 3 CPU, outperforms your reference version by a factor of 2.8. (The code is the BlockDecoder implementation included above.)
I have only tested the code lightly, and it may be possible to improve performance further.
As you can see, the implementation requires AVX2 and is branchless.
Usage example:
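(The original usage snippet did not survive the copy; here is a minimal sketch based on the BlockDecoder::decode signature and the decode_soonts wrapper above, assuming the element count is a multiple of 16:)

// decode `count` int64_t values (count % 16 == 0) from the encoded buffer `src` into `dst`
void decodeAll( size_t count, const uint8_t* src, int64_t* dst )
{
    BlockDecoder dec;
    for( size_t i = 0; i < count; i += 16 )
    {
        src = dec.decode( dst, src ); // returns pointer to the next block
        dst += 16;
    }
}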
**NB!** Normally, the implementation loads a few bytes past the end of the encoded buffer, whenever the last numbers of the stream do not use the full 8 bytes. You can pad your compressed data, or implement a special case to decode the final block of the stream, or adjust the encoder to always emit 8 bytes for the last number of the last block.
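One hedged way to apply the padding option (the sizes follow from the format, 8 header bytes plus at most 8 payload bytes per value; the names here are illustrative, not from the answer):

size_t worstCase = ( count / 16 ) * ( 8 + 16 * 8 ); // count % 16 == 0
uint8_t* encoded = new uint8_t[ worstCase + 8 ];    // 8 spare bytes keep the final 8-byte loads in bounds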
P.S. The initial version used the _mm256_i32gather_epi64 instruction to load four int64 numbers from memory, using one base pointer plus four byte offsets from another vector. However, on AMD CPUs it is slightly slower than 4 scalar loads. If you target Intel, the initial version may be better; see the edit history.

Answer 2 (from @aqrit):
A scalar implementation can be branchless in most cases.
Spending flag bits on the sign would probably cost bits in the long run. Instead, here all the control bits are packed together at the start of the stream (similar to streamvbyte).
Variable sign extension is used because it is cheaper than zigzag in a byte-wise scalar implementation.
Proof-of-concept assumptions (the implementation is the encode_buf_bmi / decode_buf_bmi pair near the top of this post): little-endian byte order, n a multiple of 8, and at least 8 bytes of padding in front of the payload so the decoder can do left-aligned 8-byte loads (the n >= 19 check); tail handling is left as a TODO.
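For contrast with zigzag, here is a hedged sketch of both sign-handling schemes (zigzag per its standard definition; the sign extension mirrors the shift in decode_buf_bmi):

#include <cstddef>
#include <cstdint>

// zigzag: fold the sign into the low bit so small magnitudes stay small
uint64_t zigzag_encode(int64_t v) { return (uint64_t(v) << 1) ^ uint64_t(v >> 63); }
int64_t  zigzag_decode(uint64_t u) { return int64_t(u >> 1) ^ -int64_t(u & 1); }

// variable sign extension, as in decode_buf_bmi: the len payload bytes sit in
// the TOP of the loaded 64-bit word (little endian), and one arithmetic right
// shift both moves them down and replicates the sign bit. Requires len in [1..8].
int64_t sign_extend(uint64_t left_aligned, size_t len)
{
    return int64_t(left_aligned) >> (64 - len * 8);
}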