greenplumn datumstreamblock 源码
greenplumn datumstreamblock 代码
文件路径:/src/include/utils/datumstreamblock.h
/*-------------------------------------------------------------------------
*
* datumstreamblock.h
*
* Portions Copyright (c) 2011, EMC Inc.
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/include/utils/datumstreamblock.h
*
*-------------------------------------------------------------------------
*/
#ifndef DATUMSTREAMBLOCK_H
#define DATUMSTREAMBLOCK_H
#include "catalog/pg_attribute.h"
#include "utils/guc.h"
/*
* Format version of a datum stream block.
*
* NOTE(review): the block header structs below carry an int16 "version"
* field, so these values are presumably persisted on disk -- confirm before
* renumbering any of them.
*/
typedef enum DatumStreamVersion
{
DatumStreamVersion_Original = 0, /* first valid version */
DatumStreamVersion_Dense = 1,
/*
* (Describes DatumStreamVersion_Dense, above.)
* Version used for denser header (higher row count) and RLE_TYPE
* compression done by this module.
*/
DatumStreamVersion_Dense_Enhanced = 2, /* Version used for RLE_TYPE
* compression enhanced with
* Delta Range done by this
* module. */
MaxDatumStreamVersion /* must always be last */
} DatumStreamVersion;
/*
* This depicts how different structures defined below fit together in AO
* block on-disk for CO table, in case of RLE_TYPE compression.
*
* +------------------------------------------------------------------------------+
* | Datum Stream Version |
* +------------------------------------------------------------------------------+
* | DatumStreamBlock_Orig | DatumStreamBlock_Dense_Enhanced |
* +-----------------------+------------------------------------------------------+
* | Null Bit Map | Rle_Extension | Rle_Extension + | Null Bit Map |
* | (optional) | | Delta_extension | (optional) |
* +-----------------------+-------------------+-------------------+--------------+
* | | Null Bitmap | Null Bitmap | |
* | | (optional) | (optional) | |
* | +-------------------+-------------------+ |
* | | RLE Compression | RLE Compression | |
* | | Bitmap (optional) | Bitmap (optional) | |
* | +-------------------+-------------------+ Datum |
* | Datum | RLE counts | RLE counts | + |
* | + | (optional) | (optional) | Alignment |
* | Alignment +-------------------+-------------------+ |
* | | | Delta Compression | |
* | | Datum | Bitmap | |
* | | + +-------------------+ |
* | | Alignment | Deltas | |
* | | +-------------------+ |
* | | | Datum + Alignment | |
* +-----------------------+-------------------+-------------------+--------------+
*/
/*
* Datum Stream Block (Original).
*
* 16 bytes header (four int16 fields followed by two int32 fields).
* Followed by data.
*/
typedef struct DatumStreamBlock_Orig
{
int16 version; /* version number */
int16 flags; /* some flags (presumably the DSB_* bits -- TODO confirm) */
int16 ndatum; /* number of datum, including null */
int16 unused; /* unused */
int32 nullsz; /* size of the null bitmap */
int32 sz; /* logical data size, not including header,
* nullbitmap, and padding */
} DatumStreamBlock_Orig;
/*
* Datum Stream Block (Dense).
*
* Dense format uses an Append-Only Storage Block header that has a larger
* RowCount field so we can store more NULL and/or represent more RLE_TYPE
* compressed items.
*
* Small Content Header allows maximum 16k logical rows per block (only 14
* bits are reserved for Row Count). If small content header is used with RLE
* it becomes very inefficient for storage as
* - If 16k rows are repeated then RLE_TYPE would compress that into one row
* with 16k RLE counter and that is all that it could be stored into one AO
* block.
* - If 1G rows are repeated then RLE_TYPE would compress that into 64k AO
* blocks, each AO block would have one row with 16k RLE counter, lots of
* space wasted.
* - If 16k rows are NULL then AO block would contain only header and null
* bitmap, lots of space wasted.
*
* Hence DatumStreamBlock_Dense is used, which allows recording higher number
* of logical rows. Data gets stored more efficiently. Benefits are to allow:
*
* - better compression with RLE_TYPE
* - additional space saving when column has large number of NULLs
* - better compression with DELTA_RANGE
*
* 16 bytes header.
*/
/*
* Header layout: 4 bytes shared with the original format (version + flags)
* followed by three int32 counters. In this struct each block comment
* documents the field declared immediately ABOVE it.
*/
typedef struct DatumStreamBlock_Dense
{
/* Same leading version/flags pair as DatumStreamBlock_Orig. */
struct orig_4_bytes
{
int16 version; /* version number */
int16 flags; /* some flags */
} orig_4_bytes;
int32 logical_row_count;
/*
* Number of physical datum PLUS number of NULLs; The logical
* count of rows.
*/
int32 physical_datum_count;
/* Number of physical datum. May be 0 if block is all NULLs. */
int32 physical_data_size;
/*
* Total data size of all physical datums in block. Does not
* include header(s), bitmap(s), padding between headers and
* datum area, etc.
*/
} DatumStreamBlock_Dense;
/*
* Datum Stream Block extension to DatumStreamBlock_Dense with RLE_TYPE compression data.
* 16 bytes more.
*/
/*
* Four int32 counters describing the RLE_TYPE sections of a block. In this
* struct each block comment documents the field declared immediately ABOVE
* it.
*/
typedef struct DatumStreamBlock_Rle_Extension
{
int32 norepeats_null_bitmap_count;
/*
* For non-RLE_TYPE compression, if there is a NULL bit-map,
* it will have row_count (see above) number of bits.
*
* However, for RLE_TYPE compression, we do not have a bunch
* of NULL bit-map bits for a repeated item. We just use one bit
* (a 0, of course, indicating the repeated item is not NULL). So,
* the NULL bit-map count is different than row_count.
*/
int32 compress_bitmap_count;
/*
* Number of bits in the compress bit-map.
*/
int32 repeatcounts_count;
/*
* Total number of repeated items.
*
* Also, the count of the ON bits in the compress bit-map.
*/
int32 repeatcounts_size;
/*
* Total size of the repeat counts array, when you account for
* the different 1, 2, 3, and 4 byte encoding size of each count.
*/
} DatumStreamBlock_Rle_Extension;
/*
* Datum Stream Block extension to Rle_Extension with DeltaRange.
* 12 bytes more.
*/
/*
* Three int32 counters describing the delta sections of a block. In this
* struct each block comment documents the field declared immediately ABOVE
* it.
*/
typedef struct DatumStreamBlock_Delta_Extension
{
int32 delta_bitmap_count;
/*
* Number of bits in the delta bit-map.
*/
int32 deltas_count;
/*
* Total number of items stored with delta.
*
* Also, the count of the ON bits in the Delta bit-map.
*/
int32 deltas_size;
/*
* Total size of the deltas array, when you account for
* the different 1, 2, 3, and 4 byte encoding size of each delta.
*/
} DatumStreamBlock_Delta_Extension;
/*
* Flags. Each enumerator is a distinct single bit so they can be OR-ed
* together (presumably into the "flags" field of the block headers --
* TODO confirm against the block writer).
*/
enum
{
DSB_HAS_NULLBITMAP = 0x1, /* block has a NULL bit-map */
DSB_HAS_RLE_COMPRESSION = 0x2, /* block has RLE_TYPE compression */
DSB_HAS_DELTA_COMPRESSION = 0x4, /* block has delta compression */
};
/*
* State for appending bits to a bit-map, one bit at a time.
*
* Bits are packed least-significant-bit first within each byte (the first
* bit of a byte uses mask 1, and the mask is shifted left for each new bit).
*/
typedef struct DatumStreamBitMapWrite
{
uint8 *buffer; /* start of the bit-map storage */
int32 bufferSize; /* capacity of buffer, in bytes */
uint8 byteBit; /* mask of the current bit within *bytePointer;
* 0 before the first bit is added */
uint8 *bytePointer; /* byte holding the current bit */
int32 bitOnCount; /* number of bits set ON so far */
int32 bitCount; /* number of bits written so far */
} DatumStreamBitMapWrite;
/*
* State for walking a bit-map one bit at a time.
*
* A freshly initialized reader sits before the first bit (bitPosition -1,
* byteBit 0); DatumStreamBitMapRead_Next steps onto each bit in turn.
*/
typedef struct DatumStreamBitMapRead
{
uint8 *buffer; /* start of the bit-map storage */
uint8 byteBit; /* mask of the current bit within *bytePointer */
uint8 *bytePointer; /* byte holding the current bit */
int32 bitCount; /* total number of bits in the bit-map */
int32 bitPosition; /* index of the current bit; -1 before the first */
#ifdef USE_ASSERT_CHECKING
int32 readBitOnCount; /* ON bits stepped onto so far (debug builds only) */
#endif
} DatumStreamBitMapRead;
/*
* Inline Forwards.
*
* These reader routines are defined further down but are called earlier in
* this file (e.g. by DatumStreamBitMap_CountOn), so they are declared here.
*/
static inline void DatumStreamBitMapRead_Init(
DatumStreamBitMapRead * bmr,
uint8 * buffer,
int32 bitCount);
static inline bool DatumStreamBitMapRead_IsExhausted(
DatumStreamBitMapRead * bmr);
static inline void DatumStreamBitMapRead_Next(
DatumStreamBitMapRead * bmr);
static inline bool DatumStreamBitMapRead_CurrentIsOn(
DatumStreamBitMapRead * bmr);
#ifdef USE_ASSERT_CHECKING
/*
 * Report how many ON bits the reader has stepped onto so far.
 *
 * Only compiled in assert-enabled builds, because the underlying counter
 * exists only there.
 */
static inline int32
DatumStreamBitMapRead_OnSeenCount(DatumStreamBitMapRead *bmr)
{
    int32 onBitsSeen = bmr->readBitOnCount;

    return onBitsSeen;
}
#endif
/*
* DatumStreamBitMap routines.
*/
/*
 * Number of bytes needed to hold bitCount bits (rounded up to whole bytes).
 */
static inline int32
DatumStreamBitMap_Size(int32 bitCount)
{
    int32 roundedUpBits = bitCount + 7;

    return roundedUpBits >> 3;
}
/*
 * Count the ON bits among the first bitCount bits of the bit-map at buffer.
 *
 * Walks the whole bit-map with a temporary reader.
 */
static inline int32
DatumStreamBitMap_CountOn(uint8 *buffer, int32 bitCount)
{
    DatumStreamBitMapRead reader;
    int         onBits = 0;

    DatumStreamBitMapRead_Init(&reader, buffer, bitCount);

    for (;;)
    {
        if (DatumStreamBitMapRead_IsExhausted(&reader))
            break;

        /* Step onto the next bit and tally it when it is set. */
        DatumStreamBitMapRead_Next(&reader);
        if (DatumStreamBitMapRead_CurrentIsOn(&reader))
            onBits += 1;
    }

    return onBits;
}
/*
* DatumStreamBitMapWrite routines.
*/
/*
* Forward. DatumStreamBitMapWrite_Set is defined after
* DatumStreamBitMapWrite_AddBit, which calls it.
*/
static inline void DatumStreamBitMapWrite_Set(
DatumStreamBitMapWrite * bmw);
/*
 * Start writing a bit-map into buffer, which can hold bufferSize bytes.
 *
 * The writer is left positioned BEFORE THE FIRST BIT: no bits counted and
 * no bit mask selected.
 *
 * Use DatumStreamBitMapWrite_AddBit to add a bit to the bit-map.
 */
static inline void
DatumStreamBitMapWrite_Init(DatumStreamBitMapWrite *bmw,
                            uint8 *buffer,
                            int32 bufferSize)
{
    /* Nothing written yet. */
    bmw->bitCount = 0;
    bmw->bitOnCount = 0;
    bmw->byteBit = 0;

    /* Attach the storage; the current byte is the first byte. */
    bmw->buffer = buffer;
    bmw->bytePointer = buffer;
    bmw->bufferSize = bufferSize;
}
/*
 * Pre-populate a newly initialized bit-map with bitCount OFF bits.
 *
 * Afterwards the writer is positioned on the last of those bits, so the
 * next DatumStreamBitMapWrite_AddBit continues right after them. A
 * bitCount of zero or less leaves the writer untouched.
 *
 * The check against the buffer capacity is only performed in assert-enabled
 * builds.
 */
static inline void
DatumStreamBitMapWrite_ZeroFill(DatumStreamBitMapWrite *bmw, int32 bitCount)
{
    int32       fillBytes;
    int32       lastBitInByte;

    /* The bit-map must be newly initialized. */
    Assert(bmw->byteBit == 0);
    Assert(bmw->bitOnCount == 0);
    Assert(bmw->bitCount == 0);

    if (bitCount <= 0)
        return;

    fillBytes = DatumStreamBitMap_Size(bitCount);

#ifdef USE_ASSERT_CHECKING
    /* Refuse to run past the end of the buffer. */
    if (fillBytes > bmw->bufferSize)
    {
        elog(ERROR,
             "New fill bit-map buffer size %d greater than maximum buffer size %d",
             fillBytes,
             bmw->bufferSize);
    }
#endif

    memset(bmw->buffer, 0, fillBytes);

    /* Position the writer on the final zero bit. */
    lastBitInByte = (bitCount - 1) & 7;
    bmw->bitCount = bitCount;
    bmw->bytePointer = bmw->buffer + (fillBytes - 1);
    bmw->byteBit = 1 << lastBitInByte;
}
/*
 * Append one bit to the bit-map; the bit is set when "on" is true.
 *
 * Each byte is zeroed the moment the writer first moves onto it, so the
 * buffer does not need to be cleared beforehand. No capacity check is made
 * here.
 */
static inline void
DatumStreamBitMapWrite_AddBit(DatumStreamBitMapWrite *bmw, bool on)
{
    if (bmw->bitCount != 0)
    {
        /* Move to the next bit; the uint8 mask wraps to 0 after bit 7. */
        bmw->byteBit <<= 1;
        if (bmw->byteBit == 0)
        {
            /* Zero next byte of bit-map */
            bmw->bytePointer++;
            *bmw->bytePointer = 0;
            bmw->byteBit = 1;
        }
    }
    else
    {
        /* Very first bit: zero first byte of bit-map. */
        Assert(bmw->byteBit == 0);
        *bmw->bytePointer = 0;
        bmw->byteBit = 1;
    }

    bmw->bitCount += 1;

    if (on)
        DatumStreamBitMapWrite_Set(bmw);
}
/*
 * Is the bit the writer is currently positioned on set?
 */
static inline bool
DatumStreamBitMapWrite_CurrentIsOn(DatumStreamBitMapWrite *bmw)
{
    uint8       currentByte = *bmw->bytePointer;

    return (currentByte & bmw->byteBit) != 0;
}
/*
 * Turn ON the bit the writer is currently positioned on and count it.
 *
 * The bit must currently be OFF (asserted).
 */
static inline void
DatumStreamBitMapWrite_Set(DatumStreamBitMapWrite *bmw)
{
    Assert(!DatumStreamBitMapWrite_CurrentIsOn(bmw));

    *bmw->bytePointer = *bmw->bytePointer | bmw->byteBit;
    bmw->bitOnCount += 1;
}
/*
 * Number of ON bits written so far.
 */
static inline int32
DatumStreamBitMapWrite_OnCount(DatumStreamBitMapWrite *bmw)
{
    int32       onBits = bmw->bitOnCount;

    return onBits;
}
/*
 * Total number of bits (ON or OFF) written so far.
 */
static inline int32
DatumStreamBitMapWrite_Count(DatumStreamBitMapWrite *bmw)
{
    int32       bitsWritten = bmw->bitCount;

    return bitsWritten;
}
/*
 * Bytes currently occupied by the bits written so far.
 */
static inline int32
DatumStreamBitMapWrite_Size(DatumStreamBitMapWrite *bmw)
{
    int32       bitsWritten = bmw->bitCount;

    return DatumStreamBitMap_Size(bitsWritten);
}
/*
 * Bytes the bit-map would occupy after one more bit is added.
 */
static inline int32
DatumStreamBitMapWrite_NextSize(DatumStreamBitMapWrite *bmw)
{
    int32       bitsAfterNextAdd = bmw->bitCount + 1;

    return DatumStreamBitMap_Size(bitsAfterNextAdd);
}
/*
 * Capacity, in bytes, of the buffer the writer is attached to.
 */
static inline int32
DatumStreamBitMapWrite_MaxSize(DatumStreamBitMapWrite *bmw)
{
    int32       capacityBytes = bmw->bufferSize;

    return capacityBytes;
}
/*
 * Move the bit-map written so far into newBuffer (newBufferSize bytes,
 * which must exceed the current capacity) and continue writing there.
 *
 * Only the bytes in use are copied. The old buffer is not released here.
 */
static inline void
DatumStreamBitMapWrite_CopyToLargerBuffer(DatumStreamBitMapWrite *bmw,
                                          uint8 *newBuffer,
                                          int32 newBufferSize)
{
    uint8      *oldBuffer = bmw->buffer;

    Assert(newBufferSize > bmw->bufferSize);

    memcpy(newBuffer, oldBuffer, DatumStreamBitMapWrite_Size(bmw));

    /* Keep the current byte at the same offset inside the new buffer. */
    bmw->bytePointer = &newBuffer[bmw->bytePointer - oldBuffer];
    bmw->buffer = newBuffer;
    bmw->bufferSize = newBufferSize;
}
/*
* DatumStreamBitMapRead routines.
*/
/*
 * Prepare a reader over the bitCount bits stored at buffer.
 *
 * The reader starts BEFORE THE FIRST BIT (position -1, no bit mask
 * selected); call DatumStreamBitMapRead_Next to step onto bit 0.
 */
static inline void
DatumStreamBitMapRead_Init(DatumStreamBitMapRead *bmr,
                           uint8 *buffer,
                           int32 bitCount)
{
    Assert(bitCount >= 0);

    bmr->bitCount = bitCount;
    bmr->bitPosition = -1;
    bmr->byteBit = 0;

    bmr->buffer = buffer;
    bmr->bytePointer = buffer;

#ifdef USE_ASSERT_CHECKING
    bmr->readBitOnCount = 0;
#endif
}
/*
 * True once the reader is on (or past) the last bit, i.e. there is no
 * further bit to step onto. Also true for an empty bit-map.
 */
static inline bool
DatumStreamBitMapRead_IsExhausted(DatumStreamBitMapRead *bmr)
{
    int32       lastBitIndex = bmr->bitCount - 1;

    return !(bmr->bitPosition < lastBitIndex);
}
/*
 * True while the reader's position is below the bit count. Note the
 * initial "before first bit" position (-1) also satisfies this test.
 */
static inline bool
DatumStreamBitMapRead_InRange(DatumStreamBitMapRead *bmr)
{
    return !(bmr->bitPosition >= bmr->bitCount);
}
/*
 * Is the bit the reader is currently positioned on set?
 *
 * The past-the-end check is compiled only into assert-enabled builds to
 * keep this hot path cheap elsewhere.
 */
static inline bool
DatumStreamBitMapRead_CurrentIsOn(DatumStreamBitMapRead *bmr)
{
    uint8       currentByte;

#ifdef USE_ASSERT_CHECKING
    if (!(bmr->bitPosition < bmr->bitCount))
    {
        elog(ERROR, "Past end of bitmap (bit position %d, total bit count %d)",
             bmr->bitPosition,
             bmr->bitCount);
    }
#endif

    currentByte = *bmr->bytePointer;
    return (currentByte & bmr->byteBit) != 0;
}
/*
 * Index of the bit the reader is on (-1 before the first bit).
 */
static inline int32
DatumStreamBitMapRead_Position(DatumStreamBitMapRead *bmr)
{
    int32       currentBit = bmr->bitPosition;

    return currentBit;
}
/*
 * Step the reader onto the next bit.
 *
 * From the initial position this selects bit 0 of the first byte;
 * otherwise the bit mask moves up one bit, rolling over to the next byte
 * after bit 7. Integrity checks and the ON-bit tally are compiled only
 * into assert-enabled builds.
 */
static inline void
DatumStreamBitMapRead_Next(DatumStreamBitMapRead *bmr)
{
#ifdef USE_ASSERT_CHECKING
    if (!(bmr->bitPosition < bmr->bitCount))
    {
        elog(ERROR, "Moving past end of bitmap (bit position %d, total bit count %d)",
             bmr->bitPosition,
             bmr->bitCount);
    }
#endif

    if (bmr->bitPosition != -1)
    {
        /* The uint8 mask wraps to 0 once it is shifted past bit 7. */
        bmr->byteBit <<= 1;
        if (bmr->byteBit == 0)
        {
            bmr->bytePointer++;
            bmr->byteBit = 1;
        }
    }
    else
    {
        /* First step after Init. */
        Assert(bmr->byteBit == 0);
        Assert(bmr->bytePointer == bmr->buffer);
        bmr->byteBit = 1;
    }

    bmr->bitPosition += 1;

#ifdef USE_ASSERT_CHECKING
    if (DatumStreamBitMapRead_CurrentIsOn(bmr))
        bmr->readBitOnCount += 1;
#endif
}
/*
 * Size in bytes of the whole bit-map being read.
 */
static inline int32
DatumStreamBitMapRead_Size(DatumStreamBitMapRead *bmr)
{
    int32       totalBits = bmr->bitCount;

    return DatumStreamBitMap_Size(totalBits);
}
/*
 * Total number of bits in the bit-map being read.
 */
static inline int32
DatumStreamBitMapRead_Count(DatumStreamBitMapRead *bmr)
{
    int32       totalBits = bmr->bitCount;

    return totalBits;
}
/*
* DatumStreamBlockInt32Compress: Used currently for storing rle counter (how
* many times Datum is repeated). Upper 2 bits of the first byte are reserved
* for tracking RLE counter length (00 =>1 byte, 01=>2 bytes, 10=>3 bytes,
* 11=>4 bytes). The remaining bits hold the value, most significant byte
* first, which leaves 6, 14, 22 or 30 value bits respectively -- hence the
* limits below.
*/
#define Int32Compress_MaxByteLen 4
#define Int32Compress_1ByteLimit 0x3F
#define Int32Compress_2ByteLimit 0x3FFF
#define Int32Compress_3ByteLimit 0x3FFFFF
#define Int32Compress_4ByteLimit 0x3FFFFFFF
/* Position of the 2-bit length field within the first encoded byte. */
#define Int32Compress_LenFieldMask 0xC0
#define Int32Compress_LenFieldShift 6
/*
* Forward declarations: the encoder's debug self-check calls the decoders,
* which are defined after it.
*/
inline static int32 DatumStreamInt32Compress_Decode(
uint8 * buffer,
int32 * byteLen);
#ifdef USE_ASSERT_CHECKING
/* Currently only used in an Assert expression. */
inline static int32 DatumStreamInt32Compress_DecodeLen(
uint8 * buffer);
#endif
/*
 * Number of bytes (1..4) the compressed encoding of value will take.
 *
 * Values above the 4-byte limit are only caught by an Assert.
 */
inline static int32
DatumStreamInt32Compress_Size(int32 value)
{
    if (value <= Int32Compress_1ByteLimit)
        return 1;

    if (value <= Int32Compress_2ByteLimit)
        return 2;

    if (value <= Int32Compress_3ByteLimit)
        return 3;

    Assert(value <= Int32Compress_4ByteLimit);
    return 4;
}
/*
 * Write the compressed encoding of value at buffer and return the number
 * of bytes written (1..4).
 *
 * The top two bits of buffer[0] carry (length - 1); the value follows,
 * most significant byte first. In assert-enabled builds the result is
 * decoded again and any mismatch raises an ERROR.
 */
inline static int32
DatumStreamInt32Compress_Encode(uint8 *buffer, int32 value)
{
    int32       byteLen;

    /* Pick the smallest encoding that fits. */
    if (value > Int32Compress_3ByteLimit)
    {
        Assert(value <= Int32Compress_4ByteLimit);
        byteLen = 4;
    }
    else if (value > Int32Compress_2ByteLimit)
        byteLen = 3;
    else if (value > Int32Compress_1ByteLimit)
        byteLen = 2;
    else
        byteLen = 1;

    switch (byteLen)
    {
        case 1:
            buffer[0] = (uint8) value;
            break;

        case 2:
            buffer[0] = (1 << Int32Compress_LenFieldShift) | (uint8) (value >> 8);
            buffer[1] = (uint8) (value);
            break;

        case 3:
            buffer[0] = (2 << Int32Compress_LenFieldShift) | (uint8) (value >> 16);
            buffer[1] = (uint8) (value >> 8);
            buffer[2] = (uint8) (value);
            break;

        default:
            buffer[0] = (3 << Int32Compress_LenFieldShift) | (uint8) (value >> 24);
            buffer[1] = (uint8) (value >> 16);
            buffer[2] = (uint8) (value >> 8);
            buffer[3] = (uint8) (value);
            break;
    }

#ifdef USE_ASSERT_CHECKING
    {
        /* Round-trip self-check. */
        int32       headerLen;
        int32       roundTripValue;
        int32       roundTripLen;

        headerLen = DatumStreamInt32Compress_DecodeLen(buffer);
        if (headerLen != byteLen)
        {
            elog(ERROR, "integer compress encode / decode length difference (expected %d, found %d)",
                 byteLen, headerLen);
        }

        roundTripValue = DatumStreamInt32Compress_Decode(buffer, &roundTripLen);
        if (roundTripValue != value)
        {
            elog(ERROR, "integer compress encode / decode value difference (expected %d, found %d)",
                 value, roundTripValue);
        }

        if (roundTripLen != byteLen)
        {
            elog(ERROR, "integer compress encode / decode length difference #2 (expected %d, found %d)",
                 byteLen, roundTripLen);
        }
    }
#endif

    return byteLen;
}
#ifdef USE_ASSERT_CHECKING
/*
 * Length in bytes (1..4) of the compressed integer starting at buffer,
 * taken from the top two bits of its first byte.
 *
 * Currently only used in an Assert expression.
 */
inline static int32
DatumStreamInt32Compress_DecodeLen(uint8 *buffer)
{
    uint8       lenField;

    lenField = (buffer[0] & Int32Compress_LenFieldMask) >> Int32Compress_LenFieldShift;

    return lenField + 1;
}
#endif
/*
 * Decode the compressed integer starting at buffer.
 *
 * Returns the value and stores the number of bytes consumed (1..4) in
 * *byteLen. The first byte contributes its low six bits; any following
 * bytes are appended most significant first.
 */
inline static int32
DatumStreamInt32Compress_Decode(uint8 *buffer, int32 *byteLen)
{
    uint8       lenField;
    int32       len;
    int32       value = 0;
    int32       i;

    lenField = (buffer[0] & Int32Compress_LenFieldMask) >> Int32Compress_LenFieldShift;
    len = lenField + 1;

    if (len >= 1 && len <= 4)
    {
        /* Strip the length field, then fold in the remaining bytes. */
        value = (buffer[0] & ~Int32Compress_LenFieldMask);
        for (i = 1; i < len; i++)
            value = (value << 8) | buffer[i];
    }
    else
    {
        /* A two-bit field plus one cannot land here; kept as a safety net. */
        ereport(FATAL,
                (errmsg("Unexpected compressed integer byte length %d",
                        len)));
    }

    *byteLen = len;
    return value;
}
/*
* DatumStreamBlockInt32CompressReserved3: Used to encode and decode deltas
* (by how much the datum differs from its previous datum) for delta
* compression. Upper 3 bits of the first byte are reserved. Upper 2 bits are
* used for tracking how much minimal space is needed to store the delta
* value (00 =>1 byte, 01=>2 bytes, 10=>3 bytes, 11=>4 bytes). 3rd bit is
* the sign flag: the encoder sets it (1) when the delta is positive and
* leaves it clear (0) when negative. Lastly the delta magnitude is stored
* as variable length, with at most 29 bits of space. So, for example if
* datums are differing by 31 (2^5 - 1) or less then 1 byte is used to store
* the delta. If datums are differing by say 536870911 (2^29 - 1), the
* largest supported magnitude, then 4 bytes are used to store the delta.
*/
#define Int32CompressReserved3_MaxByteLen 4
#define Int32CompressReserved3_1ByteLimit 0x1F
#define Int32CompressReserved3_2ByteLimit 0x1FFF
#define Int32CompressReserved3_3ByteLimit 0x1FFFFF
#define Int32CompressReserved3_4ByteLimit 0x1FFFFFFF
/* Position of the 2-bit length field within the first encoded byte. */
#define Int32CompressReserved3_LenFieldMask 0xC0
#define Int32CompressReserved3_LenFieldShift 6
/* All three reserved bits (length field plus sign flag). */
#define Int32CompressReserved3_ReservedFieldMask 0xE0
/* Sign flag; set means positive. */
#define Int32CompressReserved3_SignPositiveMask 0x20
/*
* Forward declarations: the encoder's debug self-check calls the decoders,
* which are defined after it.
*/
inline static int32 DatumStreamInt32CompressReserved3_Decode(
uint8 * buffer,
int32 * byteLen,
bool *sign);
#ifdef USE_ASSERT_CHECKING
/* Currently only used in an Assert expression. */
inline static int32 DatumStreamInt32CompressReserved3_DecodeLen(
uint8 * buffer);
#endif
/*
 * Number of bytes (1..4) the Reserved3 encoding of value will take.
 *
 * Values above the 4-byte limit are only caught by an Assert.
 */
inline static int32
DatumStreamInt32CompressReserved3_Size(int32 value)
{
    if (value <= Int32CompressReserved3_1ByteLimit)
        return 1;

    if (value <= Int32CompressReserved3_2ByteLimit)
        return 2;

    if (value <= Int32CompressReserved3_3ByteLimit)
        return 3;

    Assert(value <= Int32CompressReserved3_4ByteLimit);
    return 4;
}
/*
 * Write the Reserved3 encoding of value (with its sign flag) at buffer and
 * return the number of bytes written (1..4).
 *
 * The top two bits of buffer[0] carry (length - 1), the third bit is set
 * when "sign" is true, and the value follows most significant byte first.
 * In assert-enabled builds the result is decoded again and any mismatch
 * raises an ERROR.
 */
inline static int32
DatumStreamInt32CompressReserved3_Encode(uint8 *buffer, int32 value, bool sign)
{
    int32       byteLen;

    /* Pick the smallest encoding that fits. */
    if (value > Int32CompressReserved3_3ByteLimit)
    {
        Assert(value <= Int32CompressReserved3_4ByteLimit);
        byteLen = 4;
    }
    else if (value > Int32CompressReserved3_2ByteLimit)
        byteLen = 3;
    else if (value > Int32CompressReserved3_1ByteLimit)
        byteLen = 2;
    else
        byteLen = 1;

    switch (byteLen)
    {
        case 1:
            buffer[0] = (uint8) value;
            break;

        case 2:
            buffer[0] = (1 << Int32CompressReserved3_LenFieldShift) | (uint8) (value >> 8);
            buffer[1] = (uint8) (value);
            break;

        case 3:
            buffer[0] = (2 << Int32CompressReserved3_LenFieldShift) | (uint8) (value >> 16);
            buffer[1] = (uint8) (value >> 8);
            buffer[2] = (uint8) (value);
            break;

        default:
            buffer[0] = (3 << Int32CompressReserved3_LenFieldShift) | (uint8) (value >> 24);
            buffer[1] = (uint8) (value >> 16);
            buffer[2] = (uint8) (value >> 8);
            buffer[3] = (uint8) (value);
            break;
    }

    /* Set the sign bit if positive */
    if (sign)
        buffer[0] |= Int32CompressReserved3_SignPositiveMask;

#ifdef USE_ASSERT_CHECKING
    {
        /* Round-trip self-check. */
        int32       headerLen;
        int32       roundTripValue;
        int32       roundTripLen;
        bool        roundTripSign;

        headerLen = DatumStreamInt32CompressReserved3_DecodeLen(buffer);
        if (headerLen != byteLen)
        {
            elog(ERROR, "integer compress Reserved3 encode / decode length difference (expected %d, found %d)",
                 byteLen, headerLen);
        }

        roundTripValue = DatumStreamInt32CompressReserved3_Decode(buffer, &roundTripLen, &roundTripSign);
        if (roundTripValue != value)
        {
            elog(ERROR, "integer compress Reserved3 encode / decode value difference (expected %d, found %d)",
                 value, roundTripValue);
        }

        if (roundTripSign != sign)
        {
            elog(ERROR, "integer compress Reserved3 encode / decode sign difference (expected %d, found %d)",
                 sign, roundTripSign);
        }

        if (roundTripLen != byteLen)
        {
            elog(ERROR, "integer compress Reserved3 encode / decode length difference #2 (expected %d, found %d)",
                 byteLen, roundTripLen);
        }
    }
#endif

    return byteLen;
}
#ifdef USE_ASSERT_CHECKING
/*
 * Length in bytes (1..4) of the Reserved3-encoded integer starting at
 * buffer, taken from the top two bits of its first byte.
 *
 * Currently only used in an Assert expression.
 */
inline static int32
DatumStreamInt32CompressReserved3_DecodeLen(uint8 *buffer)
{
    uint8       lenField;

    lenField = (buffer[0] & Int32CompressReserved3_LenFieldMask) >> Int32CompressReserved3_LenFieldShift;

    return lenField + 1;
}
#endif
/*
 * Decode the Reserved3-encoded integer starting at buffer.
 *
 * Returns the value, stores the number of bytes consumed (1..4) in
 * *byteLen, and stores the sign flag in *sign (true when the sign bit is
 * set, i.e. positive). The first byte contributes its low five bits; any
 * following bytes are appended most significant first.
 */
inline static int32
DatumStreamInt32CompressReserved3_Decode(uint8 *buffer, int32 *byteLen, bool *sign)
{
    uint8       lenField;
    int32       len;
    int32       value = 0;
    int32       i;

    lenField = (buffer[0] & Int32CompressReserved3_LenFieldMask) >> Int32CompressReserved3_LenFieldShift;
    len = lenField + 1;

    /* Sign bit set => positive, clear => negative. */
    *sign = ((buffer[0] & Int32CompressReserved3_SignPositiveMask) != 0) ? true : false;

    if (len >= 1 && len <= 4)
    {
        /* Strip the three reserved bits, then fold in the remaining bytes. */
        value = (buffer[0] & ~Int32CompressReserved3_ReservedFieldMask);
        for (i = 1; i < len; i++)
            value = (value << 8) | buffer[i];
    }
    else
    {
        /* A two-bit field plus one cannot land here; kept as a safety net. */
        ereport(FATAL,
                (errmsg("Unexpected compressed integer byte length %d",
                        len)));
    }

    *byteLen = len;
    return value;
}
/*
* Per-column type properties that drive how datums are stored in and read
* from a block.
*/
typedef struct DatumStreamTypeInfo
{
/* Info determined by schema */
int32 datumlen; /* Datum length; -1 is treated as variable-length
* (varlena) by the block read code */
int32 typid; /* type id */
char align; /* Align */
bool byval; /* if it is a by value type */
} DatumStreamTypeInfo;
/*
* Largest repeat count; numerically equal to Int32Compress_4ByteLimit, the
* largest value the compressed integer encoding can hold.
*/
#define MAXREPEAT_COUNT 0x3FFFFFFF
/* Tag stored in DatumStreamBlockWrite.eyecatcher: 3 characters plus NUL. */
#define DatumStreamBlockWrite_Eyecatcher "DBW"
#define DatumStreamBlockWrite_EyecatcherLen 4
/*
* In-memory state used while building one datum stream block for writing:
* common position/NULL tracking, RLE_TYPE and Delta Range compression
* state, and the working buffers behind them.
*/
typedef struct DatumStreamBlockWrite
{
char eyecatcher[DatumStreamBlockWrite_EyecatcherLen];
/*
* (Describes eyecatcher, above.)
* Used to validate this is a DatumStreamBlockWrite
* reference. 3 characters plus NUL.
*/
DatumStreamTypeInfo *typeInfo;
/*
* (Describes datumStreamVersion, below.)
* Version of datum stream block format -- original or RLE_TYPE.
*/
DatumStreamVersion datumStreamVersion;
/* Whether RLE_TYPE / Delta Range compression was requested. */
bool rle_want_compression;
bool delta_want_compression;
int32 initialMaxDatumPerBlock;
int32 maxDatumPerBlock;
int32 maxDataBlockSize;
/* Callbacks (with their opaque arguments) for error reporting. */
int (*errdetailCallback) (void *errdetailArg);
void *errdetailArg;
int (*errcontextCallback) (void *errcontextArg);
void *errcontextArg;
/* Common current pointer and null bit-map */
int32 nth; /* Current count of datum in the block,
* including NULLs. */
int32 physical_datum_count;
bool has_null; /* if we have any NULLs at all */
DatumStreamBitMapWrite null_bitmap;
uint8 *datump; /* pointer to datum */
int32 always_null_bitmap_count;
/* RLE_TYPE variables */
bool rle_has_compression;
uint8 *rle_last_item;
int32 rle_last_item_size;
bool rle_last_item_is_repeated;
int32 rle_total_repeat_items_written;
DatumStreamBitMapWrite rle_compress_bitmap;
int32 rle_repeatcounts_count;
int32 rle_repeatcounts_current_size;
/* Delta Range variables */
bool delta_has_compression;
bool not_first_datum;
/* 8 bytes is max length Delta Compression supports */
Datum compare_item;
DatumStreamBitMapWrite delta_bitmap;
int32 deltas_count;
int32 deltas_current_size;
/* Common buffers */
MemoryContext memctxt;
uint8 *null_bitmap_buffer;
int null_bitmap_buffer_size;
uint8 *datum_buffer;
int32 datum_buffer_size;
uint8 *datum_afterp;
/* RLE_TYPE buffers */
uint8 *rle_compress_bitmap_buffer;
int32 rle_compress_bitmap_buffer_size;
int32 *rle_repeatcounts;
int32 rle_repeatcounts_maxcount;
/* Delta Range buffers */
uint8 *delta_bitmap_buffer;
int32 delta_bitmap_buffer_size;
int32 *deltas;
bool *delta_sign;
int32 deltas_maxcount;
/*
* EOF of current file
*
* NOTE(review): the comment above does not obviously describe the two
* "savings" fields that follow; their meaning is not visible in this
* header -- confirm against the block writer.
*/
int64 savings;
int64 remember_savings;
} DatumStreamBlockWrite;
/*
* Tag stored in DatumStreamBlockRead.eyecatcher: 3 characters plus NUL.
* DatumStreamBlockRead_Get checks it in assert-enabled builds.
*/
#define DatumStreamBlockRead_Eyecatcher "DBE"
#define DatumStreamBlockRead_EyecatcherLen 4
/*
* In-memory state used while reading one datum stream block: the current
* position, pointers into the block buffer, counts unpacked from the block
* header(s), and RLE_TYPE / delta decompression state.
*/
typedef struct DatumStreamBlockRead
{
/* A copy of the type information from DatumStream module since it is */
/* referenced so often. */
DatumStreamTypeInfo typeInfo;
/*
* (Describes datumStreamVersion, below.)
* Version of datum stream block format -- original or RLE_TYPE.
*/
DatumStreamVersion datumStreamVersion;
/* Common current pointer and null bit-map */
int32 nth; /* CURRENT position of datum in the block,
* including NULLs. */
int32 logical_row_count;
/*
* (Describes logical_row_count, above.)
* Total number of datum in block, including NULLs.
*/
uint8 *datump; /* pointer to datum */
int32 physical_datum_index;
int32 physical_datum_count;
uint8 *datum_beginp;
uint8 *datum_afterp;
bool has_null; /* if we have any NULLs at all */
/*
* Pointer to buffer containing the read data.
*/
uint8 *buffer_beginp;
DatumStreamBitMapRead null_bitmap;
/*
* Information unpacked from the block header, etc.
*/
uint8 *null_bitmap_beginp;
bool rle_can_have_compression;
uint8 *rle_compress_beginp;
uint8 *rle_repeatcountsp;
uint8 *delta_beginp;
uint8 *delta_deltasp;
Datum delta_datum_p; /* value DatumStreamBlockRead_Get returns when
* delta_item is set */
int32 rle_norepeats_null_bitmap_count;
int32 rle_compress_bitmap_count;
int32 rle_repeatcounts_count;
int32 rle_repeatcounts_size;
bool delta_item; /* current item's value comes from delta_datum_p
* rather than from datump */
int32 delta_bitmap_count;
int32 deltas_count;
int32 deltas_size;
/*
* Dense write variables
*
* NOTE(review): this is the read-side struct, so "write" above is
* presumably meant to be "read" -- confirm.
*/
bool rle_block_was_compressed;
DatumStreamBitMapRead rle_compress_bitmap;
int32 rle_repeatcounts_index;
bool rle_in_repeated_item;
int32 rle_repeated_item_count;
int32 rle_total_repeat_items_read;
/* Delta variables */
bool delta_block_was_compressed;
DatumStreamBitMapRead delta_bitmap;
/*
* Keep less frequently accessed fields down here for possible better CPU data cache
* performance.
*/
char eyecatcher[DatumStreamBlockRead_EyecatcherLen];
/*
* (Describes eyecatcher, above.)
* Used to validate this is a DatumStreamBlockRead
* reference. 3 characters plus NUL.
*/
int32 physical_data_size;
int32 maxDataBlockSize;
/* Callbacks (with their opaque arguments) for error reporting. */
int (*errdetailCallback) (void *errdetailArg);
void *errdetailArg;
int (*errcontextCallback) (void *errcontextArg);
void *errcontextArg;
MemoryContext memctxt;
} DatumStreamBlockRead;
/*
* Out-of-line helpers implemented outside this header.
*/
/* Printable name for a datum stream version (used in log messages below). */
extern char *DatumStreamVersion_String(DatumStreamVersion datumStreamVersion);
/*
* varlena header info to string.
*/
extern char *VarlenaInfoToString(uint8 * p);
/*
* Supply the errdetail / errcontext parts of an ereport() for a block read;
* they are passed as ereport() arguments by the inline readers below.
*/
extern int errdetail_datumstreamblockread(
DatumStreamBlockRead * dsr);
extern int errcontext_datumstreamblockread(
DatumStreamBlockRead * dsr);
#ifdef USE_ASSERT_CHECKING
/* Debug-build only: called by DatumStreamBlockRead_Get for tracing. */
extern void DatumStreamBlockRead_PrintVarlenaInfo(
DatumStreamBlockRead * dsr,
uint8 * p);
#endif
#ifdef USE_ASSERT_CHECKING
/* Debug-build only: invariant check run by Get for the Dense versions. */
extern void DatumStreamBlockRead_CheckDenseGetInvariant(
DatumStreamBlockRead * dsr);
#endif
/*
* Stream access method.
*
* Fetch the item the reader is currently positioned on. This routine only
* reads the DatumStreamBlockRead state; it does not advance the position.
*
* dsr	block reader, already positioned on an item
* datum	out: the item's value (left untouched when the item is NULL)
* null	out: true when the item is NULL, false otherwise
*
* How the value is produced depends on the column type information:
* - variable-length (datumlen == -1) and fixed-length by-reference types
*   return a pointer to the item's bytes at dsr->datump (no copy is made);
* - by-value types of length 1, 2, 4 or 8 are loaded from dsr->datump;
* - when dsr->delta_item is set, by-value types of length 4 or 8 take
*   their value from dsr->delta_datum_p instead.
*
* All integrity checks and trace logging are compiled only into
* assert-enabled builds (USE_ASSERT_CHECKING).
*/
inline static void
DatumStreamBlockRead_Get(DatumStreamBlockRead * dsr, Datum *datum, bool *null)
{
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking for DEBUG
* builds...
*/
#ifdef USE_ASSERT_CHECKING
if (strncmp(dsr->eyecatcher, DatumStreamBlockRead_Eyecatcher, DatumStreamBlockRead_EyecatcherLen) != 0)
elog(FATAL, "DatumStreamBlockRead data structure not valid (eyecatcher)");
#endif
#ifdef USE_ASSERT_CHECKING
if ((dsr->datumStreamVersion == DatumStreamVersion_Dense) ||
(dsr->datumStreamVersion == DatumStreamVersion_Dense_Enhanced))
{
DatumStreamBlockRead_CheckDenseGetInvariant(dsr);
}
#endif
/* An ON bit in the NULL bit-map means the current item is NULL. */
if (dsr->has_null && DatumStreamBitMapRead_CurrentIsOn(&dsr->null_bitmap))
{
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking for
* DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block %s read is returning NULL "
"(nth %d)",
DatumStreamVersion_String(dsr->datumStreamVersion),
dsr->nth),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
*null = true;
return;
}
else
*null = false;
/* Variable-length item: hand back a pointer into the block. */
if (dsr->typeInfo.datumlen == -1)
{
#ifdef USE_ASSERT_CHECKING
int32 varLen;
#endif
Assert(dsr->delta_item == false);
*datum = PointerGetDatum(dsr->datump);
Assert(VARATT_IS_SHORT(DatumGetPointer(*datum)) || !VARATT_IS_EXTERNAL(DatumGetPointer(*datum)));
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking for
* DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
/* The item's length must be sane and must end inside the datum area. */
varLen = VARSIZE_ANY(DatumGetPointer(*datum));
if (varLen < 0 || varLen > dsr->physical_data_size)
{
ereport(ERROR,
(errmsg("Datum stream block %s read variable-length item index %d length too large "
"(nth %d, logical row count %d, "
"item length %d, total physical data size %d, "
"current datum pointer %p, after data pointer %p)",
DatumStreamVersion_String(dsr->datumStreamVersion),
dsr->physical_datum_index,
dsr->nth,
dsr->logical_row_count,
varLen,
dsr->physical_data_size,
dsr->datump,
dsr->datum_afterp),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
if (dsr->datump + varLen > dsr->datum_afterp)
{
ereport(ERROR,
(errmsg("Datum stream block %s read variable-length item index %d length goes beyond end of block "
"(nth %d, logical row count %d, "
"item length %d, "
"current datum pointer %p, after data pointer %p)",
DatumStreamVersion_String(dsr->datumStreamVersion),
dsr->physical_datum_index,
dsr->nth,
dsr->logical_row_count,
varLen,
dsr->datump,
dsr->datum_afterp),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
if (Debug_datumstream_read_print_varlena_info)
{
DatumStreamBlockRead_PrintVarlenaInfo(
dsr,
dsr->datump);
}
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block %s read is returning variable-length item #%d "
"(nth %d, item begin %p, item offset " INT64_FORMAT ")",
DatumStreamVersion_String(dsr->datumStreamVersion),
dsr->physical_datum_index,
dsr->nth,
dsr->datump,
(int64) (dsr->datump - dsr->datum_beginp)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
}
/* Fixed-length by-reference item: also a pointer into the block. */
else if (!dsr->typeInfo.byval)
{
Assert(dsr->delta_item == false);
*datum = PointerGetDatum(dsr->datump);
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking for
* DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block %s read is returning fixed-length item #%d "
"(nth %d, item size %d, item begin %p, item offset " INT64_FORMAT ")",
DatumStreamVersion_String(dsr->datumStreamVersion),
dsr->physical_datum_index,
dsr->nth,
dsr->typeInfo.datumlen,
dsr->datump,
(int64) (dsr->datump - dsr->datum_beginp)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
}
/* By-value item. */
else
{
if (!dsr->delta_item)
{
/*
* Performance is so critical we don't use a switch statement here.
*/
if (dsr->typeInfo.datumlen == 1)
{
*datum = *(uint8 *) dsr->datump;
}
else if (dsr->typeInfo.datumlen == 2)
{
Assert(IsAligned(dsr->datump, 2));
*datum = *(uint16 *) dsr->datump;
}
else if (dsr->typeInfo.datumlen == 4)
{
Assert(IsAligned(dsr->datump, 4));
*datum = *(uint32 *) dsr->datump;
}
else if (dsr->typeInfo.datumlen == 8)
{
/* 8-byte values are accepted at either 8- or 4-byte alignment. */
Assert(IsAligned(dsr->datump, 8) || IsAligned(dsr->datump, 4));
*datum = *(Datum *) dsr->datump;
}
else
{
/* Unsupported by-value length: yield 0 and trip the assertion. */
*datum = 0;
Assert(false);
}
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking
* for DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block %s read is returning fixed-length item #%d "
"(nth %d, item size %d, item begin %p, item offset " INT64_FORMAT ", integer " INT64_FORMAT ")",
DatumStreamVersion_String(dsr->datumStreamVersion),
dsr->physical_datum_index,
dsr->nth,
dsr->typeInfo.datumlen,
dsr->datump,
(int64) (dsr->datump - dsr->datum_beginp),
(int64) * datum),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
}
else
{
/*
* Delta item: the value comes from dsr->delta_datum_p rather than
* from the block. Only 4- and 8-byte lengths are handled.
*/
if (dsr->typeInfo.datumlen == 4)
{
/* Keep only the low 32 bits for a 4-byte type. */
*datum = (uint32) dsr->delta_datum_p;
}
else if (dsr->typeInfo.datumlen == 8)
{
*datum = dsr->delta_datum_p;
}
else
{
*datum = 0;
Assert(false);
}
}
}
}
/*
* DatumStreamBlockRead_AdvanceOrig
*
* Move the reader to the next logical row of a block. This is the variant
* DatumStreamBlockRead_Advance() selects for DatumStreamVersion_Original.
*
* Returns 0 when the new position is at or beyond logical_row_count, and 1
* otherwise. A return of 1 covers both a NULL row (the NULL bit-map bit is
* on and datump is left untouched) and a non-NULL row.
*
* The first non-NULL item needs no pointer movement. For every later one,
* datump is stepped over the previous item according to typeInfo.datumlen:
* -1 (varlena, sized with VARSIZE_ANY), -2 (NUL-terminated string, sized
* with strlen) or any other value, which is added to the pointer as a fixed
* item length.
*/
inline static int
DatumStreamBlockRead_AdvanceOrig(DatumStreamBlockRead * dsr)
{
Assert(dsr);
++dsr->nth;
/* nth starts out as -1, so the first advance lands on row 0. */
/* Advance out of bounds? */
if (dsr->nth >= dsr->logical_row_count)
return 0;
if (dsr->has_null)
{
/* The NULL bit-map is stepped once for every logical row. */
DatumStreamBitMapRead_Next(&dsr->null_bitmap);
Assert(DatumStreamBitMapRead_InRange(&dsr->null_bitmap));
if (DatumStreamBitMapRead_CurrentIsOn(&dsr->null_bitmap))
{
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking
* for DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block read is positioned to NULL "
"(nth %d)",
dsr->nth),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
/* NULL row: physical_datum_index and datump are not changed. */
return 1;
}
}
Assert(dsr->datump >= dsr->datum_beginp);
Assert(dsr->datump < dsr->datum_afterp);
++dsr->physical_datum_index;
/* physical_datum_index also starts out as -1. */
if (dsr->physical_datum_index == 0)
{
/* Pre-positioned by block read to first item. */
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking for
* DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block read advance is positioned to first item "
"(nth %d, logical row count %d, item begin %p, item offset " INT64_FORMAT ")",
dsr->nth,
dsr->logical_row_count,
dsr->datump,
(int64) (dsr->datump - dsr->datum_beginp)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
}
else
{
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking for
* DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
/*
* Start of the item being left behind; only used by the checks and
* trace messages below.
*/
uint8 *item_beginp;
item_beginp = dsr->datump;
#endif
/*
* Advance the item pointer.
*/
if (dsr->typeInfo.datumlen == -1)
{
struct varlena *s = (struct varlena *) dsr->datump;
Assert(dsr->datump >= dsr->datum_beginp);
Assert(dsr->datump < dsr->datum_afterp);
/* Step over the previous item by its VARSIZE_ANY() length. */
dsr->datump += VARSIZE_ANY(s);
/*
* Skip any possible zero paddings AFTER PREVIOUS varlena data.
*
* A zero byte at the new position is treated as padding, in which
* case the pointer is rounded up to typeInfo.align.
*/
if (*dsr->datump == 0)
{
dsr->datump = (uint8 *) att_align_nominal(dsr->datump, dsr->typeInfo.align);
}
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking
* for DEBUG builds...
*
* Note that the out-of-bounds test below therefore only exists in
* assert-enabled builds, and runs after the byte at the new
* position has already been read above.
*/
#ifdef USE_ASSERT_CHECKING
if (dsr->datump > dsr->datum_afterp)
{
ereport(ERROR,
(errmsg("Datum stream block read pointer to variable-length item index %d out of bounds "
"(nth %d, logical row count %d, "
"current datum pointer %p, next datum pointer %p, after data pointer %p)",
dsr->physical_datum_index,
dsr->nth,
dsr->logical_row_count,
item_beginp,
dsr->datump,
dsr->datum_afterp),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block read advanced to variable-length item index %d "
"(nth %d, logical row count %d, "
"previous item begin %p, previous item offset " INT64_FORMAT ", next item begin %p)",
dsr->physical_datum_index,
dsr->nth,
dsr->logical_row_count,
item_beginp,
(int64) (item_beginp - dsr->datum_beginp),
dsr->datump),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
}
else if (dsr->typeInfo.datumlen == -2)
{
/* NUL-terminated item: skip the string and its terminator. */
dsr->datump += strlen((char *) dsr->datump) + 1;
}
else
{
/* Fixed-length item: step by the declared item length. */
dsr->datump += dsr->typeInfo.datumlen;
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking
* for DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block read advanced to fixed-item item index %d "
"(nth %d, logical row count %d, "
"item size %d, previous item begin %p, previous item offset " INT64_FORMAT ", next item begin %p)",
dsr->physical_datum_index,
dsr->nth,
dsr->logical_row_count,
dsr->typeInfo.datumlen,
item_beginp,
(int64) (item_beginp - dsr->datum_beginp),
dsr->datump),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
}
}
return 1;
}
/*
 * Result of one delta-decoding step, as returned by
 * DatumStreamBlockRead_AdvanceDenseDelta().
 */
typedef enum Delta_Compression_status
{
	/* The current item was a delta item and its value was reconstructed. */
	DELTA_COMPRESSION_OK = 0,

	/*
	 * The block is not delta compressed, or the current item is stored as a
	 * regular (non-delta) datum.
	 */
	DELTA_COMPRESSION_NOT_APPLIED = 1,

	/*
	 * NOTE(review): not returned by the reader code in this header;
	 * presumably reserved for failure reporting elsewhere -- confirm.
	 */
	DELTA_COMPRESSION_ERROR = 2
} Delta_Compression_status;
/*
 * DatumStreamBlockRead_AdvanceDenseDelta
 *
 * Perform the delta-decoding part of advancing to the next item.
 *
 * Returns DELTA_COMPRESSION_NOT_APPLIED when the block was not delta
 * compressed (nothing is touched), or when the delta bit-map says the item
 * is not a delta.  In the latter case delta_item is cleared and
 * delta_datum_p is loaded with the raw bytes of the physical datum the
 * caller is about to step onto, so it can serve as the base for later
 * deltas.
 *
 * Returns DELTA_COMPRESSION_OK when the item is a delta: delta_item is set,
 * one encoded delta is consumed from delta_deltasp, and it is added to or
 * subtracted from delta_datum_p depending on the decoded sign flag.
 */
inline static Delta_Compression_status
DatumStreamBlockRead_AdvanceDenseDelta(DatumStreamBlockRead * dsr)
{
	int64		deltaValue;
	int32		encodedLen;
	bool		isAddition;

	if (!dsr->delta_block_was_compressed)
		return DELTA_COMPRESSION_NOT_APPLIED;

	Assert((dsr->typeInfo.datumlen >= 4) && (dsr->typeInfo.datumlen <= 8));
	Assert(dsr->typeInfo.byval);

	/* One delta bit-map position is consumed per call that reaches here. */
	DatumStreamBitMapRead_Next(&dsr->delta_bitmap);
	Assert(DatumStreamBitMapRead_InRange(&dsr->delta_bitmap));

	if (!DatumStreamBitMapRead_CurrentIsOn(&dsr->delta_bitmap))
	{
		/*
		 * Not a delta item.  datump has not been advanced yet: before the
		 * first physical item (index -1) it already addresses the datum to
		 * read, otherwise that datum is one fixed-length item further on.
		 */
		uint8	   *basep = dsr->datump;

		dsr->delta_item = false;
		if (dsr->physical_datum_index != -1)
			basep += dsr->typeInfo.datumlen;

		memcpy(&dsr->delta_datum_p, basep, dsr->typeInfo.datumlen);
		return DELTA_COMPRESSION_NOT_APPLIED;
	}

	dsr->delta_item = true;

	/*
	 * delta_datum_p holds the previous value; decode the next delta and its
	 * sign, then apply it to obtain the current value.
	 */
	deltaValue = DatumStreamInt32CompressReserved3_Decode(dsr->delta_deltasp, &encodedLen, &isAddition);
	dsr->delta_deltasp += encodedLen;

	switch (dsr->typeInfo.datumlen)
	{
		case 4:
			{
				/* 4-byte values are updated in place through a uint32 view. */
				uint32	   *value32 = (uint32 *) (&dsr->delta_datum_p);

				if (isAddition)
					*value32 += deltaValue;
				else
					*value32 -= deltaValue;
			}
			break;

		case 8:
			if (isAddition)
				dsr->delta_datum_p += deltaValue;
			else
				dsr->delta_datum_p -= deltaValue;
			break;

		default:
			/* Only 4- and 8-byte items are handled. */
			dsr->delta_datum_p = 0;
			Assert(false);
			break;
	}

#ifdef USE_ASSERT_CHECKING
	if (Debug_appendonly_print_scan_tuple)
	{
		ereport(LOG,
				(errmsg("Datum stream block read Delta value "
						"(nth %d, logical row count %d, delta " INT64_FORMAT ", sign %d)",
						dsr->nth,
						dsr->logical_row_count,
						deltaValue,
						isAddition),
				 errdetail_datumstreamblockread(dsr),
				 errcontext_datumstreamblockread(dsr)));
	}
#endif

	return DELTA_COMPRESSION_OK;
}
/*
* DatumStreamBlockRead_AdvanceDense
*
* Move the reader to the next logical row of a block. This is the variant
* DatumStreamBlockRead_Advance() selects for DatumStreamVersion_Dense and
* DatumStreamVersion_Dense_Enhanced.
*
* Returns 0 when the new position is at or beyond logical_row_count, and 1
* otherwise (for NULL rows as well as non-NULL rows).
*
* Order of processing for one call:
*
* 1. Block without RLE compression: step the optional NULL bit-map and
*    return early on a NULL row.
* 2. Block with RLE compression: while inside a repeated item, just count
*    down the remaining repeats and return. Otherwise step the optional
*    NULL bit-map (returning early on NULL), then step the RLE compress
*    bit-map and, if the new item is repeated, decode its repeat count.
* 3. Block with delta compression: let
*    DatumStreamBlockRead_AdvanceDenseDelta() handle the item and return
*    if it reports DELTA_COMPRESSION_OK.
* 4. Otherwise step physical_datum_index and, except for the first
*    physical item, move datump past the previous item.
*/
inline static int
DatumStreamBlockRead_AdvanceDense(DatumStreamBlockRead * dsr)
{
Assert(dsr);
++dsr->nth;
/* nth starts out as -1, so the first advance lands on row 0. */
/* Advance out of bounds? */
if (dsr->nth >= dsr->logical_row_count)
return 0;
if (!dsr->rle_block_was_compressed)
{
/*
* A Dense block with optional NULL bit-map.
*/
if (dsr->has_null)
{
DatumStreamBitMapRead_Next(&dsr->null_bitmap);
Assert(DatumStreamBitMapRead_InRange(&dsr->null_bitmap));
if (DatumStreamBitMapRead_CurrentIsOn(&dsr->null_bitmap))
{
/*
* NULL item.
*/
return 1;
}
}
}
else
{
/*
* For RLE_TYPE compression, we only represent the repeated item once in
* the NULL bit-map.
*/
if (dsr->rle_in_repeated_item)
{
Assert(dsr->rle_repeated_item_count > 0);
/*
* Assert-enabled builds verify that both bit-maps are still in range
* and still describe a non-NULL, compressed (repeated) item.
*/
#ifdef USE_ASSERT_CHECKING
if (dsr->has_null)
{
if (!DatumStreamBitMapRead_InRange(&dsr->null_bitmap))
{
ereport(ERROR,
(errmsg("Datum stream block read advance NULL bit-map out-of-range "
"(nth %d, logical row count %d, NULL bit-map count %d)",
dsr->nth,
dsr->logical_row_count,
DatumStreamBitMapRead_Count(&dsr->null_bitmap)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
if (DatumStreamBitMapRead_CurrentIsOn(&dsr->null_bitmap))
{
ereport(ERROR,
(errmsg("Datum stream block read advance NULL bit-map ON for repeated item "
"(nth %d, logical row count %d, NULL bit-map count %d)",
dsr->nth,
dsr->logical_row_count,
DatumStreamBitMapRead_Count(&dsr->null_bitmap)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
}
if (!DatumStreamBitMapRead_InRange(&dsr->rle_compress_bitmap))
{
ereport(ERROR,
(errmsg("Datum stream block read advance COMPRESS bit-map out-of-range "
"(nth %d, logical row count %d, COMPRESS bit-map count %d)",
dsr->nth,
dsr->logical_row_count,
DatumStreamBitMapRead_Count(&dsr->rle_compress_bitmap)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
if (!DatumStreamBitMapRead_CurrentIsOn(&dsr->rle_compress_bitmap))
{
ereport(ERROR,
(errmsg("Datum stream block read advance COMPRESS bit-map OFF for repeated item "
"(nth %d, logical row count %d, COMPRESS bit-map count %d)",
dsr->nth,
dsr->logical_row_count,
DatumStreamBitMapRead_Count(&dsr->rle_compress_bitmap)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
/* Account for one more copy of the repeated item being returned. */
dsr->rle_repeated_item_count--;
dsr->rle_total_repeat_items_read++;
if (dsr->rle_repeated_item_count <= 0)
{
/*
* The nth item is the last of the repeated item.
*
* Leave the RLE_TYPE compression bit-map positions and
* optional NULL bit-map positions the same since they still represent
* the last copy of the repeated item.
*/
dsr->rle_in_repeated_item = false;
}
/*
* Item pointers are already setup.
*/
return 1;
}
/*
* The first step of advancing not being inside current repeated item is advancing the
* NULL bit-map.
*
* (We only advance the RLE_TYPE compress bit-map inside a non-NULL item)
*/
if (dsr->has_null)
{
DatumStreamBitMapRead_Next(&dsr->null_bitmap);
#ifdef USE_ASSERT_CHECKING
if (!DatumStreamBitMapRead_InRange(&dsr->null_bitmap))
{
/* pg_usleep(300 * 1000000L); */
ereport(ERROR,
(errmsg("Datum stream block read advance NULL bit-map out-of-range "
"(nth %d, logical row count %d, NULL bit-map count %d, "
"physical datum count %d, COMPRESS bit-map count %d)",
dsr->nth,
dsr->logical_row_count,
DatumStreamBitMapRead_Count(&dsr->null_bitmap),
dsr->physical_datum_count,
DatumStreamBitMapRead_Count(&dsr->rle_compress_bitmap)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
if (DatumStreamBitMapRead_CurrentIsOn(&dsr->null_bitmap))
{
/*
* NULL item.
*/
return 1;
}
}
/*
* Now that we have an item that is not NULL, see if it is repeated.
*/
DatumStreamBitMapRead_Next(&dsr->rle_compress_bitmap);
#ifdef USE_ASSERT_CHECKING
if (!DatumStreamBitMapRead_InRange(&dsr->rle_compress_bitmap))
{
ereport(ERROR,
(errmsg("Datum stream block read advance COMPRESS bit-map out-of-range "
"(nth %d, logical row count %d, COMPRESS bit-map count %d)",
dsr->nth,
dsr->logical_row_count,
DatumStreamBitMapRead_Count(&dsr->rle_compress_bitmap)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
if (DatumStreamBitMapRead_CurrentIsOn(&dsr->rle_compress_bitmap))
{
int32 repeatCount;
int32 byteLen;
/*
* New repeated item.
*
* Its repeat count is decoded from the variable-length encoding at
* rle_repeatcountsp, which is then moved past the bytes consumed.
*/
dsr->rle_repeatcounts_index++;
dsr->rle_total_repeat_items_read++;
repeatCount = DatumStreamInt32Compress_Decode(dsr->rle_repeatcountsp, &byteLen);
dsr->rle_repeatcountsp += byteLen;
/* UNDONE: Integrity check. */
dsr->rle_repeated_item_count = repeatCount;
dsr->rle_in_repeated_item = true;
}
/*
* Fall through and setup pointer to new item.
*/
}
/*
* For a delta-compressed block, an item that was reconstructed from a delta
* is complete at this point: return without touching physical_datum_index
* or datump. Any other result continues with the physical item below.
*/
if (dsr->delta_block_was_compressed)
{
if (DatumStreamBlockRead_AdvanceDenseDelta(dsr) == DELTA_COMPRESSION_OK)
{
return 1;
}
}
Assert(dsr->datump >= dsr->datum_beginp);
Assert(dsr->datump < dsr->datum_afterp);
++dsr->physical_datum_index;
/* physical_datum_index also starts out as -1. */
if (dsr->physical_datum_index == 0)
{
/* Pre-positioned by block read to first item. */
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking for
* DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block read advance is positioned to first item "
"(nth %d, logical row count %d, item begin %p, item offset " INT64_FORMAT ")",
dsr->nth,
dsr->logical_row_count,
dsr->datump,
(int64) (dsr->datump - dsr->datum_beginp)),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
}
else
{
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking for
* DEBUG builds...
*/
#ifdef USE_ASSERT_CHECKING
/*
* Start of the item being left behind; only used by the checks and
* trace messages below.
*/
uint8 *item_beginp;
item_beginp = dsr->datump;
#endif
/*
* Advance the item pointer.
*/
if (dsr->typeInfo.datumlen == -1)
{
struct varlena *s;
/* This Assert repeats the condition of the enclosing if. */
Assert(dsr->typeInfo.datumlen == -1);
Assert(dsr->datump >= dsr->datum_beginp);
Assert(dsr->datump < dsr->datum_afterp);
s = (struct varlena *) dsr->datump;
/* Step over the previous item by its VARSIZE_ANY() length. */
dsr->datump += VARSIZE_ANY(s);
/*
* Skip any possible zero paddings AFTER varlena data.
*
* A zero byte at the new position is treated as padding, in which
* case the pointer is rounded up to typeInfo.align.
*/
if (*dsr->datump == 0)
{
dsr->datump = (uint8 *) att_align_nominal(dsr->datump, dsr->typeInfo.align);
}
/*
* PERFORMANCE EXPERIMENT: Only do integrity and trace checking
* for DEBUG builds...
*
* Note that the out-of-bounds test below therefore only exists in
* assert-enabled builds, and runs after the byte at the new
* position has already been read above.
*/
#ifdef USE_ASSERT_CHECKING
if (dsr->datump > dsr->datum_afterp)
{
ereport(ERROR,
(errmsg("Datum stream block read pointer to variable-length item index %d out of bounds "
"(nth %d, logical row count %d, "
"current datum pointer %p, next datum pointer %p, after data pointer %p)",
dsr->physical_datum_index,
dsr->nth,
dsr->logical_row_count,
item_beginp,
dsr->datump,
dsr->datum_afterp),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block read advanced to variable-length item index %d "
"(nth %d, logical row count %d, "
"previous item begin %p, previous item offset " INT64_FORMAT ", next item begin %p)",
dsr->physical_datum_index,
dsr->nth,
dsr->logical_row_count,
item_beginp,
(int64) (item_beginp - dsr->datum_beginp),
dsr->datump),
errdetail_datumstreamblockread(dsr),
errcontext_datumstreamblockread(dsr)));
}
#endif
}
else if (dsr->typeInfo.datumlen == -2)
{
/* NUL-terminated item: skip the string and its terminator. */
dsr->datump += strlen((char *) dsr->datump) + 1;
}
else
{
/* Fixed-length item: step by the declared item length. */
dsr->datump += dsr->typeInfo.datumlen;
}
}
return 1;
}
/*
 * DatumStreamBlockRead_Advance
 *
 * Advance the reader by one logical row, delegating to the routine that
 * matches dsr->datumStreamVersion: DatumStreamBlockRead_AdvanceOrig() for
 * DatumStreamVersion_Original, DatumStreamBlockRead_AdvanceDense() for the
 * Dense and Dense_Enhanced versions.
 *
 * Returns whatever the selected routine returns.
 */
inline static int
DatumStreamBlockRead_Advance(DatumStreamBlockRead * dsr)
{
	/*
	 * The eyecatcher sanity check is compiled only into assert-enabled
	 * builds to keep it out of the production scan path.
	 */
#ifdef USE_ASSERT_CHECKING
	if (strncmp(dsr->eyecatcher, DatumStreamBlockRead_Eyecatcher, DatumStreamBlockRead_EyecatcherLen) != 0)
		elog(FATAL, "DatumStreamBlockRead data structure not valid (eyecatcher)");
#endif

	if (dsr->datumStreamVersion != DatumStreamVersion_Original)
	{
		Assert((dsr->datumStreamVersion == DatumStreamVersion_Dense) ||
			   (dsr->datumStreamVersion == DatumStreamVersion_Dense_Enhanced));

		return DatumStreamBlockRead_AdvanceDense(dsr);
	}

	return DatumStreamBlockRead_AdvanceOrig(dsr);
}
/*
 * DatumStreamBlockRead_Nth
 *
 * Return the reader's current logical row position (dsr->nth).
 */
inline static int
DatumStreamBlockRead_Nth(DatumStreamBlockRead * dsr)
{
	int			position = dsr->nth;

	return position;
}
/*
* Prototypes of the version-specific routines that
* DatumStreamBlockRead_GetReady() dispatches to: the Orig variant for
* DatumStreamVersion_Original, the Dense variant for the Dense and
* Dense_Enhanced versions. Both take the same argument list.
*
* NOTE(review): hadToAdjustRowCount and adjustedRowCount are pointers and
* look like output parameters reporting a corrected row count -- confirm
* against the implementations.
*/
extern void DatumStreamBlockRead_GetReadyOrig(
DatumStreamBlockRead * dsr,
uint8 * buffer,
int32 bufferSize,
int64 firstRowNum,
int32 rowCount,
bool *hadToAdjustRowCount,
int32 * adjustedRowCount);
extern void DatumStreamBlockRead_GetReadyDense(
DatumStreamBlockRead * dsr,
uint8 * buffer,
int32 bufferSize,
int64 firstRowNum,
int32 rowCount,
bool *hadToAdjustRowCount,
int32 * adjustedRowCount);
/*
 * DatumStreamBlockRead_GetReady
 *
 * Forward all arguments unchanged to the version-specific routine selected
 * by dsr->datumStreamVersion: DatumStreamBlockRead_GetReadyOrig() for
 * DatumStreamVersion_Original, DatumStreamBlockRead_GetReadyDense() for the
 * Dense and Dense_Enhanced versions.
 *
 * The meaning of buffer, bufferSize, firstRowNum, rowCount,
 * hadToAdjustRowCount and adjustedRowCount is defined by those routines.
 *
 * Both callees return void, so they are invoked as plain statements rather
 * than through "return <expr>;", which ISO C does not permit in a function
 * returning void.
 */
inline static void
DatumStreamBlockRead_GetReady(
							  DatumStreamBlockRead * dsr,
							  uint8 * buffer,
							  int32 bufferSize,
							  int64 firstRowNum,
							  int32 rowCount,
							  bool *hadToAdjustRowCount,
							  int32 * adjustedRowCount)
{
	if (dsr->datumStreamVersion == DatumStreamVersion_Original)
	{
		DatumStreamBlockRead_GetReadyOrig(
										  dsr,
										  buffer,
										  bufferSize,
										  firstRowNum,
										  rowCount,
										  hadToAdjustRowCount,
										  adjustedRowCount);
	}
	else
	{
		Assert(dsr->datumStreamVersion == DatumStreamVersion_Dense ||
			   (dsr->datumStreamVersion == DatumStreamVersion_Dense_Enhanced));
		DatumStreamBlockRead_GetReadyDense(
										   dsr,
										   buffer,
										   bufferSize,
										   firstRowNum,
										   rowCount,
										   hadToAdjustRowCount,
										   adjustedRowCount);
	}
}
/*
* Prototypes of the version-specific routines that
* DatumStreamBlockRead_Reset() dispatches to: the Orig variant for
* DatumStreamVersion_Original, the Dense variant for the Dense and
* Dense_Enhanced versions.
*/
extern void DatumStreamBlockRead_ResetOrig(DatumStreamBlockRead * dsr);
extern void DatumStreamBlockRead_ResetDense(DatumStreamBlockRead * dsr);
/*
 * DatumStreamBlockRead_Reset
 *
 * Invoke the reset routine that matches dsr->datumStreamVersion:
 * DatumStreamBlockRead_ResetOrig() for DatumStreamVersion_Original,
 * DatumStreamBlockRead_ResetDense() for the Dense and Dense_Enhanced
 * versions.
 */
inline static void
DatumStreamBlockRead_Reset(DatumStreamBlockRead * dsr)
{
	if (dsr->datumStreamVersion != DatumStreamVersion_Original)
	{
		Assert(dsr->datumStreamVersion == DatumStreamVersion_Dense ||
			   (dsr->datumStreamVersion == DatumStreamVersion_Dense_Enhanced));
		DatumStreamBlockRead_ResetDense(dsr);
		return;
	}

	DatumStreamBlockRead_ResetOrig(dsr);
}
/*
* Prototypes of the out-of-line block reader and block writer routines.
* Only the signatures are visible here; the notes below are limited to what
* the signatures show.
*
* Both Init routines take a type descriptor, a DatumStreamVersion and two
* callback/argument pairs (errdetailCallback/errdetailArg and
* errcontextCallback/errcontextArg).
* NOTE(review): presumably these callbacks supply the errdetail/errcontext
* text used when the block code reports errors -- confirm against the
* implementation.
*/
extern void DatumStreamBlockRead_Init(
DatumStreamBlockRead * dsr,
DatumStreamTypeInfo * typeInfo,
DatumStreamVersion datumStreamVersion,
bool rle_can_have_compression,
int (*errdetailCallback) (void *errdetailArg),
void *errdetailArg,
int (*errcontextCallback) (void *errcontextArg),
void *errcontextArg);
extern void DatumStreamBlockRead_Finish(
DatumStreamBlockRead * dsr);
/*
* Writer side. In addition to the arguments shared with the reader's Init,
* the writer's Init takes RLE and delta compression requests and three size
* limits (initialMaxDatumPerBlock, maxDatumPerBlock, maxDataBlockSize).
*/
extern void DatumStreamBlockWrite_Init(
DatumStreamBlockWrite * dsw,
DatumStreamTypeInfo * typeInfo,
DatumStreamVersion datumStreamVersion,
bool rle_want_compression,
bool delta_want_compression,
int32 initialMaxDatumPerBlock,
int32 maxDatumPerBlock,
int32 maxDataBlockSize,
int (*errdetailCallback) (void *errdetailArg),
void *errdetailArg,
int (*errcontextCallback) (void *errcontextArg),
void *errcontextArg);
extern void DatumStreamBlockWrite_Finish(
DatumStreamBlockWrite * dsw);
/*
* NOTE(review): the meaning of the int result and of *toFree is not visible
* from this header -- check the implementation before relying on either.
*/
extern int DatumStreamBlockWrite_Put(
DatumStreamBlockWrite * dsw,
Datum d,
bool null,
void **toFree);
extern int DatumStreamBlockWrite_Nth(DatumStreamBlockWrite * dsw);
extern void DatumStreamBlockWrite_GetReady(
DatumStreamBlockWrite * dsw);
/*
* NOTE(review): takes the destination buffer and returns an int64,
* presumably the number of bytes written -- confirm.
*/
extern int64 DatumStreamBlockWrite_Block(
DatumStreamBlockWrite * dsw,
uint8 * buffer);
#endif /* DATUMSTREAMBLOCK_H */
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦