greenplumn gp_compress 源码

  • 2022-08-18
  • 浏览 (303)

greenplumn gp_compress 代码

文件路径:/src/backend/storage/file/gp_compress.c

/*-------------------------------------------------------------------------
 *
 * gp_compress.c
 *	  Common compression utilities
 *
 * Portions Copyright (c) 2009, Greenplum Inc.
 * Portions Copyright (c) EMC, 2011
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/storage/file/gp_compress.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "catalog/pg_compression.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "storage/gp_compress.h"
#include "utils/guc.h"
#include "utils/resowner.h"

#ifdef USE_ZSTD
#include <zstd.h>
#endif

/*
 * Using the provided compression function this method will try to compress the data.
 * In case an issue occur during the compression it will abort the execution.
 */
void
gp_trycompress(uint8 *sourceData, int32 sourceLen, uint8 *compressedBuffer, int32 compressedBufferLen,
			   int32 *compressedLen, PGFunction compressor, CompressionState *compressionState
)

{
	Assert(PointerIsValid(compressor));
	
	callCompressionActuator( compressor
						   , (const void *)sourceData
						   , (size_t)sourceLen
						   , (char*)compressedBuffer
						   , compressedBufferLen
						   , compressedLen
						   , compressionState
						   );
}

/*---------------------------------------------------------------------------*/
static void gp_decompress_generic(
	  uint8	 *compressed,
	  int32	 compressedLen,
	  uint8	 *uncompressed,
	  int32	 uncompressedLen,
	  PGFunction	 decompressor,
	  CompressionState *compressionState,
	  int64	 bufferCount
	  )
{
	int32				 resultingUncompressedLen;

	callCompressionActuator(decompressor,
							(const void *)compressed,
							(size_t)compressedLen,
							(char*)uncompressed,
							uncompressedLen,
							&resultingUncompressedLen,
							compressionState);


	if (resultingUncompressedLen != uncompressedLen)
		elog(ERROR,
			 "Uncompress returned length %d which is different than the "
			 "expected length %d (block count " INT64_FORMAT ")",
			 resultingUncompressedLen,
			 uncompressedLen,
			 bufferCount);

}  /* end gp_decompress_generic */

/*---------------------------------------------------------------------------*/
void gp_decompress(
		uint8 *compressed,
		int32 compressedLen,
		uint8 *uncompressed,
		int32 uncompressedLen,
		PGFunction decompressor,
		CompressionState *compressionState,
		int64 bufferCount)
{
	unsigned long uncompressedSize;

	int32 resultingUncompressedLen;

	uncompressedSize = (unsigned long)uncompressedLen;

	gp_decompress_generic( compressed
						, compressedLen
						, uncompressed
						, uncompressedLen
						, decompressor
						, compressionState
						, bufferCount
	);

	resultingUncompressedLen = (int32)uncompressedSize;

	if (resultingUncompressedLen != uncompressedLen)
		elog(ERROR,
			 "Uncompress returned length %d which is different than the "
			 "expected length %d (block count " INT64_FORMAT ")",
			 resultingUncompressedLen,
			 uncompressedLen,
			 bufferCount);
}

/*
 * Support for tracking ZSTD handles with resource owners.
 */
#ifdef USE_ZSTD

static dlist_head open_zstd_handles;
static bool zstd_resowner_callback_registered;

static void zstd_free_callback(ResourceReleasePhase phase,
				   bool isCommit,
				   bool isTopLevel,
				   void *arg);

zstd_context *
zstd_alloc_context(void)
{
	zstd_context *ctx;

	if (!zstd_resowner_callback_registered)
	{
		RegisterResourceReleaseCallback(zstd_free_callback, NULL);
		zstd_resowner_callback_registered = true;
	}

	ctx = MemoryContextAlloc(TopMemoryContext, sizeof(zstd_context));
	ctx->cctx = NULL;
	ctx->dctx = NULL;
	ctx->owner = CurrentResourceOwner;
	dlist_push_head(&open_zstd_handles, &ctx->node);

	return ctx;
}

void
zstd_free_context(zstd_context *context)
{
	if (context->cctx)
		ZSTD_freeCCtx(context->cctx);
	if (context->dctx)
		ZSTD_freeDCtx(context->dctx);

	dlist_delete(&context->node);

	pfree(context);
}

/* Close any open ZSTD handles on abort. */
static void
zstd_free_callback(ResourceReleasePhase phase,
				   bool isCommit,
				   bool isTopLevel,
				   void *arg)
{
	dlist_mutable_iter miter;

	if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
		return;

	dlist_foreach_modify(miter, &open_zstd_handles)
	{
		zstd_context *context = dlist_container(zstd_context, node, miter.cur);

		if (context->owner == CurrentResourceOwner)
		{
			if (isCommit)
				elog(WARNING, "zstd context reference leak: context %p still referenced", context);
			zstd_free_context(context);
		}
	}
}

#endif	/* USE_ZSTD */

相关信息

greenplumn 源码目录

相关文章

greenplumn buffile 源码

greenplumn copydir 源码

greenplumn execute_pipe 源码

greenplumn fd 源码

greenplumn reinit 源码

greenplumn sharedfileset 源码

0  赞