greenplumn gp_hyperloglog 源码

  • 2022-08-18
  • 浏览 (219)

greenplumn gp_hyperloglog 代码

文件路径:/src/backend/utils/hyperloglog/gp_hyperloglog.c

/*
 * Copyright 2012, Tomas Vondra (tv@fuzzy.cz). All rights reserved.
 * Copyright 2015, Conversant, Inc. All rights reserved.
 * Copyright 2018, VMware, Inc. or its affiliates. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification, are
 * permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this list of
 * conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice, this list
 * of conditions and the following disclaimer in the documentation and/or other materials
 * provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY TOMAS VONDRA, CONVERSANT INC, VMWARE, INC. OF ITS AFFILIATES.
 * AND ANY OTHER CONTRIBUTORS (THE "AUTHORS") ''AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are those of the
 * Authors and should not be interpreted as representing official policies, either expressed
 * or implied, of the Authors.
 */

/*
 * 07/26/2018
 *
 * We have updated, removed and modified the content of this file.
 *
 * 	1. We have removed some function definitions in as we did not need
 * 	them for implementing incremental analyze.
 * 	2. We modified utility function definitions as most of them are used in the
 * 	code internally and not exposed to the user.Function parameters are
 * 	no longer as the type of `PG_FUNCTION_ARGS` and extracted in the
 * 	function body, but only passed by the caller as it is.
 *	3. We kept the definitions of user facing functions that are necessary for
 * 	full scan incremental analyze.
 * 	4. We abondoned the sparse represenation of the hyperloglog and for
 *  simplicity this version only supports dense represenation.
 */


/* This file contains internal functions and several functions exposed to the
 * outside via gp_hyperloglog.h. The functions are for the manipulation/creation/
 * evaluation of GpHLLCounters that are necessary for implementing incremental
 * analyze in GPDB. This file is modified from its original content and we removed
 * the code that was unnecessary for our purpose.
 */
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <math.h>
#include <string.h>

#include "postgres.h"
#include "fmgr.h"
#include "common/pg_lzcompress.h"
#include "utils/builtins.h"
#include "utils/bytea.h"
#include "utils/lsyscache.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"

#include "utils/hyperloglog/gp_hyperloglog.h"

#define GPHLLDATARAWSIZE(hloglog) (POW2(hloglog->b))

/* ------------- function declarations for local functions --------------- */
static double gp_hll_estimate_dense(GpHLLCounter hloglog);
static double gp_error_estimate(double E,int b);

static GpHLLCounter gp_hll_add_hash_dense(GpHLLCounter hloglog, uint64_t hash);

static GpHLLCounter gp_hll_compress_dense(GpHLLCounter hloglog);
static GpHLLCounter gp_hll_compress_dense_unpacked(GpHLLCounter hloglog);
static GpHLLCounter gp_hll_decompress_unpacked(GpHLLCounter hloglog);
static GpHLLCounter gp_hll_decompress_dense(GpHLLCounter hloglog);
static GpHLLCounter gp_hll_decompress_dense_unpacked(GpHLLCounter hloglog);

/* ---------------------- function definitions --------------------------- */

GpHLLCounter
gp_hll_unpack(GpHLLCounter hloglog){

	char entry;
	int i;
	Size data_rawsize;
	GpHLLCounter htemp;

	if (hloglog->format == UNPACKED || hloglog->format == UNPACKED_UNPACKED)
	{
		return gp_hll_copy(hloglog);
	}

	/* use decompress to handle compressed unpacking */
	if (hloglog->b < 0)
	{
		return gp_hll_decompress_unpacked(hloglog);
	}

	/* set format to unpacked*/
	if (hloglog->format == PACKED_UNPACKED)
	{
		hloglog->format = UNPACKED_UNPACKED;
	}
	else if (hloglog->format == PACKED)
	{
		hloglog->format = UNPACKED;
	}

	/*
	 * Allocate and zero an array large enough to hold all the decompressed
	 * bins
	 */
	data_rawsize = GPHLLDATARAWSIZE(hloglog);
	htemp = palloc(sizeof(GpHLLData) + data_rawsize);
	memcpy(htemp, hloglog, sizeof(GpHLLData));

	for(i=0; i < data_rawsize; i++){
		GP_HLL_DENSE_GET_REGISTER(entry,hloglog->data,i,hloglog->binbits);
		htemp->data[i] = entry;
	}

	hloglog = htemp;

	/* set the varsize to the appropriate length  */
	SET_VARSIZE(hloglog, sizeof(GpHLLData) + data_rawsize);

	return hloglog;
}


GpHLLCounter
gp_hll_decompress_unpacked(GpHLLCounter hloglog)
{
	/* make sure the data is compressed */
	if (hloglog->b > 0) {
		return hloglog;
	}

	hloglog = gp_hll_decompress_dense_unpacked(hloglog);

	return hloglog;
}


/* Decompresses dense counters */
static GpHLLCounter
gp_hll_decompress_dense_unpacked(GpHLLCounter hloglog)
{
	Size data_rawsize;
	GpHLLCounter htemp;

	/* reset b to positive value for calcs and to indicate data is
	* decompressed */
	hloglog->b = -1 * (hloglog->b);
	hloglog->format = UNPACKED;

	/* allocate and zero an array large enough to hold all the decompressed
	* bins */
	data_rawsize = GPHLLDATARAWSIZE(hloglog);
	htemp = palloc(sizeof(GpHLLData) + data_rawsize);
	memset(htemp, 0, sizeof(GpHLLData) + data_rawsize);
	memcpy(htemp, hloglog, sizeof(GpHLLData));

	/* decompress the data */
	pglz_decompress(hloglog->data, VARSIZE_ANY(hloglog) - sizeof(GpHLLData),
					(char *) &htemp->data, data_rawsize, true);

	hloglog = htemp;

	/* set the varsize to the appropriate length  */
	SET_VARSIZE(hloglog, sizeof(GpHLLData) + data_rawsize);

	return hloglog;
}



/* Allocate HLL estimator that can handle the desired cartinality and
 * precision.
 * 
 * parameters:
 *      ndistinct   - cardinality the estimator should handle
 *      error       - requested error rate (0 - 1, where 0 means 'exact')
 * 
 * returns:
 *      instance of HLL estimator (throws ERROR in case of failure)
 */
GpHLLCounter
gp_hll_create(double ndistinct, float error, uint8_t format)
{

    float m;
    size_t length;
    GpHLLCounter p;
    
    /* target error rate needs to be between 0 and 1 */
    if (error <= 0 || error >= 1){
        elog(ERROR, "invalid error rate requested - only values in (0,1) allowed");
    }
    if (MIN_BINBITS >= (uint8_t)ceil(log2(log2(ndistinct))) || MAX_BINBITS <= (uint8_t)ceil(log2(log2(ndistinct)))){
        elog(ERROR,"invalid ndstinct - must be between 257 and 1.1579 * 10^77");
    } 

    /* the counter is allocated as part of this memory block  */
    length = gp_hll_get_size(ndistinct, error);
    p = (GpHLLCounter)palloc0(length);

    /* set the counter struct version */
    p->version = STRUCT_VERSION;

	/* set the format to 0 for bitpacked*/
    p->format = format;

    /* what is the minimum number of bins to achieve the requested error rate?
     *  we'll increase this to the nearest power of two later */
    m = ERROR_CONST / (error * error);

    /* so how many bits do we need to index the bins (round up to nearest
     * power of two) */
    p->b = (uint8_t)ceil(log2(m));

    /* set the number of bits per bin */
    p->binbits = (uint8_t)ceil(log2(log2(ndistinct)));

    p->idx = -1;

    if (p->b < MIN_INDEX_BITS)   /* we want at least 2^4 (=16) bins */
        p->b = MIN_INDEX_BITS;
    else if (p->b > MAX_INDEX_BITS)
        elog(ERROR, "number of index bits exceeds MAX_INDEX_BITS (requested %d)", p->b);

    SET_VARSIZE(p, length);

    return p;

}

/* Performs a simple 'copy' of the counter, i.e. allocates a new counter and
 * copies the state from the supplied one. */
GpHLLCounter
gp_hll_copy(GpHLLCounter counter)
{
    
    size_t length = VARSIZE_ANY(counter);
    GpHLLCounter copy = (GpHLLCounter)palloc(length);
    
    memcpy(copy, counter, length);
    
    return copy;

}

/* Merges the two estimators. Either modifies the first estimator in place
 * (inplace=true), or creates a new copy and returns that (inplace=false).
 * Modification in place is very handy in aggregates, when we really want to
 * modify the aggregate state in place.
 * 
 * Merging is only possible if the counters share the same parameters (number
 * of bins, bin size, ...). If the counters don't match, this throws an ERROR.
 *  */
GpHLLCounter
gp_hll_merge(GpHLLCounter counter1, GpHLLCounter counter2)
{

	int i;
	GpHLLCounter result = counter1;
	int upper_bound = POW2(result->b);

	/* check compatibility first */
	//if (counter1->b != counter2->b && -1*counter1->b != counter2->b)
	//elog(ERROR, "index size of estimators differs (%d != %d)", counter1->b, counter2->b);
	//else if (counter1->binbits != counter2->binbits)
	//elog(ERROR, "bin size of estimators differs (%d != %d)", counter1->binbits, counter2->binbits);


	/* Keep the maximum register value for each bin */
	for (i = 0; i < upper_bound; i += 1){
		result->data[i] = ((counter2->data[i] > result->data[i]) ? counter2->data[i] : result->data[i]);
	}

	return result;
}


/* Computes size of the structure, depending on the requested error rate and
 * ndistinct. */
int 
gp_hll_get_size(double ndistinct, float error)
{

    int b;
    float m;
    
    if (error <= 0 || error >= 1)
        elog(ERROR, "invalid error rate requested");
    
    m = ERROR_CONST / (error * error);
    b = (int)ceil(log2(m));
    
    if (b < MIN_INDEX_BITS)
        b = MIN_INDEX_BITS;
    else if (b > MAX_INDEX_BITS)
        elog(ERROR, "number of index bits exceeds MAX_INDEX_BITS (requested %d)",b);
    
    /* The size is the sum of the struct overhead and the bytes the used to 
     * store the buckets. Which is the product of the number of buckets and
     *  the (bits per bucket)/ 8 where 8 is the amount of bits per byte.
     *  
     *  size_in_bytes = struct_overhead + num_buckets*(bits_per_bucket/8)
     *
     * */  
    return sizeof(GpHLLData) + (int)(ceil(POW2(b)) * ceil(log2(log2(ndistinct))) / 8.0);

}

/* Hyperloglog estimate header function */
double 
gp_hll_estimate(GpHLLCounter hloglog)
{
    double E = 0;
    
	E = gp_hll_estimate_dense(hloglog);

	return E;
}
/*
 * Computes the HLL estimate, as described in the paper.
 * 
 * In short it does these steps:
 * 
 * 1) sums the data in counters (1/2^m[i])
 * 2) computes the raw estimate E
 * 3) corrects the estimate for low values
 * 
 */
static double
gp_hll_estimate_dense(GpHLLCounter hloglog)
{
	double H = 0, E = 0;
	int j, V = 0;
	int m = POW2(hloglog->b);

	/* compute the sum for the harmonic mean */
	if (hloglog->binbits <= MAX_PRECOMPUTED_EXPONENTS_BINWIDTH){
		for (j = 0; j < m; j++){
			H += PE[(int)hloglog->data[j]];
		}
	}
	else {
		for (j = 0; j < m; j++){
			if (0 <= hloglog->data[j] && hloglog->data[j] < NUM_OF_PRECOMPUTED_EXPONENTS){
				H += PE[(int)hloglog->data[j]];
			}
			else {
				H += pow(0.5, hloglog->data[j]);
			}
		}
	}

	/* multiple by constants to turn the mean into an estimate */
	E = alpham[hloglog->b] / H;

	/* correct for hyperloglog's low cardinality bias by either using linear
	*  counting or error estimation */
	if (E <= (5.0 * m)) {

		/* account for hloglog low cardinality bias */
		E = E - gp_error_estimate(E, hloglog->b);

		/* search for empty registers for linear counting */
		for (j = 0; j < m; j++){
			if (hloglog->data[j] == 0){
				V += 1;
			}
		}

		/* Don't use linear counting if there are no empty registers since we
		* don't to divide by 0 */
		if (V != 0){
			H = m * log(m / (float)V);
		}
		else {
			H = E;
		}

		/* if the estimated cardinality is below the threshold for a specific
		* accuracy return the linear counting result otherwise use the error
		* corrected version */
		if (H <= threshold[hloglog->b]) {
			E = H;
		}

	}

	return E;

}


/* Estimates the error from hyperloglog's low cardinality bias and by taking
 * a simple linear regression of the nearest 6 points */
static double 
gp_error_estimate(double E,int b)
{
    double avg=0;
    double beta,alpha;
    double sx,sxx,sxy,sy;
    int i, idx, max=0;

    /* get the number of interpoloation points for that precision */
    if (b > 5 && b <= 18) {
        max = MAX_INTERPOLATION_POINTS;
    } else if (b == 5) {
        max = PRECISION_5_MAX_INTERPOLATION_POINTS;
    } else if (b == 4) {
        max = PRECISION_4_MAX_INTERPOLATION_POINTS;
    } else {
	elog(ERROR,"ERROR: parameter b (%d) is out of range (4-18)",b);
    }

    idx = max;

    /* find the index of the first interpolation point greater than the 
     * uncorrected estimate */
    for (i = 0; i < max; i++){
        if (E < rawEstimateData[b-4][i]){
            idx = i;
            break;
        }
    }

    /* make sure array indexes will be inbounds when getting 6 nearest data
     * points */
    if (idx < 3) {
        idx = 3;
    } else if ( idx > max - 2){
        idx = max - 2;
    }

    /* calculate the alpha and beta needed to interpolate the error correction
     * for E */
    sx = rawEstimateData[b-4][idx+2] + rawEstimateData[b-4][idx+1] + rawEstimateData[b-4][idx] + rawEstimateData[b-4][idx-1] + rawEstimateData[b-4][idx-2] + rawEstimateData[b-4][idx-3];
    sxx = rawEstimateData[b-4][idx+2]*rawEstimateData[b-4][idx+2] + rawEstimateData[b-4][idx+1]*rawEstimateData[b-4][idx+1] + rawEstimateData[b-4][idx]*rawEstimateData[b-4][idx] + rawEstimateData[b-4][idx-1]*rawEstimateData[b-4][idx-1] + rawEstimateData[b-4][idx-2]*rawEstimateData[b-4][idx-2] + rawEstimateData[b-4][idx-3]*rawEstimateData[b-4][idx-3]; 
    sy = biasData[b-4][idx+2] + biasData[b-4][idx+1] + biasData[b-4][idx] + biasData[b-4][idx-1] + biasData[b-4][idx-2] + biasData[b-4][idx-3];
    sxy = rawEstimateData[b-4][idx+2]*biasData[b-4][idx+2] + rawEstimateData[b-4][idx+1]*biasData[b-4][idx+1] + rawEstimateData[b-4][idx]*biasData[b-4][idx] + rawEstimateData[b-4][idx-1]*biasData[b-4][idx-1] + rawEstimateData[b-4][idx-2]*biasData[b-4][idx-2] + rawEstimateData[b-4][idx-3]*biasData[b-4][idx-3];
    beta = (6.0*sxy - sx*sy ) / ( 6.0*sxx - sx*sx );
    alpha = (1.0/6.0)*sy - beta*(1.0/6.0)*sx;

    avg = alpha + E*beta;

    return avg;
}

/* Add element header function */
GpHLLCounter
gp_hll_add_element(GpHLLCounter hloglog, const char * element, int elen)
{

    uint64_t hash;

    /* compute the hash */
    hash = GpMurmurHash64A(element, elen, HASH_SEED);    

    /* add the hash to the estimator */
    hloglog = gp_hll_add_hash_dense(hloglog, hash);

    return hloglog;
}

/* Add the appropriate values to a dense encoded counter for a given hash */
static GpHLLCounter
gp_hll_add_hash_dense(GpHLLCounter hloglog, uint64_t hash)
{
    uint64_t idx;
    uint8_t rho,entry,addn;

    /* get idx (keep only the first 'b' bits) */
    idx  = hash >> (HASH_LENGTH - hloglog->b);

    /* rho needs to be independent from 'idx' */
    rho = __builtin_clzll(hash << hloglog->b) + 1;

    /* We only have (64 - hloglog->b) bits leftover after the index bits
     * however the chance that we need more is 2^-(64 - hloglog->b) which
     * is very small. So we only compute more when needed. To do this we
     * rehash the original hash and take the rho of the new hash and add it
     * to the (64 - hloglog->b) bits. We can repeat this for rho up to 255.
     * We can't go any higher since integer values >255 take more than 1 byte
     * which is currently supported nor really necessary due to 2^(2^8) ~ 
     * 1.16E77 a number so large its not feasible to have that many unique 
     * elements. */
	if (rho == HASH_LENGTH)
	{
		addn = HASH_LENGTH;
		rho = (HASH_LENGTH - hloglog->b);
		while (addn == HASH_LENGTH && rho < POW2(hloglog->binbits))
		{
		    hash = GpMurmurHash64A((const char * )&hash, HASH_LENGTH/8, HASH_SEED);
            /* zero length runs should be 1 so counter gets set */
		    addn = __builtin_clzll(hash) + 1;
		    rho += addn;
		}
    }

    /* keep the highest value */
    GP_HLL_DENSE_GET_REGISTER(entry,hloglog->data,idx,hloglog->binbits);
    if (rho > entry) {
        GP_HLL_DENSE_SET_REGISTER(hloglog->data,idx,rho,hloglog->binbits);
    }
    
    return hloglog;
}

/* Compress header function */
GpHLLCounter
gp_hll_compress(GpHLLCounter hloglog)
{

	/* make sure the data isn't compressed already */
	if (hloglog->b < 0) {
		return hloglog;
	}

	if (hloglog->idx == -1 && hloglog->format == PACKED){
		hloglog = gp_hll_compress_dense(hloglog);
	} else if (hloglog->idx == -1 && hloglog->format == UNPACKED){
		hloglog = gp_hll_compress_dense_unpacked(hloglog);
	} else if (hloglog->format == UNPACKED_UNPACKED){
		hloglog->format = UNPACKED;
	} else if (hloglog->format == PACKED_UNPACKED){
		hloglog = gp_hll_unpack(hloglog);
	}

	return hloglog;
}

/* Compresses dense encoded counters using lz compression */
static GpHLLCounter
gp_hll_compress_dense(GpHLLCounter hloglog)
{
    char *dest;
    char entry,*data;    
    int i, len;
    Size data_rawsize;

    /* make sure the dest struct has enough space for an unsuccessful 
     * compression and a 4 bytes of overflow since lz might not recognize its
     * over until then preventing segfaults */
    data_rawsize = GPHLLDATARAWSIZE(hloglog);
    dest = malloc(data_rawsize + 4);
    if (dest == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory"),
                 errdetail("Failed on request of size %zu.", data_rawsize + 4)));
    memset(dest,0,data_rawsize + 4);

    data = malloc(data_rawsize);
    if (data == NULL){
        free(dest);
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory"),
                 errdetail("Failed on request of size %zu.", data_rawsize)));
    }
    memset(data, 0, data_rawsize);

    /* put all registers in a normal array  i.e. remove dense packing so
     * lz compression can work optimally */
    for(i=0; i < data_rawsize ; i++){
        GP_HLL_DENSE_GET_REGISTER(entry,hloglog->data,i,hloglog->binbits);
        data[i] = entry;
    }

    /* lz_compress the normalized array and copy that data into hloglog->data
     * if any compression was achieved */
    len = pglz_compress(data,data_rawsize,dest,PGLZ_strategy_always);
    if (len >= (data_rawsize * hloglog->binbits /8) ){
	/* free allocated memory and return unaltered array */
    	if (dest){
            free(dest);
    	}
    	if (data){
            free(data);
    	}
    	return hloglog;
    }

	if (len < 0)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("LZ compression failed"),
				 errdetail("LZ compression return value: %d", len)));

    memcpy(hloglog->data,dest,len);

    /* resize the counter to only encompass the compressed data and the struct
     *  overhead*/
    SET_VARSIZE(hloglog,sizeof(GpHLLData) + len);

    /* invert the b value so it being < 0 can be used as a compression flag */
    hloglog->b = -1 * (hloglog->b);

    /* free allocated memory */
    if (dest){
        free(dest);
    }
    if (data){
        free(data);
    }

    /* return the compressed counter */
    return hloglog;
}

/* Compresses dense encoded counters using lz compression */
static GpHLLCounter
gp_hll_compress_dense_unpacked(GpHLLCounter hloglog)
{
	char *dest;
	Size data_rawsize;
	int len;

	/* make sure the dest struct has enough space for an unsuccessful
	* compression and a 4 bytes of overflow since lz might not recognize its
	* over until then preventing segfaults */
	data_rawsize = GPHLLDATARAWSIZE(hloglog);
	dest = malloc(data_rawsize + 4);
	if (dest == NULL){
		return 0;
	}
	if (dest == NULL)
		ereport(ERROR,
			(errcode(ERRCODE_OUT_OF_MEMORY),
			 errmsg("out of memory"),
			 errdetail("Failed on request of size %zu.", data_rawsize + 4)));


	memset(dest, 0, data_rawsize + 4);


	/* lz_compress the normalized array and copy that data into hloglog->data
	* if any compression was achieved */
	len  = pglz_compress(hloglog->data, data_rawsize, dest, PGLZ_strategy_always);
	if (len >= (data_rawsize * hloglog->binbits / 8)){
		/* free allocated memory and return unaltered array */
		if (dest){
			free(dest);
		}
		return hloglog;
	}

	if (len < 0)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("LZ compression failed"),
				 errdetail("LZ compression return value: %d", len)));

	memcpy(hloglog->data, dest, len);

	/* resize the counter to only encompass the compressed data and the struct
	*  overhead*/
	SET_VARSIZE(hloglog, sizeof(GpHLLData) + len);

	/* invert the b value so it being < 0 can be used as a compression flag */
	hloglog->b = -1 * (hloglog->b);
	hloglog->format = PACKED;

	/* free allocated memory */
	if (dest){
		free(dest);
	}

	/* return the compressed counter */
	return hloglog;
}


/* Decompress header function */
GpHLLCounter
gp_hll_decompress(GpHLLCounter hloglog)
{
     /* make sure the data is compressed */
    if (hloglog->b > 0) {
        return hloglog;
    }
    
    hloglog = gp_hll_decompress_dense(hloglog);

    return hloglog;
}

/* Decompresses dense counters */
static GpHLLCounter
gp_hll_decompress_dense(GpHLLCounter hloglog)
{
    char * dest;
    Size data_rawsize;
    int i;
    GpHLLCounter htemp;

    /* reset b to positive value for calcs and to indicate data is
     * decompressed */
    hloglog->b = -1 * (hloglog->b);

    /* allocate and zero an array large enough to hold all the decompressed 
     * bins */
    data_rawsize = GPHLLDATARAWSIZE(hloglog);
    dest = malloc(data_rawsize);
    if (dest == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory"),
                 errdetail("Failed on request of size %zu.", data_rawsize)));

    memset(dest,0,data_rawsize);

    /* decompress the data */
    pglz_decompress(hloglog->data, VARSIZE_ANY(hloglog) - sizeof(GpHLLData),
					dest, data_rawsize, true);

    /* copy the struct internals but not the data into a counter with enough 
     * space for the uncompressed data  */
    htemp = palloc(sizeof(GpHLLData) + (int)ceil((data_rawsize * hloglog->binbits / 8.0)));
    memcpy(htemp,hloglog,sizeof(GpHLLData));
    hloglog = htemp;

    /* set the registers to the appropriate value based on the decompressed
     * data */
    for (i=0; i<data_rawsize; i++){
        GP_HLL_DENSE_SET_REGISTER(hloglog->data,i,dest[i],hloglog->binbits);
    }

    /* set the varsize to the appropriate length  */
    SET_VARSIZE(hloglog,sizeof(GpHLLData) + (int)ceil((data_rawsize * hloglog->binbits / 8.0)) );
    

    /* free allocated memory */
    if (dest){
        free(dest);
    }

    return hloglog;
}

/* ---------------------- function definitions --------------------------- */

GpHLLCounter
gp_hyperloglog_add_item(GpHLLCounter hllcounter, Datum element, int16 typlen, bool typbyval, char typalign)
{
	GpHLLCounter hyperloglog;
	
	/* requires the estimator to be already created */
	if (hllcounter == NULL)
		elog(ERROR, "hyperloglog counter must not be NULL");
	
	/* estimator (we know it's not a NULL value) */
	hyperloglog = (GpHLLCounter) hllcounter;
	
	/* TODO The requests for type info shouldn't be a problem (thanks to
	 * lsyscache), but if it turns out to have a noticeable impact it's
	 * possible to cache that between the calls (in the estimator).
	 *
	 * I have noticed no measurable effect from either option. */
	
	/* decompress if needed */
	if(hyperloglog->b < 0)
	{
		hyperloglog = gp_hll_decompress(hyperloglog);
	}
	
	/* it this a varlena type, passed by reference or by value ? */
	if (typlen == -1)
	{
		/* varlena */
		hyperloglog = gp_hll_add_element(hyperloglog, VARDATA_ANY(element), VARSIZE_ANY_EXHDR(element));
	}
	else if (typbyval)
	{
		/* fixed-length, passed by value */
		hyperloglog = gp_hll_add_element(hyperloglog, (char*)&element, typlen);
	}
	else
	{
		/* fixed-length, passed by reference */
		hyperloglog = gp_hll_add_element(hyperloglog, (char*)element, typlen);
	}
	
	return hyperloglog;
}


double
gp_hyperloglog_estimate(GpHLLCounter hyperloglog)
{
	double estimate;
	
	/* unpack if needed */
	GpHLLCounter hyperloglog_unpacked = gp_hll_unpack(hyperloglog);
	
	estimate = gp_hll_estimate(hyperloglog_unpacked);

	/* free unpacked counter */
	pfree(hyperloglog_unpacked);
	
	/* return the updated bytea */
	return estimate;
}

GpHLLCounter
gp_hyperloglog_merge_counters(GpHLLCounter counter1, GpHLLCounter counter2)
{
	if (counter1 == NULL && counter2 == NULL)
	{
		/* if both counters are null return null */
		return NULL;
	}
	else if (counter1 == NULL)
	{
		/* if first counter is null just copy the second estimator into the
		 * first one */
		return gp_hll_copy(counter2);
	}
	else if (counter2 == NULL) {
		/* if second counter is null just return the first estimator */
		return gp_hll_copy(counter1);
	}
	else
	{
		/* ok, we already have the estimator - merge the second one into it */
		/* unpack if needed */
		GpHLLCounter counter1_new = gp_hll_unpack(counter1);
		GpHLLCounter counter2_new = gp_hll_unpack(counter2);

		/* perform the merge */
		counter1_new = gp_hll_merge(counter1_new, counter2_new);

		/*  counter2_new is not required any more */
		pfree(counter2_new);

		/* return the updated GpHLLCounter */
		return counter1_new;
	}
}


GpHLLCounter
gp_hyperloglog_init_def()
{
	GpHLLCounter hyperloglog;
	
	hyperloglog = gp_hll_create(DEFAULT_NDISTINCT, DEFAULT_ERROR, PACKED);
	
	return hyperloglog;
}

int
gp_hyperloglog_len(GpHLLCounter hyperloglog)
{
	return VARSIZE_ANY(hyperloglog);
}

/* GpMurmurHash64A produces the fastest 64 bit hash of the MurmurHash 
 * implementations and is ~ 20x faster than md5. This version produces the
 * same hash for the same key and seed in both big and little endian systems
 * */
uint64_t 
GpMurmurHash64A (const void * key, int len, unsigned int seed) 
{
    const uint64_t m = 0xc6a4a7935bd1e995;
    const int r = 47;
    uint64_t h = seed ^ (len * m);
    const uint8_t *data = (const uint8_t *)key;
    const uint8_t *end = data + (len-(len&7));

    while(data != end) {
        uint64_t k;

#if (BYTE_ORDER == LITTLE_ENDIAN)
        k = *((uint64_t*)data);
#else
        k = (uint64_t) data[0];
        k |= (uint64_t) data[1] << 8;
        k |= (uint64_t) data[2] << 16;
        k |= (uint64_t) data[3] << 24;
        k |= (uint64_t) data[4] << 32;
        k |= (uint64_t) data[5] << 40;
        k |= (uint64_t) data[6] << 48;
        k |= (uint64_t) data[7] << 56;
#endif

        k *= m;
        k ^= k >> r;
        k *= m;
        h ^= k;
        h *= m;
        data += 8;
    }

    switch(len & 7) {
        case 7: h ^= (uint64_t)data[6] << 48;
		/* fallthrough */
        case 6: h ^= (uint64_t)data[5] << 40;
		/* fallthrough */
        case 5: h ^= (uint64_t)data[4] << 32;
		/* fallthrough */
        case 4: h ^= (uint64_t)data[3] << 24;
		/* fallthrough */
        case 3: h ^= (uint64_t)data[2] << 16;
		/* fallthrough */
        case 2: h ^= (uint64_t)data[1] << 8;
		/* fallthrough */
        case 1: h ^= (uint64_t)data[0];
        h *= m;
    };

    h ^= h >> r;
    h *= m;
    h ^= h >> r;

    return h;
}

static const char _base64[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

static const int8 b64lookup[128] = {
	-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
	-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
	-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 62, -1, -1, -1, 63,
	52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -1, -1, -1, -1, -1, -1,
	-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
	15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -1, -1, -1, -1, -1,
	-1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
	41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, -1, -1,
};


int
gp_hll_b64_encode(const char *src, unsigned len, char *dst)
{
	char	   *p,
		*lend = dst + 76;
	const char *s,
		*end = src + len;
	int			pos = 2;
	uint32		buf = 0;

	s = src;
	p = dst;


	while (s < end)
	{
		buf |= (unsigned char)*s << (pos << 3);
		pos--;
		s++;

		/* write it out */
		if (pos < 0)
		{
			*p++ = _base64[(buf >> 18) & 0x3f];
			*p++ = _base64[(buf >> 12) & 0x3f];
			*p++ = _base64[(buf >> 6) & 0x3f];
			*p++ = _base64[buf & 0x3f];

			pos = 2;
			buf = 0;
		}
		if (p >= lend)
		{
			*p++ = '\n';
			lend = p + 76;
		}
	}
	if (pos != 2)
	{
		*p++ = _base64[(buf >> 18) & 0x3f];
		*p++ = _base64[(buf >> 12) & 0x3f];
		*p++ = (pos == 0) ? _base64[(buf >> 6) & 0x3f] : '=';
		*p++ = '=';
	}

	return p - dst;
}

int
gp_hll_b64_decode(const char *src, unsigned len, char *dst)
{
	const char *srcend = src + len,
		*s = src;
	char	   *p = dst;
	char		c;
	int			b = 0;
	uint32		buf = 0;
	int			pos = 0,
		end = 0;


	while (s < srcend)
	{
		c = *s++;

		if (c == ' ' || c == '\t' || c == '\n' || c == '\r')
			continue;

		if (c == '=')
		{
			/* end sequence */
			if (!end)
			{
				if (pos == 2)
					end = 1;
				else if (pos == 3)
					end = 2;
				else
					ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("unexpected \"=\"")));
			}
			b = 0;
		}
		else
		{
			b = -1;
			if (c > 0 && c < 127)
				b = b64lookup[(unsigned char)c];
			if (b < 0)
				ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				errmsg("invalid symbol")));
		}
		/* add it to buffer */
		buf = (buf << 6) + b;
		pos++;
		if (pos == 4)
		{
			*p++ = (buf >> 16) & 255;
			if (end == 0 || end > 1)
				*p++ = (buf >> 8) & 255;
			if (end == 0 || end > 2)
				*p++ = buf & 255;
			buf = 0;
			pos = 0;
		}
	}

	if (pos != 0)
		ereport(ERROR,
		(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
		errmsg("invalid end sequence")));

	return p - dst;
}


int
gp_b64_enc_len(const char *src, unsigned srclen)
{
	/* 3 bytes will be converted to 4, linefeed after 76 chars */
	return (srclen + 2) * 4 / 3 + srclen / (76 * 3 / 4);
}

int
gp_b64_dec_len(const char *src, unsigned srclen)
{
	return (srclen * 3) >> 2;
}

/* PG_GETARG macros for GpHLLCounter's that does version checking */
#define PG_GETARG_HLL_P(n) pg_check_hll_version((GpHLLCounter) PG_GETARG_BYTEA_P(n))
#define PG_GETARG_HLL_P_COPY(n) pg_check_hll_version((GpHLLCounter) PG_GETARG_BYTEA_P_COPY(n))

/* shoot for 2^64 distinct items and 0.8125% error rate by default */
#define DEFAULT_NDISTINCT   1ULL << 63
#define DEFAULT_ERROR       0.008125

/* Use the PG_FUNCTION_INFO_V! macro to pass functions to postgres */
PG_FUNCTION_INFO_V1(gp_hyperloglog_add_item_agg_default);

PG_FUNCTION_INFO_V1(gp_hyperloglog_merge);
PG_FUNCTION_INFO_V1(gp_hyperloglog_get_estimate);

PG_FUNCTION_INFO_V1(gp_hyperloglog_in);
PG_FUNCTION_INFO_V1(gp_hyperloglog_out);

PG_FUNCTION_INFO_V1(gp_hyperloglog_comp);

/* ------------- function declarations for local functions --------------- */
extern Datum gp_hyperloglog_add_item_agg_default(PG_FUNCTION_ARGS);

extern Datum gp_hyperloglog_get_estimate(PG_FUNCTION_ARGS);
extern Datum gp_hyperloglog_merge(PG_FUNCTION_ARGS);

extern Datum gp_hyperloglog_in(PG_FUNCTION_ARGS);
extern Datum gp_hyperloglog_out(PG_FUNCTION_ARGS);

extern Datum gp_hyperloglog_comp(PG_FUNCTION_ARGS);

static GpHLLCounter pg_check_hll_version(GpHLLCounter hloglog);

/* ---------------------- function definitions --------------------------- */
static GpHLLCounter
pg_check_hll_version(GpHLLCounter hloglog)
{
	if (hloglog->version != STRUCT_VERSION){
		elog(ERROR,"ERROR: The stored counter is version %u while the library is version %u. Please change library version or use upgrade function to upgrade the counter",hloglog->version,STRUCT_VERSION);
	}
	return hloglog;
}

Datum
gp_hyperloglog_add_item_agg_default(PG_FUNCTION_ARGS)
{

	GpHLLCounter hyperloglog;

	/* info for anyelement */
	Oid         element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
	Datum       element = PG_GETARG_DATUM(1);
	int16       typlen;
	bool        typbyval;
	char        typalign;

	/* Create a new estimator (with default error rate and ndistinct) or reuse
	 * the existing one. Return null if both counter and element args are null.
	 * This prevents excess empty counter creation */
	if (PG_ARGISNULL(0) && PG_ARGISNULL(1)){
		PG_RETURN_NULL();
	} else if (PG_ARGISNULL(0)) {
		hyperloglog = gp_hll_create(DEFAULT_NDISTINCT, DEFAULT_ERROR, PACKED);
	} else {
		hyperloglog = PG_GETARG_HLL_P(0);
	}

	/* add the item to the estimator (skip NULLs) */
	if (! PG_ARGISNULL(1)) {

		/* TODO The requests for type info shouldn't be a problem (thanks to
		 * lsyscache), but if it turns out to have a noticeable impact it's
		 * possible to cache that between the calls (in the estimator).
		 *
		 * I have noticed no measurable effect from either option. */

		/* get type information for the second parameter (anyelement item) */
		get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);

		hyperloglog = gp_hyperloglog_add_item(hyperloglog, element, typlen, typbyval, typalign);
	}

	/* return the updated bytea */
	PG_RETURN_BYTEA_P(hyperloglog);

}

Datum
gp_hyperloglog_merge(PG_FUNCTION_ARGS)
{
	GpHLLCounter counter1 = NULL;
	GpHLLCounter counter2 = NULL;
	GpHLLCounter counter1_merged = NULL;

	if (PG_ARGISNULL(0) && PG_ARGISNULL(1)){
		/* if both counters are null return null */
		PG_RETURN_NULL();

	} else if (PG_ARGISNULL(0)) {
		/* if first counter is null just copy the second estimator into the
		 * first one */
		counter1_merged = PG_GETARG_HLL_P_COPY(1);

	} else if (PG_ARGISNULL(1)) {
		/* if second counter is null just return the first estimator */
		counter1_merged = PG_GETARG_HLL_P_COPY(0);

	} else {
		/* ok, we already have the estimator - merge the second one into it */
		counter1 = PG_GETARG_HLL_P_COPY(0);
		counter2 = PG_GETARG_HLL_P_COPY(1);

		counter1_merged = gp_hyperloglog_merge_counters(counter1, counter2);
		pfree(counter1);
		pfree(counter2);
	}

	/* return the updated bytea */
	PG_RETURN_BYTEA_P(counter1_merged);

}

Datum
gp_hyperloglog_get_estimate(PG_FUNCTION_ARGS)
{
	double estimate;
	GpHLLCounter hyperloglog = PG_GETARG_HLL_P_COPY(0);

	estimate = gp_hyperloglog_estimate(hyperloglog);

	/* free the hll counter copy */
	pfree(hyperloglog);

	/* return the updated bytea */
	PG_RETURN_FLOAT8(estimate);
}

Datum
gp_hyperloglog_out(PG_FUNCTION_ARGS)
{
	int16   datalen, resultlen, res;
	char     *result;
	bytea    *data = PG_GETARG_BYTEA_P(0);

	datalen = VARSIZE_ANY_EXHDR(data);
	resultlen = gp_b64_enc_len(VARDATA_ANY(data), datalen);
	result = palloc(resultlen + 1);
	res = gp_hll_b64_encode(VARDATA_ANY(data),datalen, result);

	/* Make this FATAL 'cause we've trodden on memory ... */
	if (res > resultlen)
		elog(FATAL, "overflow - encode estimate too small");

	result[res] = '\0';

	PG_RETURN_CSTRING(result);
}

Datum
gp_hyperloglog_in(PG_FUNCTION_ARGS)
{
	bytea      *result;
	char       *data = PG_GETARG_CSTRING(0);
	int16      datalen, resultlen, res;

	datalen = strlen(data);
	resultlen = gp_b64_dec_len(data,datalen);
	result = palloc(VARHDRSZ + resultlen);
	res = gp_hll_b64_decode(data, datalen, VARDATA(result));

	/* Make this FATAL 'cause we've trodden on memory ... */
	if (res > resultlen)
		elog(FATAL, "overflow - decode estimate too small");

	SET_VARSIZE(result, VARHDRSZ + res);

	PG_RETURN_BYTEA_P(result);
}

Datum
gp_hyperloglog_comp(PG_FUNCTION_ARGS)
{
	GpHLLCounter hyperloglog;

	if (PG_ARGISNULL(0) ){
		PG_RETURN_NULL();
	}

	hyperloglog =  PG_GETARG_HLL_P_COPY(0);

	hyperloglog = gp_hll_compress(hyperloglog);

	PG_RETURN_BYTEA_P(hyperloglog);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn adminpack 源码

greenplumn verify_nbtree 源码

greenplumn auth_delay 源码

greenplumn auto_explain 源码

greenplumn blcost 源码

greenplumn blinsert 源码

greenplumn bloom 源码

greenplumn blscan 源码

greenplumn blutils 源码

greenplumn blvacuum 源码

0  赞