greenplumn pg_compression 源码

  • 2022-08-18
  • 浏览 (286)

greenplumn pg_compression 代码

文件路径:/src/backend/catalog/pg_compression.c

/*---------------------------------------------------------------------
 *
 * pg_compression.c
 *	  Interfaces to low level compression functionality.
 *
 * Portions Copyright (c) 2011 EMC Corporation All Rights Reserved
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/catalog/pg_compression.c
 *
 *---------------------------------------------------------------------
 */

#include "postgres.h"

#ifdef HAVE_LIBZ
#include <zlib.h>
#endif

#include "access/genam.h"
#include "access/reloptions.h"
#include "access/table.h"
#include "access/tupdesc.h"
#include "access/tupmacs.h"
#include "catalog/indexing.h"
#include "catalog/pg_appendonly.h"
#include "catalog/pg_attribute_encoding.h"
#include "catalog/pg_compression.h"
#include "catalog/dependency.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "fmgr.h"
#include "nodes/makefuncs.h"
#include "parser/parse_type.h"
#include "storage/gp_compress.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/formatting.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/faultinjector.h"


#ifdef HAVE_LIBZ
/* Internal state for zlib */
typedef struct zlib_state
{
	int level;			/* compression level */
	bool compress;		/* compress or decompress? */

	/*
	 * The compression and decompression functions.
	 */
	int (*compress_fn) (Bytef *dest,
						uLongf *destLen,
						const Bytef *source,
						uLong sourceLen,
						int level);

	int (*decompress_fn) (Bytef *dest,
						  uLongf *destLen,
						  const Bytef *source,
						  uLong sourceLen);

} zlib_state;
#endif

static NameData
comptype_to_name(char *comptype)
{
	char		   *dct; /* down cased comptype */
	size_t			len;
	NameData		compname;


	if (strlen(comptype) >= NAMEDATALEN)
		elog(ERROR, "compression name \"%s\" exceeds maximum name length "
			 "of %d bytes", comptype, NAMEDATALEN - 1);

	len = strlen(comptype);
	dct = asc_tolower(comptype, len);
	len = strlen(dct);
	
	memcpy(&(NameStr(compname)), dct, len);
	NameStr(compname)[len] = '\0';
	pfree(dct);

	return compname;
}

/*
 * Find the compression implementation (in pg_compression) for a particular
 * compression type.
 *
 * Comparison is case insensitive.
 *
 * NOTE: This function performs a catalog lookup, which can cause cache
 * invalidation. If 'comptype' points to the relcache, i.e.
 * RelationData->rd_appendonly->compresstype, that reference is no longer
 * valid after the call!
 */
PGFunction *
GetCompressionImplementation(char *comptype)
{
	HeapTuple		tuple;
	NameData		compname;
	PGFunction	   *funcs;
	Form_pg_compression ctup;
	FmgrInfo		finfo;
	Relation	comprel;
	ScanKeyData	scankey;
	SysScanDesc scan;

	/*
	 * Many callers pass RelationData->rd_appendonly->compresstype as
	 * the argument. That can become invalid, if table_open below causes
	 * a relcache invalidation. Call comptype_to_name() on the argument
	 * first, to make a copy of it before we call table_open().
	 *
	 * This is hazardous to the callers, too, if they try to use the
	 * string after the call for something else, but there isn't much
	 * we can do about it here.
	 */
	compname = comptype_to_name(comptype);

	comprel = table_open(CompressionRelationId, AccessShareLock);

	comptype = NULL;	/* table_open might have invalidated this */

	/* SELECT * FROM pg_compression WHERE compname = :1 */
	ScanKeyInit(&scankey,
				Anum_pg_compression_compname,
				BTEqualStrategyNumber, F_NAMEEQ,
				NameGetDatum(&compname));

	scan = systable_beginscan(comprel, CompressionCompnameIndexId, true,
							  NULL, 1, &scankey);
	tuple = systable_getnext(scan);
	if (!HeapTupleIsValid(tuple))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("unknown compress type \"%s\"",
						NameStr(compname))));

	funcs = palloc0(sizeof(PGFunction) * NUM_COMPRESS_FUNCS);

	ctup = (Form_pg_compression)GETSTRUCT(tuple);

	Assert(OidIsValid(ctup->compconstructor));
	fmgr_info(ctup->compconstructor, &finfo);
	funcs[COMPRESSION_CONSTRUCTOR] = finfo.fn_addr;

	Assert(OidIsValid(ctup->compdestructor));
	fmgr_info(ctup->compdestructor, &finfo);
	funcs[COMPRESSION_DESTRUCTOR] = finfo.fn_addr;

	Assert(OidIsValid(ctup->compcompressor));
	fmgr_info(ctup->compcompressor, &finfo);
	funcs[COMPRESSION_COMPRESS] = finfo.fn_addr;

	Assert(OidIsValid(ctup->compdecompressor));
	fmgr_info(ctup->compdecompressor, &finfo);
	funcs[COMPRESSION_DECOMPRESS] = finfo.fn_addr;

	Assert(OidIsValid(ctup->compvalidator));
	fmgr_info(ctup->compvalidator, &finfo);
	funcs[COMPRESSION_VALIDATOR] = finfo.fn_addr;

	systable_endscan(scan);
	table_close(comprel, AccessShareLock);

	return funcs;
}

/* Invokes a compression constructor */
CompressionState *
callCompressionConstructor(PGFunction constructor,
						   TupleDesc tupledesc,
						   StorageAttributes *sa,
						   bool is_compress)
{
	return (CompressionState *) DatumGetPointer(DirectFunctionCall3(constructor,
											 PointerGetDatum(tupledesc),
											 PointerGetDatum(sa),
											 BoolGetDatum(is_compress)));

}

void
callCompressionDestructor(PGFunction destructor, CompressionState *state)
{
	DirectFunctionCall1(destructor, PointerGetDatum(state));
}

/* Actually call a compression (or decompression) function */
void
callCompressionActuator(PGFunction func , const void *src , int32 src_sz,
						char *dst, int32 dst_sz, int32 *dst_used,
						CompressionState *state)
{

  (void)DirectFunctionCall6(func, PointerGetDatum(src), Int32GetDatum(src_sz),
							PointerGetDatum(dst), Int32GetDatum(dst_sz),
							PointerGetDatum(dst_used), PointerGetDatum(state));


}

void
callCompressionValidator(PGFunction func, char *comptype, int32 complevel,
						 int32 blocksize, Oid typid)
{
	StorageAttributes sa;

	sa.comptype = comptype;
	sa.complevel = complevel;
	sa.blocksize = blocksize;
	sa.typid = typid;
	(void)DirectFunctionCall1(func, PointerGetDatum(&sa));
}

#ifdef HAVE_LIBZ
Datum
zlib_constructor(PG_FUNCTION_ARGS)
{

	/* PG_GETARG_POINTER(0) is TupleDesc that is currently unused.
	 * It is passed as NULL */

	StorageAttributes *sa = (StorageAttributes *) PG_GETARG_POINTER(1);
	CompressionState *cs	   = palloc0(sizeof(CompressionState));
	zlib_state	   *state	= palloc0(sizeof(zlib_state));
	bool			  compress = PG_GETARG_BOOL(2);

	cs->opaque = (void *) state;
	cs->desired_sz = NULL;

	Assert(PointerIsValid(sa->comptype));

	if (sa->complevel == 0)
		sa->complevel = 1;

	state->level = sa->complevel;
	state->compress = compress;
	state->compress_fn = compress2;
	state->decompress_fn = uncompress;

	PG_RETURN_POINTER(cs);

}

Datum
zlib_destructor(PG_FUNCTION_ARGS)
{
	CompressionState *cs = (CompressionState *) PG_GETARG_POINTER(0);

	if (cs != NULL && cs->opaque != NULL)
 	{
		pfree(cs->opaque);
	}

	PG_RETURN_VOID();
}

Datum
zlib_compress(PG_FUNCTION_ARGS)
{
	const void	   *src	  = PG_GETARG_POINTER(0);
	int32			 src_sz   = PG_GETARG_INT32(1);
	void			 *dst	  = PG_GETARG_POINTER(2);
	int32			 dst_sz   = PG_GETARG_INT32(3);
	int32			*dst_used = (int32 *) PG_GETARG_POINTER(4);
	CompressionState *cs	   = (CompressionState *) PG_GETARG_POINTER(5);
	zlib_state	   *state	= (zlib_state *) cs->opaque;
	int				last_error;

	unsigned long amount_available_used = dst_sz;

	last_error = state->compress_fn((unsigned char *) dst,
									&amount_available_used, src, src_sz,
									state->level);

	*dst_used = amount_available_used;

	if (last_error != Z_OK)
	{
		switch (last_error)
		{
			case Z_MEM_ERROR:
				elog(ERROR, "out of memory");
				break;

			case Z_BUF_ERROR:
				/*
				 * zlib returns this when it couldn't compress the data
				 * to a size smaller than the input.
				 *
				 * The caller expects to detect this themselves so we set
				 * dst_used accordingly.
				 */
				*dst_used = src_sz;
				break;

			default:
				/* shouldn't get here */
				elog(ERROR, "zlib compression failed with error %d", last_error);
				break;
		}
	}

	PG_RETURN_VOID();
}

Datum
zlib_decompress(PG_FUNCTION_ARGS)
{
	const char	   *src	= PG_GETARG_POINTER(0);
	int32			src_sz = PG_GETARG_INT32(1);
	void		   *dst	= PG_GETARG_POINTER(2);
	int32			dst_sz = PG_GETARG_INT32(3);
	int32		   *dst_used = (int32 *) PG_GETARG_POINTER(4);
	CompressionState *cs = (CompressionState *) PG_GETARG_POINTER(5);
	zlib_state	   *state = (zlib_state *) cs->opaque;
	int				last_error;
	unsigned long amount_available_used = dst_sz;

	Assert(src_sz > 0 && dst_sz > 0);

	last_error = state->decompress_fn(dst, &amount_available_used,
									  (const Bytef *) src, src_sz);

	SIMPLE_FAULT_INJECTOR("zlib_decompress_after_decompress_fn");

	*dst_used = amount_available_used;

	if (last_error != Z_OK)
	{
		switch (last_error)
		{
			case Z_MEM_ERROR:
				elog(ERROR, "out of memory");
				break;

			case Z_BUF_ERROR:

				/*
				 * This would be a bug. We should have given a buffer big
				 * enough in the decompress case.
				 */
				elog(ERROR, "buffer size %d insufficient for compressed data",
					 dst_sz);
				break;

			case Z_DATA_ERROR:
				/*
				 * zlib data structures corrupted.
				 *
				 * Check out the error message: kind of like 'catalog
				 * convergence' for data corruption :-).
				 */
				elog(ERROR, "zlib encountered data in an unexpected format");
				break;

			default:
				/* shouldn't get here */
				elog(ERROR, "zlib decompression failed with error %d", last_error);
				break;
		}
	}

	PG_RETURN_VOID();
}

Datum
zlib_validator(PG_FUNCTION_ARGS)
{
	PG_RETURN_VOID();
}
#else
Datum
zlib_constructor(PG_FUNCTION_ARGS)
{
	elog(ERROR, "libz compression is not supported in this build of Greenplum");
	PG_RETURN_VOID();
}

Datum
zlib_destructor(PG_FUNCTION_ARGS)
{
	elog(ERROR, "libz compression is not supported in this build of Greenplum");
	PG_RETURN_VOID();
}

Datum
zlib_compress(PG_FUNCTION_ARGS)
{
	elog(ERROR, "libz compression is not supported in this build of Greenplum");
	PG_RETURN_VOID();
}

Datum
zlib_decompress(PG_FUNCTION_ARGS)
{
	elog(ERROR, "libz compression is not supported in this build of Greenplum");
	PG_RETURN_VOID();
}

Datum
zlib_validator(PG_FUNCTION_ARGS)
{
	elog(ERROR, "libz compression is not supported in this build of Greenplum");
	PG_RETURN_VOID();
}
#endif

Datum
rle_type_constructor(PG_FUNCTION_ARGS)
{
	elog(ERROR, "rle_type block compression not supported");
	PG_RETURN_VOID();
}

Datum
rle_type_destructor(PG_FUNCTION_ARGS)
{
	elog(ERROR, "rle_type block compression not supported");
	PG_RETURN_VOID();
}

Datum
rle_type_compress(PG_FUNCTION_ARGS)
{
	elog(ERROR, "rle_type block compression not supported");
	PG_RETURN_VOID();
}

Datum
rle_type_decompress(PG_FUNCTION_ARGS)
{
	elog(ERROR, "rle_type block compression not supported");
	PG_RETURN_VOID();
}

Datum
rle_type_validator(PG_FUNCTION_ARGS)
{
	elog(ERROR, "rle_type block compression not supported");
	PG_RETURN_VOID();
}

/* Dummy routines to implement compresstype=none */
Datum
dummy_compression_constructor(PG_FUNCTION_ARGS)
{
	elog(ERROR, "dummy compression called directly");
	PG_RETURN_VOID();
}

Datum
dummy_compression_destructor(PG_FUNCTION_ARGS)
{
	elog(ERROR, "dummy compression called directly");
	PG_RETURN_VOID();
}

Datum
dummy_compression_compress(PG_FUNCTION_ARGS)
{
	elog(ERROR, "dummy compression called directly");
	PG_RETURN_VOID();
}

Datum
dummy_compression_decompress(PG_FUNCTION_ARGS)
{
	elog(ERROR, "dummy compression called directly");
	PG_RETURN_VOID();
}

Datum
dummy_compression_validator(PG_FUNCTION_ARGS)
{
	elog(ERROR, "dummy compression called directly");
	PG_RETURN_VOID();
}

/*
 * Does a compression algorithm exist by the name of `compresstype'?
 */
bool
compresstype_is_valid(char *comptype)
{
	/*
	 * Hard-coding compresstypes is bad, agreed.  But there isn't a
	 * better way in sight at this point.  Lookup into pg_compression
	 * catalog table is not possible because this method is used to
	 * validate value of gp_default_storage_options GUC (among other
	 * things).  If the GUC is set in postgresql.conf, we get here
	 * before IsNormalProcessingMode() is true.
	 *
	 * Whenever the list of supported compresstypes is changed, this
	 * must change!
	 */
	static const char *const valid_comptypes[] = {
#ifdef HAVE_LIBQUICKLZ
			"quicklz",
#endif
#ifdef HAVE_LIBZ
			"zlib",
#endif
#ifdef USE_ZSTD
			"zstd",
#endif
			"rle_type", "none"};

	for (int i = 0; i < ARRAY_SIZE(valid_comptypes); ++i)
	{
		if (pg_strcasecmp(valid_comptypes[i], comptype) == 0)
			return true;
	}

	return false;
}

相关信息

greenplumn 源码目录

相关文章

greenplumn aclchk 源码

greenplumn aoblkdir 源码

greenplumn aocatalog 源码

greenplumn aoseg 源码

greenplumn aovisimap 源码

greenplumn catalog 源码

greenplumn dependency 源码

greenplumn gp_fastsequence 源码

greenplumn gp_partition_template 源码

greenplumn gp_segment_config 源码

0  赞