greenplumn workfile_mgr 源码
greenplumn workfile_mgr 代码
文件路径:/src/backend/utils/workfile_manager/workfile_mgr.c
/*-------------------------------------------------------------------------
*
* workfile_mgr.c
* Exposes information about BufFiles in shared memory
*
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/utils/workfile_manager/workfile_mgr.c
*
* NOTES:
*
* If a BufFile is created directly with BufFile* functions, it forms its
* own workfile set.
*
*
* There are three ways to use this:
*
* 1. Use the normal upstream BufFile functions, ignoring work files
* altogether. A workfile set will be created for each BufFile automatically.
* This ensures that files created by unmodified upstream code is tracked.
*
* 2. Use BufFile functions, and call XXX to add extra information to
* the workfile set.
*
* 3. Use workfile_mgr_create_set() to create an explicit working set.
*
*
* Workfile set is removed automatically when the last file associated with
* it is closed excepted the pinned workfile_set, which should be removed by
* workfile_mgr_close_set().
*
* Distinguish by the workfile_set life cycle, the workfile_set could be class
* into several classes.
* 1. workfile_set for a temporary workfile with interXact false(see buffile.c).
* Once the file closed, the workfile_set will also get freed.
* And if current transaction aborted, the workfile's resource owner is
* responsible for closing it if the workfile is leaked, which also removes
* the workfile_set.
*
* 2. workfile_set for a temporary workfile with interXact true(see buffile.c).
* Once the file closed, the workfile_set will also get removed.
* Since the workfile doesn't register to a resource owner, WorkFileLocalCtl
* is responsible for removing its corresponding workfile_set in
* AtProcExit_WorkFile(which gets called when process exit).
*
* 3. workfile_set is pinned and saved to prevent unexpected free operation.
* When the workfile_set gets pinned, after all the file in the workfile_set
* gets closed, we don't free the workfile_set, cause we may register new workfile
* later. Call workfile_mgr_close_set to explicitly remove the workfile_set.
* To deal with aborting, since the resource owner of the workfiles in the
* workfile_set will close the workfiles, and this operation will not
* remove workfile_set when no file exists in the workfile_set. We have to free
* these kinds of workfile_sets when transaction ends in AtEOXact_WorkFile.
* If the workfile_set contains interXact workfiles, we still rely on
* WorkFileLocalCtl to remove workfile_sets in AtProcExit_WorkFile. But currently,
* we don't have this kind of usage.
*
* NOTE: if a workfile_set gets created but no workfile register into it(in case of
* error, RegisterFileWithSet not called), it will never have a chance to be removed
* automatically through file close. Then we have to check and remove these
* workfile_sets when the transaction end to prevent overflow in AtEOXact_WorkFile.
*
*
* The purpose of this tracking is twofold:
* 1. Provide the gp_toolkit views, so that you can view temporary file
* usage from a different psql session.
*
* 2. Enforce gp_* limits.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cdb/cdbvars.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "lib/ilist.h"
#include "storage/buffile.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "utils/builtins.h"
#include "utils/faultinjector.h"
#include "utils/workfile_mgr.h"
/*
* GUC: Number of unique workfile sets that can be open at any time. Each
* workfile set can contain multiple files, however. A hashjoin for example
* will create one file per batch, but they all belong to the same workfile
* set.
*/
int gp_workfile_max_entries = 8192;
#define SizeOfWorkFileUsagePerQueryKey (2 * sizeof(int32));
typedef struct workfile_set WorkFileSetSharedEntry;
/*
* Protected by WorkFileManagerLock (except for sizes, which use atomics)
*/
typedef struct
{
dlist_head freeList;
dlist_head activeList;
int num_active;
uint64 total_bytes;
HTAB *per_query_hash;
WorkFileSetSharedEntry entries[];
} WorkFileShared;
static WorkFileShared *workfile_shared;
/*
* WorkFileLocalEntry - track workfile_set for each workfile.
*
* work_set - the workfile_set in shared memory for a workfile.
* size - the size of the workfile.
*/
typedef struct
{
WorkFileSetSharedEntry *work_set;
int64 size;
} WorkFileLocalEntry;
/*
* WorkFileLocalCtl, local control struct for workfiles
*
* entries - The workfile created in current process will be stored in the the entries
* indexed by virtual File descriptor.
* sizeLocalEntries - The total number of workfiles created.
* localList - track the workfile_sets created in current process.
*/
typedef struct
{
WorkFileLocalEntry *entries;
int sizeLocalEntries;
bool initialized;
dlist_head localList;
} WorkFileLocalCtl;
static WorkFileLocalCtl localCtl = {NULL, 0, false};
static void remove_workfile_set_if_possible(workfile_set* work_set);
static workfile_set *workfile_mgr_create_set_internal(const char *operator_name, const char *prefix);
static void pin_workset(workfile_set *work_set);
static void unpin_workset(workfile_set *work_set);
static bool proc_exit_hook_registered = false;
Datum gp_workfile_mgr_cache_entries(PG_FUNCTION_ARGS);
Datum gp_workfile_mgr_used_diskspace(PG_FUNCTION_ARGS);
/*
* Shared memory initialization
*/
Size
WorkFileShmemSize(void)
{
Size size;
size = offsetof(WorkFileShared, entries);
size = add_size(size, mul_size(gp_workfile_max_entries,
sizeof(WorkFileSetSharedEntry)));
size = add_size(size, hash_estimate_size(gp_workfile_max_entries,
sizeof(WorkFileUsagePerQuery)));
return size;
}
void
WorkFileShmemInit(void)
{
Size size;
bool found;
size = offsetof(WorkFileShared, entries);
size = add_size(size, mul_size(gp_workfile_max_entries,
sizeof(WorkFileSetSharedEntry)));
workfile_shared = (WorkFileShared *)
ShmemInitStruct("WorkFile Data", size, &found);
if (!found)
{
HASHCTL hctl;
memset(workfile_shared, 0, size);
dlist_init(&workfile_shared->freeList);
dlist_init(&workfile_shared->activeList);
workfile_shared->num_active = 0;
for (int i = 0; i < gp_workfile_max_entries; i++)
{
WorkFileSetSharedEntry *entry = &workfile_shared->entries[i];
dlist_push_tail(&workfile_shared->freeList, &entry->node);
entry->active = false;
}
/* Initialize per-query hashtable */
memset(&hctl, 0, sizeof(hctl));
hctl.keysize = SizeOfWorkFileUsagePerQueryKey;
hctl.entrysize = sizeof(WorkFileUsagePerQuery);
hctl.hash = tag_hash;
workfile_shared->per_query_hash =
ShmemInitHash("per-query workfile usage hash",
gp_workfile_max_entries,
gp_workfile_max_entries,
&hctl,
HASH_ELEM | HASH_FUNCTION);
}
}
/*
* Callback function, to delist all our temporary files from shared memory.
*
* We are about to exit, and the temporary files are about to deleted. We
* have to delist them from shared memory before that, while we still
* have access to shared memory. fd.c's on_proc_exit callback runs after
* detaching from shared memory, which is too late.
*/
static void
AtProcExit_WorkFile(int code, Datum arg)
{
dlist_mutable_iter iter;
if (!localCtl.initialized)
return;
dlist_foreach_modify(iter, &localCtl.localList)
{
workfile_set *work_set = dlist_container(workfile_set, local_node, iter.cur);
workfile_mgr_close_set(work_set);
}
}
/*
* Cleanup the workfile_sets at the end of transaction.
* When transaction aborts, we should clean workfile_sets which
* don't contain any workfiles.
* Here we don't care about whether resource owner close workfiles first,
* or we unpin the workset first. Under either order, the expected workfile_set
* should be freed. Currently we put it after resource owner release logic.
*/
void
AtEOXact_WorkFile(void)
{
dlist_mutable_iter iter;
if (!localCtl.initialized)
return;
LWLockAcquire(WorkFileManagerLock, LW_EXCLUSIVE);
dlist_foreach_modify(iter, &localCtl.localList)
{
workfile_set *work_set = dlist_container(workfile_set, local_node, iter.cur);
Assert(work_set->active && work_set->perquery->active);
/*
* Currently, all pinned workfile_sets are calling workfile_mgr_close_set
* in transaction. So we should unpin it for aborting.
*/
unpin_workset(work_set);
/*Remove the workfile_sets that don't contain any workfiles */
remove_workfile_set_if_possible(work_set);
}
LWLockRelease(WorkFileManagerLock);
}
/*
* Enlarge the local array if necessary, so that it has space for 'file'.
*/
static void
ensureLocalEntriesSize(File file)
{
int oldsize = localCtl.sizeLocalEntries;
int newsize;
if (file < 0)
elog(ERROR, "invalid virtual file descriptor: %d", file);
if (file < oldsize)
return;
newsize = file * 2 + 5;
if (oldsize == 0)
{
localCtl.entries = (WorkFileLocalEntry *)
MemoryContextAlloc(TopMemoryContext,
newsize * sizeof(WorkFileLocalEntry));
}
else
localCtl.entries = (WorkFileLocalEntry *)
repalloc(localCtl.entries, newsize * sizeof(WorkFileLocalEntry));
localCtl.sizeLocalEntries = newsize;
/* initialize the newly-allocated entries to all-zeros. */
memset(&localCtl.entries[oldsize], 0,
(newsize - oldsize) * sizeof(WorkFileLocalEntry));
}
/*
* fd.c / buffile.c call these
*/
/*
* Buffile.c calls this whenever a new BufFile is created, with a known
* workfile set. Registers the file with the workfile set.
*/
void
RegisterFileWithSet(File file, workfile_set *work_set)
{
ensureLocalEntriesSize(file);
if (localCtl.entries[file].work_set != NULL)
elog(ERROR, "workfile is already registered with another workfile set");
LWLockAcquire(WorkFileManagerLock, LW_EXCLUSIVE);
if (!work_set->active || !work_set->perquery->active)
ereport(PANIC,
(errmsg("Register file to a non-active workfile_set/per-query summary is illegal")));
localCtl.entries[file].work_set = work_set;
work_set->num_files++;
work_set->perquery->num_files++;
/* Enforce the limit on number of files */
if (gp_workfile_limit_files_per_query > 0 &&
work_set->perquery->num_files > gp_workfile_limit_files_per_query)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("number of workfiles per query limit exceeded")));
}
LWLockRelease(WorkFileManagerLock);
}
/*
* Update the totals of the workfile set for given file.
*
* If the given file is not associated with a workfile set yet,
* a new workfile set is created.
*
* fd.c calls this whenever a (potentially) file is enlarged.
*/
void
UpdateWorkFileSize(File file, uint64 newsize)
{
WorkFileLocalEntry *localEntry;
WorkFileSetSharedEntry *work_set;
WorkFileUsagePerQuery *perquery;
int64 diff;
ensureLocalEntriesSize(file);
localEntry = &localCtl.entries[file];
work_set = localEntry->work_set;
/*
* We got here only when the file's fdstate is FD_WORKFILE, and that
* means the file is registered to a work set.
*/
Assert(work_set);
perquery = work_set->perquery;
diff = (int64) newsize - localEntry->size;
if (diff == 0)
return;
LWLockAcquire(WorkFileManagerLock, LW_EXCLUSIVE);
if (!work_set->active)
{
ereport(WARNING,
(errmsg("workfile_set %s for current file is freed which should not happen.", work_set->prefix),
errprintstack(true)));
LWLockRelease(WorkFileManagerLock);
return;
}
Assert(perquery->active);
/*
* If the file is being enlarged, enforce the limits.
*/
if (diff > 0)
{
if (gp_workfile_limit_per_query > 0 &&
(perquery->total_bytes + diff + 1023) / 1024 > gp_workfile_limit_per_query)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("workfile per query size limit exceeded")));
}
if (gp_workfile_limit_per_segment &&
(workfile_shared->total_bytes + diff) / 1024 > gp_workfile_limit_per_segment)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("workfile per segment size limit exceeded")));
}
}
/*
* Update the summaries in shared memory.
*/
work_set->total_bytes += diff;
perquery->total_bytes += diff;
workfile_shared->total_bytes += diff;
/* also update the local entry */
localEntry->size = newsize;
LWLockRelease(WorkFileManagerLock);
}
/*
* WorkFileDeleted - Delete the file from it's workfile_set, update
* the stats for the workfile_set.
*
* If the file is the last one in the workfile_set, and it's workfile_set
* is not pinned, free the workfile set.
*/
void
WorkFileDeleted(File file, bool hold_lock)
{
WorkFileLocalEntry *localEntry;
WorkFileSetSharedEntry *work_set;
WorkFileUsagePerQuery *perquery;
int64 oldsize;
if (file < 0)
elog(ERROR, "invalid virtual file descriptor: %d", file);
if (file >= localCtl.sizeLocalEntries)
return; /* not a tracked work file */
localEntry = &localCtl.entries[file];
if (!localEntry->work_set)
return; /* not a tracked work file */
if (hold_lock)
LWLockAcquire(WorkFileManagerLock, LW_EXCLUSIVE);
work_set = localEntry->work_set;
perquery = work_set->perquery;
if (!work_set->active || !work_set->perquery->active)
ereport(PANIC,
(errmsg("workfile_set/per-query summarry is not active")));
/*
* Update the summaries in shared memory
*/
oldsize = localEntry->size;
Assert(work_set->num_files > 0);
work_set->num_files--;
work_set->total_bytes -= oldsize;
Assert(perquery->num_files > 0);
perquery->num_files--;
perquery->total_bytes -= oldsize;
workfile_shared->total_bytes -= oldsize;
/*
* Update the workfile_set. If this was the last file in this
* set, remove it.
*/
remove_workfile_set_if_possible(work_set);
localEntry->size = 0;
localEntry->work_set = NULL;
if (hold_lock)
LWLockRelease(WorkFileManagerLock);
}
/*
* Remove the workfile_set if possible. The workfile_set must be unpinned
* and no workfile remains.
* WorkFileManagerLock must be held.
*/
static void
remove_workfile_set_if_possible(workfile_set* work_set)
{
WorkFileUsagePerQuery *perquery = work_set->perquery;
/*
* Update the workfile_set. If there was no file exist in this
* set, remove it unless the set is pinned.
*/
if (work_set->num_files == 0 && !work_set->pinned)
{
perquery->refcount--;
Assert(work_set->total_bytes == 0);
/* Unlink from the active list */
dlist_delete(&work_set->node);
workfile_shared->num_active--;
/* and add to the free list */
dlist_push_head(&workfile_shared->freeList, &work_set->node);
work_set->active = false;
dlist_delete(&work_set->local_node);
/*
* Similarly, update / remove the per-query entry.
*/
if (perquery->refcount == 0)
{
bool found;
Assert(perquery->total_bytes == 0);
perquery->active = false;
(void) hash_search(workfile_shared->per_query_hash,
perquery,
HASH_REMOVE, &found);
if (!found)
elog(PANIC, "per-query hash table is corrupt");
}
}
}
/*
* Public API for creating a workfile set
*
* When hold_pin is set to true, the caller should use workfile_mgr_close_set
* to remove the workfile_set.
*/
workfile_set *
workfile_mgr_create_set(const char *operator_name, const char *prefix, bool hold_pin)
{
workfile_set *work_set;
if (!proc_exit_hook_registered)
{
/*
* register proc-exit hook to ensure temp files are dropped at exit.
*
* The hook must run after before_shmem_exit(), because before_shmem_exit may do
* some cleanup in transaction level, but AtProcExit_WorkFile cleans up the session
* level objects which may be referenced by some short-life objects, e.g. executor nodes.
*/
on_shmem_exit(AtProcExit_WorkFile, 0);
proc_exit_hook_registered = true;
}
LWLockAcquire(WorkFileManagerLock, LW_EXCLUSIVE);
work_set = workfile_mgr_create_set_internal(operator_name, prefix);
if (hold_pin)
pin_workset(work_set);
LWLockRelease(WorkFileManagerLock);
return work_set;
}
/*
* lock must be held
*/
static workfile_set *
workfile_mgr_create_set_internal(const char *operator_name, const char *prefix)
{
WorkFileUsagePerQuery *perquery;
bool found;
WorkFileUsagePerQuery key;
workfile_set *work_set;
Assert(proc_exit_hook_registered);
if (dlist_is_empty(&workfile_shared->freeList))
{
/* Could not acquire another entry from the cache - we filled it up */
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not create workfile manager entry: exceeded number of concurrent spilling queries")));
}
/*
* Find our per-query entry (or allocate, on first use)
*/
key.session_id = gp_session_id;
key.command_count = gp_command_count;
perquery = (WorkFileUsagePerQuery *) hash_search(workfile_shared->per_query_hash,
&key,
HASH_ENTER_NULL, &found);
if (!perquery)
{
/*
* The hash table was full. (The hash table is sized so that this
* should never happen.)
*/
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not create workfile manager entry: per-query hash table is full")));
}
if (!found)
{
perquery->num_files = 0;
perquery->total_bytes = 0;
perquery->active = true;
}
perquery->refcount++;
Assert(perquery->active);
/*
* Allocate a workfile_set entry, and initialize it.
*/
work_set = dlist_container(WorkFileSetSharedEntry, node,
dlist_pop_head_node(&workfile_shared->freeList));
Assert(!work_set->active);
dlist_push_head(&workfile_shared->activeList, &work_set->node);
workfile_shared->num_active++;
work_set->session_id = gp_session_id;
work_set->command_count = gp_command_count;
work_set->slice_id = currentSliceId;
work_set->perquery = perquery;
work_set->num_files = 0;
work_set->total_bytes = 0;
work_set->active = true;
work_set->pinned = false;
/* Track all workfile_sets created in current process */
if (!localCtl.initialized)
{
dlist_init(&localCtl.localList);
localCtl.initialized = true;
}
dlist_push_tail(&localCtl.localList, &work_set->local_node);
if (operator_name)
strlcpy(work_set->operator, operator_name, sizeof(work_set->operator));
else
work_set->operator[0] = '\0';
if (prefix)
{
strlcpy(work_set->prefix, prefix, sizeof(work_set->prefix));
}
else if (operator_name)
{
snprintf(work_set->prefix, sizeof(work_set->prefix), "%s_%d",
operator_name,
(int) (work_set - workfile_shared->entries) + 1);
}
else
{
snprintf(work_set->prefix, sizeof(work_set->prefix), "workfile_set_%d",
(int) (work_set - workfile_shared->entries) + 1);
}
return work_set;
}
/*
* Pin the workfile_set to prevent unexpected free operation.
*/
static void
pin_workset(workfile_set *work_set)
{
work_set->pinned = true;
}
/*
* Unping the workfile_set.
*/
static void
unpin_workset(workfile_set *work_set)
{
work_set->pinned = false;
}
/*
* workfile_mgr_close_set - force close and free the workfile_set.
*
* Normally, for unpinned workfile_set, it'll get freed until the last
* file closed in it. workfile_mgr_close_set will force free the workfile_set
* not matter it gets pinned or not.
*/
void
workfile_mgr_close_set(workfile_set *work_set)
{
int i;
LWLockAcquire(WorkFileManagerLock, LW_EXCLUSIVE);
if (!work_set->active)
ereport(PANIC, (errmsg("workfile_set is not active")));
unpin_workset(work_set);
/* Delete files which in current workfile set */
for (i = 0; i < localCtl.sizeLocalEntries; i++)
{
if (localCtl.entries[i].work_set == work_set)
WorkFileDeleted(i, false);
}
/*
* Since the workfile_mgr_close_set could be called at any moment to
* force close and free the workfile_set, so current workfile_set
* could have empty registered file yet.
*/
if (work_set->active)
{
if (work_set->num_files > 0)
{
WorkFileUsagePerQuery *perquery = work_set->perquery;
ereport(WARNING,
(errmsg("workfile_set %s still contains files for unknown reason.", work_set->prefix),
errprintstack(true)));
if (!perquery->active)
ereport(PANIC, (errmsg("per-query summarry is not active")));
Assert(work_set->total_bytes > 0);
Assert(perquery->num_files > 0);
perquery->num_files -= work_set->num_files;
perquery->total_bytes -= work_set->total_bytes;
work_set->num_files = 0;
work_set->total_bytes = 0;
}
Assert(work_set->num_files == 0);
remove_workfile_set_if_possible(work_set);
}
LWLockRelease(WorkFileManagerLock);
}
static void
workfile_report_inconsistency(void)
{
dlist_node *node;
dlist_iter iter;
int num_freeList = 0;
int num_activeList = 0;
dlist_foreach(iter, &workfile_shared->freeList)
{
num_freeList++;
node = iter.cur;
if (node->next == NULL || node->next->prev != node)
ereport(LOG, (errmsg("workfile freeList is corrupted: "
"node = %p, next = %p, next->prev = %p",
node, node->next, node->next ? node->next->prev : NULL)));
}
if (!dlist_is_empty(&workfile_shared->freeList))
{
node = &workfile_shared->freeList.head;
if (node->next == NULL || node->next->prev != node)
ereport(LOG, (errmsg("workfile freeList is corrupted: "
"node = %p, next = %p, next->prev = %p",
node, node->next, node->next ? node->next->prev : NULL)));
}
dlist_foreach(iter, &workfile_shared->activeList)
{
num_activeList++;
node = iter.cur;
if (node->next == NULL || node->next->prev != node)
ereport(LOG, (errmsg("workfile activeList is corrupted: "
"node = %p, next = %p, next->prev = %p",
node, node->next, node->next ? node->next->prev : NULL)));
}
if (!dlist_is_empty(&workfile_shared->activeList))
{
node = &workfile_shared->activeList.head;
if (node->next == NULL || node->next->prev != node)
ereport(LOG, (errmsg("workfile activeList is corrupted: "
"node = %p, next = %p, next->prev = %p",
node, node->next, node->next ? node->next->prev : NULL)));
}
ereport(PANIC,
(errmsg("num_active = %d, the actual size = %d, total size = %d, free size = %d",
workfile_shared->num_active, num_activeList,
gp_workfile_max_entries, num_freeList)));
}
/*
* Function copying all workfile cache entries for one segment
*/
workfile_set *
workfile_mgr_cache_entries_get_copy(int *num_active)
{
int num_entries;
workfile_set *copied_entries;
dlist_iter iter;
int i;
LWLockAcquire(WorkFileManagerLock, LW_SHARED);
num_entries = workfile_shared->num_active;
copied_entries = (WorkFileSetSharedEntry *) palloc0(num_entries * sizeof(WorkFileSetSharedEntry));
i = 0;
dlist_foreach(iter, &workfile_shared->activeList)
{
WorkFileSetSharedEntry *e = dlist_container(WorkFileSetSharedEntry, node, iter.cur);
memcpy(&copied_entries[i], e, sizeof(WorkFileSetSharedEntry));
i++;
}
/* if i != num_entries, the shared memory for workfile is inconsistent. */
if (i != num_entries)
workfile_report_inconsistency();
LWLockRelease(WorkFileManagerLock);
*num_active = num_entries;
return copied_entries;
}
/*
* User-visible function, for the view
*/
typedef struct
{
int num_entries;
int index;
WorkFileSetSharedEntry *copied_entries;
} get_entries_cxt;
/*
* Function returning all workfile cache entries for one segment
*/
Datum
gp_workfile_mgr_cache_entries_internal(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
get_entries_cxt *cxt;
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* Switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/*
* Build a tuple descriptor for our result type
* The number and type of attributes have to match the definition of the
* view gp_workfile_mgr_cache_entries
*/
#define NUM_CACHE_ENTRIES_ELEM 8
TupleDesc tupdesc = CreateTemplateTupleDesc(NUM_CACHE_ENTRIES_ELEM);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "segid", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prefix", TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "operation", TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "slice", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "sessionid", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "commandid", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "numfiles", INT4OID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
/*
* Now copy all the active entries from shared memory.
*/
cxt = (get_entries_cxt *) palloc(sizeof(get_entries_cxt));
cxt->copied_entries = workfile_mgr_cache_entries_get_copy(&cxt->num_entries);
cxt->index = 0;
funcctx->user_fctx = cxt;
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
cxt = (get_entries_cxt *) funcctx->user_fctx;
while (cxt->index < cxt->num_entries)
{
WorkFileSetSharedEntry *work_set = &cxt->copied_entries[cxt->index];
Datum values[NUM_CACHE_ENTRIES_ELEM];
bool nulls[NUM_CACHE_ENTRIES_ELEM];
HeapTuple tuple;
Datum result;
MemSet(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(GpIdentity.segindex);
values[1] = CStringGetTextDatum(work_set->prefix);
values[2] = Int64GetDatum(work_set->total_bytes);
values[3] = CStringGetTextDatum(work_set->operator);
values[4] = UInt32GetDatum(work_set->slice_id);
values[5] = UInt32GetDatum(work_set->session_id);
values[6] = UInt32GetDatum(work_set->command_count);
values[7] = UInt32GetDatum(work_set->num_files);
cxt->index++;
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
SRF_RETURN_DONE(funcctx);
}
/*
* Get the total number of bytes used across all workfiles
*/
uint64
WorkfileSegspace_GetSize(void)
{
uint64 result;
LWLockAcquire(WorkFileManagerLock, LW_SHARED);
result = workfile_shared->total_bytes;
LWLockRelease(WorkFileManagerLock);
return result;
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦