greenplumn datumstream source code
File path: /src/backend/utils/datumstream/datumstream.c
/*-------------------------------------------------------------------------
*
* datumstream.c
*
* Portions Copyright (c) 2009, Greenplum Inc.
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/backend/utils/datumstream/datumstream.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <sys/file.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include "access/tupmacs.h"
#include "access/tuptoaster.h"
#include "catalog/pg_attribute_encoding.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbappendonlyblockdirectory.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "cdb/cdbappendonlystorageread.h"
#include "cdb/cdbappendonlystoragewrite.h"
#include "utils/datumstream.h"
#include "utils/guc.h"
#include "catalog/pg_compression.h"
#include "utils/faultinjector.h"
typedef enum AOCSBK
{
AOCSBK_None = 0,
AOCSBK_BLOCK,
AOCSBK_BLOB,
} AOCSBK;
static void
datumstreamread_check_large_varlena_integrity(
DatumStreamRead * acc,
uint8 * buffer,
int32 contentLen)
{
struct varlena *va;
va = (struct varlena *) buffer;
if (contentLen < VARHDRSZ)
{
elog(ERROR, "Large varlena header too small. Found %d, expected at least %d",
contentLen,
VARHDRSZ);
}
if (VARATT_IS_EXTERNAL(va))
{
elog(ERROR, "Large varlena has a toast reference but Append-Only Column Store tables do not use toast");
}
}
static void
datumstreamread_print_large_varlena_info(
DatumStreamRead * acc,
uint8 * p)
{
elog(LOG, "Read large varlena <%s>",
VarlenaInfoToString(p));
}
/*
* Error detail and context callbacks for tracing or errors occurring during reading.
*/
static int
datumstreamread_detail_callback(void *arg)
{
DatumStreamRead *acc = (DatumStreamRead *) arg;
/*
* Append-Only Storage Read's detail.
*/
if (acc->need_close_file)
{
errdetail_appendonly_read_storage_content_header(&acc->ao_read);
}
return 0;
}
static int
datumstreamread_context_callback(void *arg)
{
DatumStreamRead *acc = (DatumStreamRead *) arg;
if (Debug_appendonly_print_datumstream)
elog(LOG,
"datumstream_advance filePathName %s nth %u ndatum %u datump %p ",
acc->ao_read.bufferedRead.filePathName,
acc->blockRead.nth,
acc->blockRead.logical_row_count,
acc->blockRead.datump);
/*
* Append-Only Storage Read's context.
*/
if (acc->need_close_file)
{
errcontext_appendonly_read_storage_block(&acc->ao_read);
}
else
{
errcontext("%s", acc->title);
}
return 0;
}
/*
* Error detail and context callbacks for tracing or errors occurring during writing.
*/
static int
datumstreamwrite_detail_callback(void *arg)
{
/* DatumStreamWrite *acc = (DatumStreamWrite*) arg; */
/*
* Append-Only Storage Write's detail.
*/
/* UNDONE */
return 0;
}
static int
datumstreamwrite_context_callback(void *arg)
{
DatumStreamWrite *acc = (DatumStreamWrite *) arg;
char *str;
/*
* Append-Only Storage Write's context.
*/
if (acc->need_close_file)
{
str = AppendOnlyStorageWrite_ContextStr(&acc->ao_write);
errcontext("%s", str);
pfree(str);
}
else
{
errcontext("%s", acc->title);
}
return 0;
}
void
datumstreamread_getlarge(DatumStreamRead * acc, Datum *datum, bool *null)
{
switch (acc->largeObjectState)
{
case DatumStreamLargeObjectState_HaveAoContent:
ereport(ERROR,
(errmsg("Advance not called on large datum stream object")));
return;
case DatumStreamLargeObjectState_PositionAdvanced:
acc->largeObjectState = DatumStreamLargeObjectState_Consumed;
/* Fall below to ~_Consumed. */
/* fallthrough */
case DatumStreamLargeObjectState_Consumed:
{
int32 len;
len = VARSIZE_ANY(acc->buffer_beginp);
/*
* It is ok to get the same object more than once.
*/
if (Debug_datumstream_read_print_varlena_info)
{
datumstreamread_print_large_varlena_info(
acc,
acc->buffer_beginp);
}
if (Debug_appendonly_print_scan_tuple)
{
ereport(LOG,
(errmsg("Datum stream block read is returning large variable-length object "
"(length %d)",
len)));
}
*datum = PointerGetDatum(acc->buffer_beginp);
*null = false;
return;
}
case DatumStreamLargeObjectState_Exhausted:
ereport(ERROR,
(errmsg("Get called after large datum stream object already consumed")));
return;
default:
ereport(FATAL,
(errmsg("Unexpected large datum stream state %d",
acc->largeObjectState)));
return;
}
}
int
datumstreamread_advancelarge(DatumStreamRead * acc)
{
acc->blockRead.nth++;
switch (acc->largeObjectState)
{
case DatumStreamLargeObjectState_HaveAoContent:
{
struct varlena *va;
int32 len;
va = (struct varlena *) acc->buffer_beginp;
len = VARSIZE_ANY(va);
acc->largeObjectState = DatumStreamLargeObjectState_PositionAdvanced;
return len;
}
case DatumStreamLargeObjectState_PositionAdvanced:
case DatumStreamLargeObjectState_Consumed:
/*
* Second advance returns exhaustion.
*/
acc->largeObjectState = DatumStreamLargeObjectState_Exhausted;
return 0;
case DatumStreamLargeObjectState_Exhausted:
ereport(ERROR,
(errmsg("Advance called after large datum stream object already consumed")));
return 0;
/* Never gets here. */
default:
ereport(FATAL,
(errmsg("Unexpected large datum stream state %d",
acc->largeObjectState)));
return 0;
/* Never reaches here. */
}
}
int
datumstreamread_nthlarge(DatumStreamRead * acc)
{
switch (acc->largeObjectState)
{
case DatumStreamLargeObjectState_HaveAoContent:
ereport(ERROR,
(errmsg("Advance not called on large datum stream object")));
return 0;
/* Never gets here. */
case DatumStreamLargeObjectState_PositionAdvanced:
case DatumStreamLargeObjectState_Consumed:
return 0;
case DatumStreamLargeObjectState_Exhausted:
ereport(ERROR,
(errmsg("Nth called after large datum stream object already consumed")));
return 0;
/* Never gets here. */
default:
ereport(FATAL,
(errmsg("Unexpected large datum stream state %d",
acc->largeObjectState)));
return 0;
/* Never reaches here. */
}
}
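/*
* A minimal caller sketch (not part of the original file) showing how the
* three large-object accessors above fit together once
* datumstreamread_block_content() has loaded an AOCSBK_BLOB block and set
* largeObjectState to DatumStreamLargeObjectState_HaveAoContent. The
* example_* name is hypothetical.
*/
#ifdef DATUMSTREAM_USAGE_SKETCH
static void
example_read_one_large_datum(DatumStreamRead * acc)
{
Datum datum;
bool isnull;
/* The first advance moves HaveAoContent -> PositionAdvanced and returns the varlena length. */
if (datumstreamread_advancelarge(acc) > 0)
{
/* A blob block holds a single datum, so the position is always 0. */
Assert(datumstreamread_nthlarge(acc) == 0);
/* Get consumes the datum; getting the same object more than once is ok. */
datumstreamread_getlarge(acc, &datum, &isnull);
}
/* A second advance reports exhaustion by returning 0. */
Assert(datumstreamread_advancelarge(acc) == 0);
}
#endif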
int
datumstreamwrite_put(
DatumStreamWrite * acc,
Datum d,
bool null,
void **toFree)
{
return DatumStreamBlockWrite_Put(&acc->blockWrite, d, null, toFree);
}
int
datumstreamwrite_nth(DatumStreamWrite * acc)
{
return DatumStreamBlockWrite_Nth(&acc->blockWrite);
}
static void
init_datumstream_typeinfo(
DatumStreamTypeInfo * typeInfo,
Form_pg_attribute attr)
{
typeInfo->datumlen = attr->attlen;
typeInfo->typid = attr->atttypid;
typeInfo->align = attr->attalign;
typeInfo->byval = attr->attbyval;
}
/*
* DeltaRange compression is supported for the following datatypes:
* INTEGER, BIGINT, DATE, TIME, TIMESTAMP and TIMESTAMPTZ.
*/
static bool
is_deltarange_compression_supported(Form_pg_attribute attr)
{
switch (attr->atttypid)
{
case INT4OID:
case INT8OID:
case DATEOID:
case TIMEOID:
case TIMESTAMPOID:
case TIMESTAMPTZOID:
/*
* Only fixed-length attributes of at most 8 bytes are
* supported for DeltaRange.
*/
Assert((attr->attlen >= 4) && (attr->attlen <= 8));
Assert(attr->attbyval);
return true;
}
return false;
}
static void
init_datumstream_info(
DatumStreamTypeInfo * typeInfo, //OUTPUT
DatumStreamVersion * datumStreamVersion, //OUTPUT
bool *rle_compression, //OUTPUT
bool *delta_compression, //OUTPUT
AppendOnlyStorageAttributes *ao_attr, //OUTPUT
int32 * maxAoBlockSize, //OUTPUT
char *compName,
int32 compLevel,
bool checksum,
int32 safeFSWriteSize,
int32 maxsz,
Form_pg_attribute attr)
{
init_datumstream_typeinfo(
typeInfo,
attr);
/*
* Adjust maxsz for Append-Only Storage.
*/
if (maxsz <= 0 || maxsz > MAX_APPENDONLY_BLOCK_SIZE)
elog(ERROR, "invalid AO block size %d", maxsz);
*maxAoBlockSize = maxsz;
/*
* Assume the following unless modified below.
*/
*rle_compression = false;
*delta_compression = false;
ao_attr->compress = false;
ao_attr->compressType = NULL;
ao_attr->compressLevel = 0;
*datumStreamVersion = DatumStreamVersion_Original;
ao_attr->checksum = checksum;
/*
* The original version didn't bother to populate these fields...
*/
ao_attr->safeFSWriteSize = 0;
if (compName != NULL && pg_strcasecmp(compName, "rle_type") == 0)
{
/*
* For RLE_TYPE, we do the compression ourselves in this module.
*
* Optionally, BULK Compression by the AppendOnlyStorage layer may be performed
* as a second compression on the "Access Method" (first) compressed block.
*/
*datumStreamVersion = DatumStreamVersion_Dense_Enhanced;
*rle_compression = true;
ao_attr->safeFSWriteSize = safeFSWriteSize;
/*
* Use the compresslevel as a kludgy way of specifying the BULK compression
* to use.
*/
switch (compLevel)
{
case 1:
ao_attr->compress = false;
ao_attr->compressType = "none";
ao_attr->compressLevel = 1;
break;
case 2:
ao_attr->compress = true;
ao_attr->compressType = "zlib";
ao_attr->compressLevel = 1;
break;
case 3:
ao_attr->compress = true;
ao_attr->compressType = "zlib";
ao_attr->compressLevel = 5;
break;
case 4:
ao_attr->compress = true;
ao_attr->compressType = "zlib";
ao_attr->compressLevel = 9;
break;
default:
ereport(ERROR,
(errmsg("Unexpected compresslevel %d",
compLevel)));
}
/*
* Check whether delta encoding is supported for this datatype.
* With RLE, this layer also performs delta range encoding.
*/
*delta_compression = is_deltarange_compression_supported(attr);
}
else if (compName == NULL || pg_strcasecmp(compName, "none") == 0)
{
/* No bulk compression. */
}
else
{
/*
* Bulk compression will be used by AppendOnlyStorage{Write|Read} modules.
*/
ao_attr->compress = true;
ao_attr->compressType = compName;
ao_attr->compressLevel = compLevel;
}
}
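/*
* Illustrative sketch (not part of the original file): resolving the
* effective settings for a column declared with rle_type compression at
* compresslevel 3. Per the switch above, the level doubles as the BULK
* compression selector: 1 = RLE only, 2 = RLE + zlib level 1,
* 3 = RLE + zlib level 5, 4 = RLE + zlib level 9. Only the local names
* here are hypothetical.
*/
#ifdef DATUMSTREAM_USAGE_SKETCH
static void
example_resolve_rle_settings(Form_pg_attribute attr)
{
DatumStreamTypeInfo typeInfo;
DatumStreamVersion version;
bool rle;
bool delta;
AppendOnlyStorageAttributes ao_attr;
int32 maxAoBlockSize;
init_datumstream_info(&typeInfo, &version, &rle, &delta,
&ao_attr, &maxAoBlockSize,
"rle_type", /* compLevel */ 3,
/* checksum */ true,
/* safeFSWriteSize */ 0,
/* maxsz */ 32768,
attr);
/*
* Now version == DatumStreamVersion_Dense_Enhanced, rle == true,
* ao_attr.compress == true with compressType "zlib" at level 5, and
* delta == true only for the types accepted by
* is_deltarange_compression_supported().
*/
}
#endif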
static void
determine_datumstream_compression_overflow(
AppendOnlyStorageAttributes *ao_attr,
size_t(*desiredCompSizeFunc) (size_t input),
int32 maxAoBlockSize)
{
int desiredOverflowBytes = 0;
if (desiredCompSizeFunc != NULL)
{
/*
* Call the compression's desired size function to find out what additional
* space it requires for our block size.
*/
desiredOverflowBytes =
(int) (desiredCompSizeFunc) (maxAoBlockSize) - maxAoBlockSize;
Assert(desiredOverflowBytes >= 0);
}
ao_attr->overflowSize = desiredOverflowBytes;
}
DatumStreamWrite *
create_datumstreamwrite(
char *compName,
int32 compLevel,
bool checksum,
int32 safeFSWriteSize,
int32 maxsz,
Form_pg_attribute attr,
char *relname,
char *title,
bool needsWAL)
{
DatumStreamWrite *acc = palloc0(sizeof(DatumStreamWrite));
int32 initialMaxDatumPerBlock;
int32 maxDatumPerBlock;
PGFunction *compressionFunctions;
CompressionState *compressionState;
CompressionState *verifyBlockCompressionState;
init_datumstream_info(
&acc->typeInfo,
&acc->datumStreamVersion,
&acc->rle_want_compression,
&acc->delta_want_compression,
&acc->ao_attr,
&acc->maxAoBlockSize,
compName,
compLevel,
checksum,
safeFSWriteSize,
maxsz,
attr);
compressionFunctions = NULL;
compressionState = NULL;
verifyBlockCompressionState = NULL;
if (acc->ao_attr.compress)
{
/*
* BULK compression.
*/
compressionFunctions = get_funcs_for_compression(acc->ao_attr.compressType);
if (compressionFunctions != NULL)
{
TupleDesc td = CreateTupleDesc(1, &attr);
StorageAttributes sa;
sa.comptype = acc->ao_attr.compressType;
sa.complevel = acc->ao_attr.compressLevel;
sa.blocksize = acc->maxAoBlockSize;
compressionState =
callCompressionConstructor(
compressionFunctions[COMPRESSION_CONSTRUCTOR], td, &sa, /* compress */ true);
determine_datumstream_compression_overflow(
&acc->ao_attr,
compressionState->desired_sz,
acc->maxAoBlockSize);
if (gp_appendonly_verify_write_block)
{
verifyBlockCompressionState =
callCompressionConstructor(
compressionFunctions[COMPRESSION_CONSTRUCTOR], td, &sa, /* compress */ false);
}
}
}
AppendOnlyStorageWrite_Init(
&acc->ao_write,
/* memoryContext */ NULL,
acc->maxAoBlockSize,
relname,
title,
&acc->ao_attr,
needsWAL);
acc->ao_write.compression_functions = compressionFunctions;
acc->ao_write.compressionState = compressionState;
acc->ao_write.verifyWriteCompressionState = verifyBlockCompressionState;
acc->title = title;
/*
* Temporarily set the firstRowNum for the block so that we can
* calculate the correct header length.
*/
AppendOnlyStorageWrite_SetFirstRowNum(&acc->ao_write, 1);
switch (acc->datumStreamVersion)
{
case DatumStreamVersion_Original:
initialMaxDatumPerBlock = MAXDATUM_PER_AOCS_ORIG_BLOCK;
maxDatumPerBlock = MAXDATUM_PER_AOCS_ORIG_BLOCK;
acc->maxAoHeaderSize = AoHeader_Size(
/* isLong */ false,
checksum,
/* hasFirstRowNum */ true);
break;
case DatumStreamVersion_Dense:
case DatumStreamVersion_Dense_Enhanced:
initialMaxDatumPerBlock = INITIALDATUM_PER_AOCS_DENSE_BLOCK;
maxDatumPerBlock = MAXDATUM_PER_AOCS_DENSE_BLOCK;
acc->maxAoHeaderSize = AoHeader_Size(
/* isLong */ true,
checksum,
/* hasFirstRowNum */ true);
break;
default:
ereport(ERROR,
(errmsg("Unexpected datum stream version %d",
acc->datumStreamVersion)));
initialMaxDatumPerBlock = 0;
/* Quiet down compiler. */
maxDatumPerBlock = 0;
break;
/* Never reached. */
}
DatumStreamBlockWrite_Init(
&acc->blockWrite,
&acc->typeInfo,
acc->datumStreamVersion,
acc->rle_want_compression,
acc->delta_want_compression,
initialMaxDatumPerBlock,
maxDatumPerBlock,
acc->maxAoBlockSize - acc->maxAoHeaderSize,
/* errdetailCallback */ datumstreamwrite_detail_callback,
/* errdetailArg */ (void *) acc,
/* errcontextCallback */ datumstreamwrite_context_callback,
/* errcontextArg */ (void *) acc);
return acc;
}
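/*
* Write-side usage sketch (not part of the original file), loosely modeled
* on how the AOCS access method drives this API: append datums until one no
* longer fits (put returns < 0), then flush the block and retry, falling
* back to datumstreamwrite_lob() for a datum too large for any block. It
* assumes d is already de-toasted and that blockDirectory/colGroupNo come
* from the caller; row-number bookkeeping is simplified.
*/
#ifdef DATUMSTREAM_USAGE_SKETCH
static void
example_append_datum(DatumStreamWrite * ds,
Datum d, bool isnull,
AppendOnlyBlockDirectory *blockDirectory,
int colGroupNo)
{
void *toFree;
if (datumstreamwrite_put(ds, d, isnull, &toFree) < 0)
{
/* The current block is full: write it out and start a new one. */
int itemCount = datumstreamwrite_nth(ds);
datumstreamwrite_block(ds, blockDirectory, colGroupNo, false);
ds->blockFirstRowNum += itemCount;
if (datumstreamwrite_put(ds, d, isnull, &toFree) < 0)
{
/* Still does not fit: write it as its own large-object block. */
datumstreamwrite_lob(ds, d, blockDirectory, colGroupNo, false);
ds->blockFirstRowNum++;
}
}
if (toFree != NULL)
pfree(toFree);
}
#endif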
DatumStreamRead *
create_datumstreamread(
char *compName,
int32 compLevel,
bool checksum,
int32 safeFSWriteSize,
int32 maxsz,
Form_pg_attribute attr,
char *relname,
char *title)
{
DatumStreamRead *acc = palloc0(sizeof(DatumStreamRead));
PGFunction *compressionFunctions;
CompressionState *compressionState;
acc->memctxt = CurrentMemoryContext;
init_datumstream_info(
&acc->typeInfo,
&acc->datumStreamVersion,
&acc->rle_can_have_compression,
&acc->delta_can_have_compression,
&acc->ao_attr,
&acc->maxAoBlockSize,
compName,
compLevel,
checksum,
safeFSWriteSize,
maxsz,
attr);
compressionFunctions = NULL;
compressionState = NULL;
if (acc->ao_attr.compress)
{
/*
* BULK compression.
*/
compressionFunctions = get_funcs_for_compression(acc->ao_attr.compressType);
if (compressionFunctions != NULL)
{
StorageAttributes sa;
sa.comptype = acc->ao_attr.compressType;
sa.complevel = acc->ao_attr.compressLevel;
sa.blocksize = acc->maxAoBlockSize;
compressionState =
callCompressionConstructor(
compressionFunctions[COMPRESSION_CONSTRUCTOR], NULL, &sa, false /* compress */ );
determine_datumstream_compression_overflow(
&acc->ao_attr,
compressionState->desired_sz,
acc->maxAoBlockSize);
}
}
AppendOnlyStorageRead_Init(
&acc->ao_read,
/* memoryContext */ NULL,
acc->maxAoBlockSize,
relname,
title,
&acc->ao_attr);
acc->ao_read.compression_functions = compressionFunctions;
acc->ao_read.compressionState = compressionState;
acc->title = title;
acc->blockFirstRowNum = 1;
Assert(acc->blockFileOffset == 0);
Assert(acc->blockRowCount == 0);
DatumStreamBlockRead_Init(
&acc->blockRead,
&acc->typeInfo,
acc->datumStreamVersion,
acc->rle_can_have_compression,
/* errdetailCallback */ datumstreamread_detail_callback,
/* errdetailArg */ (void *) acc,
/* errcontextCallback */ datumstreamread_context_callback,
/* errcontextArg */ (void *) acc);
Assert(acc->large_object_buffer == NULL);
Assert(acc->large_object_buffer_size == 0);
Assert(acc->largeObjectState == DatumStreamLargeObjectState_None);
Assert(acc->eof == 0);
Assert(acc->eofUncompress == 0);
if (Debug_appendonly_print_scan)
{
if (!acc->ao_attr.compress)
{
ereport(LOG,
(errmsg("Datum stream read %s created with NO bulk compression for %s"
"(maximum Append-Only blocksize %d, "
"checksum %s)",
DatumStreamVersion_String(acc->datumStreamVersion),
acc->title,
acc->maxAoBlockSize,
(acc->ao_attr.checksum ? "true" : "false"))));
}
else
{
ereport(LOG,
(errmsg("Datum stream read %s created with bulk compression for %s "
"(maximum Append-Only blocksize %d, "
"compression type %s, compress level %d, "
"checksum %s)",
DatumStreamVersion_String(acc->datumStreamVersion),
acc->title,
acc->maxAoBlockSize,
acc->ao_attr.compressType,
acc->ao_attr.compressLevel,
(acc->ao_attr.checksum ? "true" : "false"))));
}
}
return acc;
}
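/*
* Read-side usage sketch (not part of the original file): a full sequential
* scan of one segment file. datumstreamread_advance() and
* datumstreamread_get() are the per-datum accessors declared in
* utils/datumstream.h; fn, eof and version are assumed to come from the
* segment file catalog, and eofUncompressed is not needed for reading.
*/
#ifdef DATUMSTREAM_USAGE_SKETCH
static void
example_scan_segfile(DatumStreamRead * ds, char *fn, int64 eof,
RelFileNode relFileNode, int32 segmentFileNum, int version)
{
datumstreamread_open_file(ds, fn, eof, /* eofUncompressed */ 0,
relFileNode, segmentFileNum, version);
/* datumstreamread_block() returns -1 once the file is exhausted. */
while (datumstreamread_block(ds, /* blockDirectory */ NULL, 0) >= 0)
{
Datum datum;
bool isnull;
/* Advance returns 0 when the current block has no more datums. */
while (datumstreamread_advance(ds) > 0)
datumstreamread_get(ds, &datum, &isnull);
}
datumstreamread_close_file(ds);
}
#endif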
void
destroy_datumstreamwrite(DatumStreamWrite * ds)
{
DatumStreamBlockWrite_Finish(&ds->blockWrite);
AppendOnlyStorageWrite_FinishSession(&ds->ao_write);
if (ds->title)
{
pfree(ds->title);
}
pfree(ds);
}
void
destroy_datumstreamread(DatumStreamRead * ds)
{
DatumStreamBlockRead_Finish(&ds->blockRead);
if (ds->large_object_buffer)
pfree(ds->large_object_buffer);
if (ds->datum_upgrade_buffer)
pfree(ds->datum_upgrade_buffer);
AppendOnlyStorageRead_FinishSession(&ds->ao_read);
if (ds->title)
{
pfree(ds->title);
}
pfree(ds);
}
void
datumstreamwrite_open_file(DatumStreamWrite *ds, char *fn, int64 eof, int64 eofUncompressed,
RelFileNodeBackend *relFileNode, int32 segmentFileNum, int version)
{
ds->eof = eof;
ds->eofUncompress = eofUncompressed;
if (ds->need_close_file)
datumstreamwrite_close_file(ds);
/*
* Segment file #0 is created when the Append-Only table is created.
*
* Other segment files are created on-demand under transaction.
*/
if (segmentFileNum > 0 && eof == 0)
{
AppendOnlyStorageWrite_TransactionCreateFile(&ds->ao_write,
relFileNode,
segmentFileNum);
}
/*
* Open the existing file for write.
*/
AppendOnlyStorageWrite_OpenFile(&ds->ao_write,
fn,
version,
eof,
eofUncompressed,
relFileNode,
segmentFileNum);
ds->need_close_file = true;
}
void
datumstreamread_open_file(DatumStreamRead * ds, char *fn, int64 eof, int64 eofUncompressed, RelFileNode relFileNode, int32 segmentFileNum, int version)
{
ds->eof = eof;
ds->eofUncompress = eofUncompressed;
if (ds->need_close_file)
datumstreamread_close_file(ds);
AppendOnlyStorageRead_OpenFile(&ds->ao_read, fn, version, ds->eof);
ds->need_close_file = true;
}
void
datumstreamwrite_close_file(DatumStreamWrite * ds)
{
AppendOnlyStorageWrite_TransactionFlushAndCloseFile(
&ds->ao_write,
&ds->eof,
&ds->eofUncompress);
/*
* Add the access method "compression" savings.
*
* If the savings are negative, then the compression ratio could fall below 1.0.
*/
ds->eofUncompress += ds->blockWrite.savings;
ds->need_close_file = false;
}
void
datumstreamread_close_file(DatumStreamRead * ds)
{
AppendOnlyStorageRead_CloseFile(&ds->ao_read);
ds->need_close_file = false;
}
static int64
datumstreamwrite_block_orig(DatumStreamWrite * acc)
{
int64 writesz = 0;
uint8 *buffer = NULL;
int32 rowCount;
/*
* Set the BlockFirstRowNum. Need to set this before
* calling AppendOnlyStorageWrite_GetBuffer.
*/
AppendOnlyStorageWrite_SetFirstRowNum(&acc->ao_write,
acc->blockFirstRowNum);
rowCount = DatumStreamBlockWrite_Nth(&acc->blockWrite);
Assert(rowCount <= MAXDATUM_PER_AOCS_ORIG_BLOCK);
buffer = AppendOnlyStorageWrite_GetBuffer(
&acc->ao_write,
AoHeaderKind_SmallContent);
writesz = DatumStreamBlockWrite_Block(
&acc->blockWrite,
buffer);
acc->ao_write.logicalBlockStartOffset =
BufferedAppendNextBufferPosition(&(acc->ao_write.bufferedAppend));
/* Write it out */
AppendOnlyStorageWrite_FinishBuffer(
&acc->ao_write,
(int32) writesz,
AOCSBK_BLOCK,
rowCount);
/* Set up our write block information */
DatumStreamBlockWrite_GetReady(&acc->blockWrite);
return writesz;
}
static int64
datumstreamwrite_block_dense(DatumStreamWrite * acc)
{
int64 writesz = 0;
uint8 *buffer = NULL;
int32 rowCount;
AoHeaderKind aoHeaderKind;
/*
* Set the BlockFirstRowNum. Need to set this before
* calling AppendOnlyStorageWrite_GetBuffer.
*/
AppendOnlyStorageWrite_SetFirstRowNum(&acc->ao_write,
acc->blockFirstRowNum);
rowCount = DatumStreamBlockWrite_Nth(&acc->blockWrite);
if (rowCount <= AOSmallContentHeader_MaxRowCount)
{
aoHeaderKind = AoHeaderKind_SmallContent;
}
else if (acc->ao_attr.compress)
{
aoHeaderKind = AoHeaderKind_BulkDenseContent;
}
else
{
aoHeaderKind = AoHeaderKind_NonBulkDenseContent;
}
buffer = AppendOnlyStorageWrite_GetBuffer(
&acc->ao_write,
aoHeaderKind);
writesz = DatumStreamBlockWrite_Block(
&acc->blockWrite,
buffer);
acc->ao_write.logicalBlockStartOffset =
BufferedAppendNextBufferPosition(&(acc->ao_write.bufferedAppend));
/* Write it out */
AppendOnlyStorageWrite_FinishBuffer(
&acc->ao_write,
(int32) writesz,
AOCSBK_BLOCK,
rowCount);
/* Set up our write block information */
DatumStreamBlockWrite_GetReady(&acc->blockWrite);
return writesz;
}
int64
datumstreamwrite_block(DatumStreamWrite *acc,
AppendOnlyBlockDirectory *blockDirectory,
int columnGroupNo,
bool addColAction)
{
int64 writesz;
int itemCount = DatumStreamBlockWrite_Nth(&acc->blockWrite);
/* Nothing to write, this is just no op */
if (itemCount == 0)
{
return 0;
}
switch (acc->datumStreamVersion)
{
case DatumStreamVersion_Original:
writesz = datumstreamwrite_block_orig(acc);
break;
case DatumStreamVersion_Dense:
case DatumStreamVersion_Dense_Enhanced:
writesz = datumstreamwrite_block_dense(acc);
break;
default:
elog(ERROR, "Unexpected datum stream version %d",
acc->datumStreamVersion);
return 0;
/* Never reaches here. */
}
/* Insert an entry to the block directory */
AppendOnlyBlockDirectory_InsertEntry(
blockDirectory,
columnGroupNo,
acc->blockFirstRowNum,
AppendOnlyStorageWrite_LogicalBlockStartOffset(&acc->ao_write),
itemCount,
addColAction);
return writesz;
}
static void
datumstreamwrite_print_large_varlena_info(
DatumStreamWrite * acc,
uint8 * p)
{
elog(LOG, "Write large varlena <%s>",
VarlenaInfoToString(p));
}
int64
datumstreamwrite_lob(DatumStreamWrite * acc,
Datum d,
AppendOnlyBlockDirectory *blockDirectory,
int colGroupNo,
bool addColAction)
{
uint8 *p;
int32 varLen;
Assert(acc);
Assert(acc->datumStreamVersion == DatumStreamVersion_Original ||
acc->datumStreamVersion == DatumStreamVersion_Dense ||
acc->datumStreamVersion == DatumStreamVersion_Dense_Enhanced);
if (acc->typeInfo.datumlen >= 0)
{
elog(ERROR, "Large object must be variable length objects (varlena)");
}
/*
* If the datum is toasted / compressed -- an error.
*/
if (VARATT_IS_EXTENDED(DatumGetPointer(d)))
{
elog(ERROR, "Expected large object / variable length objects (varlena) to be de-toasted and/or de-compressed at this point");
}
/*
* De-Toast Datum
*/
if (VARATT_IS_EXTERNAL(DatumGetPointer(d)))
{
d = PointerGetDatum(heap_tuple_fetch_attr(
(struct varlena *) DatumGetPointer(d)));
}
p = (uint8 *) DatumGetPointer(d);
varLen = VARSIZE_ANY(p);
if (Debug_datumstream_write_print_large_varlena_info)
{
datumstreamwrite_print_large_varlena_info(
acc,
p);
}
/* Set the BlockFirstRowNum */
AppendOnlyStorageWrite_SetFirstRowNum(&acc->ao_write,
acc->blockFirstRowNum);
AppendOnlyStorageWrite_Content(
&acc->ao_write,
p,
varLen,
AOCSBK_BLOB,
/* rowCount */ 1);
/* Insert an entry to the block directory */
AppendOnlyBlockDirectory_InsertEntry(
blockDirectory,
colGroupNo,
acc->blockFirstRowNum,
AppendOnlyStorageWrite_LogicalBlockStartOffset(&acc->ao_write),
1, /* itemCount -- always 1 for the LOB just inserted */
addColAction);
return varLen;
}
static bool
datumstreamread_block_info(DatumStreamRead * acc)
{
bool readOK = false;
Assert(acc);
readOK = AppendOnlyStorageRead_GetBlockInfo(
&acc->ao_read,
&acc->getBlockInfo.contentLen,
&acc->getBlockInfo.execBlockKind,
&acc->getBlockInfo.firstRow,
&acc->getBlockInfo.rowCnt,
&acc->getBlockInfo.isLarge,
&acc->getBlockInfo.isCompressed);
if (!readOK)
return false;
acc->blockFirstRowNum = acc->getBlockInfo.firstRow;
acc->blockFileOffset = acc->ao_read.current.headerOffsetInFile;
acc->blockRowCount = acc->getBlockInfo.rowCnt;
if (Debug_appendonly_print_scan)
elog(LOG,
"Datum stream read get block typeInfo for table '%s' "
"(contentLen %d, execBlockKind = %d, firstRowNum " INT64_FORMAT ", "
"rowCount %d, isLargeContent %s, isCompressed %s, "
"blockFirstRowNum " INT64_FORMAT ", blockFileOffset " INT64_FORMAT ", blockRowCount %d)",
AppendOnlyStorageRead_RelationName(&acc->ao_read),
acc->getBlockInfo.contentLen,
acc->getBlockInfo.execBlockKind,
acc->getBlockInfo.firstRow,
acc->getBlockInfo.rowCnt,
(acc->getBlockInfo.isLarge ? "true" : "false"),
(acc->getBlockInfo.isCompressed ? "true" : "false"),
acc->blockFirstRowNum,
acc->blockFileOffset,
acc->blockRowCount);
return true;
}
static void
datumstreamread_block_get_ready(DatumStreamRead * acc)
{
/*
* Read header information and setup for reading the datum in the block.
*/
if (acc->getBlockInfo.execBlockKind == AOCSBK_BLOCK)
{
bool hadToAdjustRowCount;
int32 adjustedRowCount;
DatumStreamBlockRead_GetReady(
&acc->blockRead,
acc->buffer_beginp,
acc->getBlockInfo.contentLen,
acc->getBlockInfo.firstRow,
acc->getBlockInfo.rowCnt,
&hadToAdjustRowCount,
&adjustedRowCount);
if (hadToAdjustRowCount)
{
acc->blockRowCount = adjustedRowCount;
}
}
else if (acc->getBlockInfo.execBlockKind == AOCSBK_BLOB)
{
Assert(acc->buffer_beginp == acc->large_object_buffer);
}
else
{
elog(ERROR,
"Unexpected Append-Only Column Store executor kind %d",
acc->getBlockInfo.execBlockKind);
}
}
void
datumstreamread_block_content(DatumStreamRead * acc)
{
Assert(acc);
/*
* Clear out state from previous block.
*/
DatumStreamBlockRead_Reset(&acc->blockRead);
acc->largeObjectState = DatumStreamLargeObjectState_None;
/*
* Read in data.
*/
if (acc->getBlockInfo.execBlockKind == AOCSBK_BLOCK)
{
Assert(!acc->getBlockInfo.isLarge);
if (acc->getBlockInfo.isCompressed)
{
/* Compressed, need to decompress to our own buffer. */
if (acc->large_object_buffer_size < acc->getBlockInfo.contentLen)
{
MemoryContext oldCtxt;
oldCtxt = MemoryContextSwitchTo(acc->memctxt);
if (acc->large_object_buffer)
{
pfree(acc->large_object_buffer);
acc->large_object_buffer = NULL;
SIMPLE_FAULT_INJECTOR("malloc_failure");
}
acc->large_object_buffer_size = acc->getBlockInfo.contentLen;
acc->large_object_buffer = palloc(acc->getBlockInfo.contentLen);
MemoryContextSwitchTo(oldCtxt);
}
AppendOnlyStorageRead_Content(
&acc->ao_read,
(uint8 *) acc->large_object_buffer,
acc->getBlockInfo.contentLen);
acc->buffer_beginp = acc->large_object_buffer;
}
else
{
acc->buffer_beginp = AppendOnlyStorageRead_GetBuffer(&acc->ao_read);
}
if (Debug_appendonly_print_datumstream)
elog(LOG,
"datumstream_read_block_content filePathName %s firstRowNum " INT64_FORMAT " rowCnt %u "
"ndatum %u contentLen %d datump %p",
acc->ao_read.bufferedRead.filePathName,
acc->getBlockInfo.firstRow,
acc->getBlockInfo.rowCnt,
acc->blockRead.logical_row_count,
acc->getBlockInfo.contentLen, acc->blockRead.datump);
}
else if (acc->getBlockInfo.execBlockKind == AOCSBK_BLOB)
{
Assert(acc->getBlockInfo.rowCnt == 1);
if (acc->typeInfo.datumlen >= 0)
{
elog(ERROR, "Large object must be variable length objects (varlena)");
}
/*
* NOTE: Do not assert the content is large. What appears to be large
* content can compress into one AO storage block.
*/
if (acc->large_object_buffer_size < acc->getBlockInfo.contentLen)
{
MemoryContext oldCtxt;
oldCtxt = MemoryContextSwitchTo(acc->memctxt);
if (acc->large_object_buffer)
pfree(acc->large_object_buffer);
acc->large_object_buffer_size = acc->getBlockInfo.contentLen;
acc->large_object_buffer = palloc(acc->getBlockInfo.contentLen);
MemoryContextSwitchTo(oldCtxt);
}
AppendOnlyStorageRead_Content(
&acc->ao_read,
acc->large_object_buffer,
acc->getBlockInfo.contentLen);
acc->buffer_beginp = acc->large_object_buffer;
acc->largeObjectState = DatumStreamLargeObjectState_HaveAoContent;
if (Debug_datumstream_read_check_large_varlena_integrity)
{
datumstreamread_check_large_varlena_integrity(
acc,
acc->buffer_beginp,
acc->getBlockInfo.contentLen);
}
}
else
{
elog(ERROR,
"Unexpected Append-Only Column Store executor kind %d",
acc->getBlockInfo.execBlockKind);
}
/*
* Unpack the information from the block headers and get ready to read the first datum.
*/
datumstreamread_block_get_ready(acc);
}
int
datumstreamread_block(DatumStreamRead * acc,
AppendOnlyBlockDirectory *blockDirectory,
int colGroupNo)
{
bool readOK = false;
Assert(acc);
acc->blockFirstRowNum += acc->blockRowCount;
readOK = AppendOnlyStorageRead_GetBlockInfo(&acc->ao_read,
&acc->getBlockInfo.contentLen,
&acc->getBlockInfo.execBlockKind,
&acc->getBlockInfo.firstRow,
&acc->getBlockInfo.rowCnt,
&acc->getBlockInfo.isLarge,
&acc->getBlockInfo.isCompressed);
if (!readOK)
return -1;
if (Debug_appendonly_print_datumstream)
elog(LOG,
"datumstream_read_block filePathName %s ndatum %u datump %p "
"firstRow " INT64_FORMAT " rowCnt %u contentLen %u ",
acc->ao_read.bufferedRead.filePathName,
acc->blockRead.logical_row_count,
acc->blockRead.datump,
acc->getBlockInfo.firstRow,
acc->getBlockInfo.rowCnt,
acc->getBlockInfo.contentLen);
/*
* Pre-4.0 blocks do not store firstRowNum, so the returned value
* for firstRow is -1. In this case, acc->blockFirstRowNum keeps
* its last value, i.e. is incremented by the blockRowCount of the
* previous block.
*/
if (acc->getBlockInfo.firstRow >= 0)
{
acc->blockFirstRowNum = acc->getBlockInfo.firstRow;
}
acc->blockFileOffset = acc->ao_read.current.headerOffsetInFile;
acc->blockRowCount = acc->getBlockInfo.rowCnt;
if (Debug_appendonly_print_scan)
elog(LOG,
"Datum stream read get block typeInfo for table '%s' "
"(contentLen %d, execBlockKind = %d, firstRowNum " INT64_FORMAT ", "
"rowCount %d, isLargeContent %s, isCompressed %s, "
"blockFirstRowNum " INT64_FORMAT ", blockFileOffset " INT64_FORMAT ", blockRowCount %d)",
AppendOnlyStorageRead_RelationName(&acc->ao_read),
acc->getBlockInfo.contentLen,
acc->getBlockInfo.execBlockKind,
acc->getBlockInfo.firstRow,
acc->getBlockInfo.rowCnt,
(acc->getBlockInfo.isLarge ? "true" : "false"),
(acc->getBlockInfo.isCompressed ? "true" : "false"),
acc->blockFirstRowNum,
acc->blockFileOffset,
acc->blockRowCount);
datumstreamread_block_content(acc);
if (blockDirectory)
{
AppendOnlyBlockDirectory_InsertEntry(blockDirectory,
colGroupNo,
acc->blockFirstRowNum,
acc->blockFileOffset,
acc->blockRowCount,
false);
}
return 0;
}
void
datumstreamread_rewind_block(DatumStreamRead * datumStream)
{
DatumStreamBlockRead_Reset(&datumStream->blockRead);
datumstreamread_block_get_ready(datumStream);
}
/*
* Find the specified row in the current block.
*
* Note that values for rowNumInBlock start at 0.
*/
void
datumstreamread_find(DatumStreamRead * datumStream,
int32 rowNumInBlock)
{
/*
* If reading a tuple prior to the one the datum stream is currently
* pointing to, reset the datum stream pointers.
*/
if (rowNumInBlock < DatumStreamBlockRead_Nth(&datumStream->blockRead))
datumstreamread_rewind_block(datumStream);
Assert(rowNumInBlock >= DatumStreamBlockRead_Nth(&datumStream->blockRead) ||
DatumStreamBlockRead_Nth(&datumStream->blockRead) == -1);
/*
* Find the right row in the block.
*/
while (rowNumInBlock > DatumStreamBlockRead_Nth(&datumStream->blockRead))
{
int status;
status = datumstreamread_advance(datumStream);
if (status == 0)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Unexpected internal error,"
" could not find the right row in block."
" rowCnt is %d"
" largeObjectState is %d"
" rowNumInBlock is %d"
" DatumStreamBlockRead_Nth is %d",
datumStream->blockRead.logical_row_count,
datumStream->largeObjectState,
rowNumInBlock,
DatumStreamBlockRead_Nth(&datumStream->blockRead))));
}
}
Assert(rowNumInBlock == DatumStreamBlockRead_Nth(&datumStream->blockRead));
}
/*
* Find the block that contains the given row.
*/
bool
datumstreamread_find_block(DatumStreamRead * datumStream,
DatumStreamFetchDesc datumStreamFetchDesc,
int64 rowNum)
{
Assert(datumStreamFetchDesc->datumStream == datumStream);
if (datumStreamFetchDesc->scanNextFileOffset >=
datumStreamFetchDesc->scanAfterFileOffset)
return false; /* No more blocks requested for range */
if (datumStreamFetchDesc->currentSegmentFile.logicalEof ==
datumStreamFetchDesc->scanNextFileOffset)
return false; /* No more blocks requested for range */
if (datumStreamFetchDesc->currentSegmentFile.logicalEof <
datumStreamFetchDesc->scanNextFileOffset)
return false; /* UNDONE: Why does our next scan position go beyond logical EOF? */
AppendOnlyStorageRead_SetTemporaryRange(
&datumStream->ao_read,
datumStreamFetchDesc->scanNextFileOffset,
datumStreamFetchDesc->scanAfterFileOffset);
while (true)
{
if (!datumstreamread_block_info(datumStream))
return false;
/*
* Update the current block info.
*/
const bool isOldBlockFormat = (datumStream->getBlockInfo.firstRow == -1);
datumStreamFetchDesc->currentBlock.valid = true;
datumStreamFetchDesc->currentBlock.fileOffset =
AppendOnlyStorageRead_CurrentHeaderOffsetInFile(
&datumStream->ao_read);
datumStreamFetchDesc->currentBlock.overallBlockLen =
AppendOnlyStorageRead_OverallBlockLen(&datumStream->ao_read);
datumStreamFetchDesc->currentBlock.firstRowNum =
(!isOldBlockFormat) ? datumStream->getBlockInfo.firstRow : datumStreamFetchDesc->scanNextRowNum;
datumStreamFetchDesc->currentBlock.lastRowNum =
datumStreamFetchDesc->currentBlock.firstRowNum + datumStream->getBlockInfo.rowCnt - 1;
datumStreamFetchDesc->currentBlock.isCompressed =
datumStream->getBlockInfo.isCompressed;
datumStreamFetchDesc->currentBlock.isLargeContent = datumStream->getBlockInfo.isLarge;
datumStreamFetchDesc->currentBlock.gotContents = false;
if (Debug_appendonly_print_datumstream)
elog(LOG,
"datumstream_find_block filePathName %s fileOffset " INT64_FORMAT " firstRowNum " INT64_FORMAT " "
"rowCnt %u lastRowNum " INT64_FORMAT " ",
datumStream->ao_read.bufferedRead.filePathName,
datumStreamFetchDesc->currentBlock.fileOffset,
datumStreamFetchDesc->currentBlock.firstRowNum,
datumStream->getBlockInfo.rowCnt,
datumStreamFetchDesc->currentBlock.lastRowNum);
if (rowNum < datumStreamFetchDesc->currentBlock.firstRowNum)
{
/*
* Since we have read a new block, the temporary
* range for the read needs to be adjusted
* accordingly. Otherwise, the underlying bufferedRead
* may stop reading more data because of the
* previously-set smaller temporary range.
*/
int64 beginFileOffset = datumStreamFetchDesc->currentBlock.fileOffset;
int64 afterFileOffset = datumStreamFetchDesc->currentBlock.fileOffset +
datumStreamFetchDesc->currentBlock.overallBlockLen;
AppendOnlyStorageRead_SetTemporaryRange(
&datumStream->ao_read,
beginFileOffset,
afterFileOffset);
return false;
}
/*
* The fix for MPP-17061 does not need to be applied to pre-4.0 blocks,
* since indexes could only be created after upgrading to 4.x.
* As a result, pre-4.0 blocks have no invisible rows.
*/
if (isOldBlockFormat)
{
int32 rowCnt;
/*
* rowCnt may not be valid for pre-4.0 blocks; we need to
* read the block content to restore the correct value.
*/
datumstreamread_block_content(datumStream);
rowCnt = datumStream->blockRowCount;
datumStreamFetchDesc->currentBlock.lastRowNum =
datumStreamFetchDesc->currentBlock.firstRowNum + rowCnt - 1;
}
if (Debug_appendonly_print_datumstream)
elog(LOG,
"datumstream_find_block filePathName %s fileOffset " INT64_FORMAT " firstRowNum " INT64_FORMAT " "
"rowCnt %u lastRowNum " INT64_FORMAT " ",
datumStream->ao_read.bufferedRead.filePathName,
datumStreamFetchDesc->currentBlock.fileOffset,
datumStreamFetchDesc->currentBlock.firstRowNum,
datumStream->getBlockInfo.rowCnt,
datumStreamFetchDesc->currentBlock.lastRowNum);
if (rowNum <= datumStreamFetchDesc->currentBlock.lastRowNum)
{
/*
* Found the block that contains the row.
* Read the block content if it was not read above.
*/
if (!isOldBlockFormat)
{
if (datumStreamFetchDesc->currentBlock.gotContents)
{
ereport(ERROR,
(errmsg("Unexpected internal error,"
" block content was already read."
" datumstream_find_block filePathName %s"
" fileOffset " INT64_FORMAT
" firstRowNum " INT64_FORMAT
" rowCnt %u lastRowNum " INT64_FORMAT
" ndatum %u contentLen %d gotContents true",
datumStream->ao_read.bufferedRead.filePathName,
datumStreamFetchDesc->currentBlock.fileOffset,
datumStream->getBlockInfo.firstRow,
datumStream->getBlockInfo.rowCnt,
datumStreamFetchDesc->currentBlock.lastRowNum,
datumStream->blockRead.logical_row_count,
datumStream->getBlockInfo.contentLen
)));
}
if (Debug_appendonly_print_datumstream)
elog(LOG,
"datumstream_find_block filePathName %s fileOffset " INT64_FORMAT " firstRowNum " INT64_FORMAT " "
"rowCnt %u lastRowNum " INT64_FORMAT " "
"ndatum %u contentLen %d ",
datumStream->ao_read.bufferedRead.filePathName,
datumStreamFetchDesc->currentBlock.fileOffset,
datumStream->getBlockInfo.firstRow,
datumStream->getBlockInfo.rowCnt,
datumStreamFetchDesc->currentBlock.lastRowNum,
datumStream->blockRead.logical_row_count,
datumStream->getBlockInfo.contentLen);
datumstreamread_block_content(datumStream);
datumStreamFetchDesc->currentBlock.gotContents = true;
}
break;
}
Assert(!datumStreamFetchDesc->currentBlock.gotContents);
/* MPP-17061: reached the end of the range covered by the block directory entry */
if ((datumStreamFetchDesc->currentBlock.fileOffset +
datumStreamFetchDesc->currentBlock.overallBlockLen) >=
datumStreamFetchDesc->scanAfterFileOffset)
return false;
AppendOnlyStorageRead_SkipCurrentBlock(&datumStream->ao_read);
}
return true;
}
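/*
* Fetch sketch (not part of the original file): positioning on one logical
* row, the way an index fetch would. It assumes the fetch descriptor's scan
* range was already primed from a block directory entry covering rowNum.
*/
#ifdef DATUMSTREAM_USAGE_SKETCH
static bool
example_fetch_row(DatumStreamFetchDesc fetchDesc, int64 rowNum,
Datum *datum, bool *isnull)
{
DatumStreamRead *ds = fetchDesc->datumStream;
if (!datumstreamread_find_block(ds, fetchDesc, rowNum))
return false; /* row is not within the scanned range */
/* Position within the block; row numbers in a block start at 0. */
datumstreamread_find(ds, (int32) (rowNum - fetchDesc->currentBlock.firstRowNum));
datumstreamread_get(ds, datum, isnull);
return true;
}
#endif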
/*
* Ensures that the stream's datum_upgrade_buffer is at least len bytes long.
* Returns a pointer to the (possibly newly allocated) upgrade buffer space. If
* additional space is needed, it will be allocated in the stream's memory
* context.
*/
void *
datumstreamread_get_upgrade_space(DatumStreamRead *ds, size_t len)
{
if (ds->datum_upgrade_buffer_size < len)
{
MemoryContext oldcontext = MemoryContextSwitchTo(ds->memctxt);
/*
* FIXME: looks like at least one realloc() implementation can't handle
* NULL pointers?
*/
if (ds->datum_upgrade_buffer)
ds->datum_upgrade_buffer = repalloc(ds->datum_upgrade_buffer, len);
else
ds->datum_upgrade_buffer = palloc(len);
ds->datum_upgrade_buffer_size = len;
MemoryContextSwitchTo(oldcontext);
}
return ds->datum_upgrade_buffer;
}
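/*
* Sketch (not part of the original file): copying a datum into the upgrade
* buffer before converting it in place, e.g. while reading blocks written
* in an older on-disk format. The conversion step itself is elided.
*/
#ifdef DATUMSTREAM_USAGE_SKETCH
static Datum
example_upgrade_copy(DatumStreamRead * ds, void *src, size_t len)
{
void *buf = datumstreamread_get_upgrade_space(ds, len);
memcpy(buf, src, len);
/* ... convert the copy in place to the current format ... */
return PointerGetDatum(buf);
}
#endif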