greenplumn sinval 源码
greenplumn sinval 代码
文件路径:/src/backend/storage/ipc/sinval.c
/*-------------------------------------------------------------------------
*
* sinval.c
* POSTGRES shared cache invalidation communication code.
*
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/storage/ipc/sinval.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xact.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/sinvaladt.h"
#include "utils/inval.h"
#include "cdb/cdbtm.h" /* DtxContext */
uint64 SharedInvalidMessageCounter;
uint64 SharedInvalidMessageCounter;
/*
* Because backends sitting idle will not be reading sinval events, we
* need a way to give an idle backend a swift kick in the rear and make
* it catch up before the sinval queue overflows and forces it to go
* through a cache reset exercise. This is done by sending
* PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
*
* The signal handler will set an interrupt pending flag and will set the
* processes latch. Whenever starting to read from the client, or when
* interrupted while doing so, ProcessClientReadInterrupt() will call
* ProcessCatchupEvent().
*/
volatile sig_atomic_t catchupInterruptPending = false;
/* Are we currently processing a catchup event? */
volatile int in_process_catchup_event = 0;
/*
* SendSharedInvalidMessages
* Add shared-cache-invalidation message(s) to the global SI message queue.
*/
void
SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
{
SIInsertDataEntries(msgs, n);
}
/*
* ReceiveSharedInvalidMessages
* Process shared-cache-invalidation messages waiting for this backend
*
* We guarantee to process all messages that had been queued before the
* routine was entered. It is of course possible for more messages to get
* queued right after our last SIGetDataEntries call.
*
* NOTE: it is entirely possible for this routine to be invoked recursively
* as a consequence of processing inside the invalFunction or resetFunction.
* Furthermore, such a recursive call must guarantee that all outstanding
* inval messages have been processed before it exits. This is the reason
* for the strange-looking choice to use a statically allocated buffer array
* and counters; it's so that a recursive call can process messages already
* sucked out of sinvaladt.c.
*/
void
ReceiveSharedInvalidMessages(
void (*invalFunction) (SharedInvalidationMessage *msg),
void (*resetFunction) (void))
{
#define MAXINVALMSGS 32
static SharedInvalidationMessage messages[MAXINVALMSGS];
/*
* We use volatile here to prevent bugs if a compiler doesn't realize that
* recursion is a possibility ...
*/
static volatile int nextmsg = 0;
static volatile int nummsgs = 0;
/* Deal with any messages still pending from an outer recursion */
while (nextmsg < nummsgs)
{
SharedInvalidationMessage msg = messages[nextmsg++];
SharedInvalidMessageCounter++;
invalFunction(&msg);
}
do
{
int getResult;
nextmsg = nummsgs = 0;
/* Try to get some more messages */
getResult = SIGetDataEntries(messages, MAXINVALMSGS);
if (getResult < 0)
{
/* got a reset message */
elog(DEBUG4, "cache state reset");
SharedInvalidMessageCounter++;
resetFunction();
break; /* nothing more to do */
}
/* Process them, being wary that a recursive call might eat some */
nextmsg = 0;
nummsgs = getResult;
while (nextmsg < nummsgs)
{
SharedInvalidationMessage msg = messages[nextmsg++];
SharedInvalidMessageCounter++;
invalFunction(&msg);
}
/*
* We only need to loop if the last SIGetDataEntries call (which might
* have been within a recursive call) returned a full buffer.
*/
} while (nummsgs == MAXINVALMSGS);
/*
* We are now caught up. If we received a catchup signal, reset that
* flag, and call SICleanupQueue(). This is not so much because we need
* to flush dead messages right now, as that we want to pass on the
* catchup signal to the next slowest backend. "Daisy chaining" the
* catchup signal this way avoids creating spikes in system load for what
* should be just a background maintenance activity.
*/
if (catchupInterruptPending)
{
catchupInterruptPending = false;
elog(DEBUG4, "sinval catchup complete, cleaning queue");
SICleanupQueue(false, 0);
}
}
/*
* HandleCatchupInterrupt
*
* This is called when PROCSIG_CATCHUP_INTERRUPT is received.
*
* We used to directly call ProcessCatchupEvent directly when idle. These days
* we just set a flag to do it later and notify the process of that fact by
* setting the process's latch.
*/
void
HandleCatchupInterrupt(void)
{
/*
* Note: this is called by a SIGNAL HANDLER. You must be very wary what
* you do here.
*/
catchupInterruptPending = true;
/* make sure the event is processed in due course */
SetLatch(MyLatch);
}
/*
* ProcessCatchupInterrupt
*
* The portion of catchup interrupt handling that runs outside of the signal
* handler, which allows it to actually process pending invalidations.
*/
void
ProcessCatchupInterrupt(void)
{
while (catchupInterruptPending)
{
/*
* Funny indentation to keep the code inside identical to upstream
* while at the same time supporting CMockery which has problems with
* multiple bracing on column 1.
*/
PG_TRY();
{
in_process_catchup_event = 1;
/*
* What we need to do here is cause ReceiveSharedInvalidMessages() to
* run, which will do the necessary work and also reset the
* catchupInterruptPending flag. If we are inside a transaction we
* can just call AcceptInvalidationMessages() to do this. If we
* aren't, we start and immediately end a transaction; the call to
* AcceptInvalidationMessages() happens down inside transaction start.
*
* It is awfully tempting to just call AcceptInvalidationMessages()
* without the rest of the xact start/stop overhead, and I think that
* would actually work in the normal case; but I am not sure that
* things would clean up nicely if we got an error partway through.
*/
if (IsTransactionOrTransactionBlock())
{
elog(DEBUG4, "ProcessCatchupEvent inside transaction");
AcceptInvalidationMessages();
}
else
{
elog(DEBUG4, "ProcessCatchupEvent outside transaction");
/*
* GPDB disallow a new transaction if the distributed transaction
* is undering certain states like DTX_CONTEXT_QE_PREPARED, here
* temporarily set context to DTX_CONTEXT_LOCAL_ONLY to workaround
* the restriction.
*/
DtxContext saveDistributedTransactionContext;
saveDistributedTransactionContext = DistributedTransactionContext;
setDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
StartTransactionCommand();
CommitTransactionCommand();
setDistributedTransactionContext(saveDistributedTransactionContext);
}
in_process_catchup_event = 0;
}
PG_CATCH();
{
in_process_catchup_event = 0;
PG_RE_THROW();
}
PG_END_TRY();
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦