Improved locking

This commit is contained in:
Andrew Kane
2024-01-16 13:34:55 -08:00
parent 719b4b7436
commit cad48d9203
4 changed files with 44 additions and 50 deletions

View File

@@ -16,9 +16,35 @@
int hnsw_ef_search;
static relopt_kind hnsw_relopt_kind;
int entryLockTrancheId;
int allocatorLockTrancheId;
int flushLockTrancheId;
int hnsw_lock_tranche_id;
/*
* Assign tranche IDs for our LWLocks. This only needs to be done by one
* backend, the tranche IDs are remembered in shared memory.
*
* This shared memory area is very small, so we just allocate it from the
* "slop" that PostgreSQL reserves for small allocations like this. If
* this grows bigger, we should use a shmem_request_hook and
* RequestAddinShmemSpace() to pre-reserve space for this.
*/
static void
HnswInitLockTranche(void)
{
int *tranche_ids;
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
tranche_ids = ShmemInitStruct("hnsw LWLock ids",
sizeof(int) * 1,
&found);
if (!found)
tranche_ids[0] = LWLockNewTrancheId();
hnsw_lock_tranche_id = tranche_ids[0];
LWLockRelease(AddinShmemInitLock);
/* Per-backend registration of the tranche IDs */
LWLockRegisterTranche(hnsw_lock_tranche_id, "HnswBuild");
}
/*
* Initialize index options and variables
@@ -26,37 +52,7 @@ int flushLockTrancheId;
void
HnswInit(void)
{
int *tranche_ids;
bool found;
/*
* Assign tranche IDs for our LWLocks. This only needs to be done by one
* backend, the tranche IDs are remembered in shared memory.
*
* This shared memory area is very small, so we just allocate it from the
* "slop" that PostgreSQL reserves for small allocations like this. If
* this grows bigger, we should use a shmem_request_hook and
* RequestAddinShmemSpace() to pre-reserve space for this.
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
tranche_ids = ShmemInitStruct("pgvector LWLock ids",
sizeof(int) * 3,
&found);
if (!found)
{
tranche_ids[0] = LWLockNewTrancheId();
tranche_ids[1] = LWLockNewTrancheId();
tranche_ids[2] = LWLockNewTrancheId();
}
entryLockTrancheId = tranche_ids[0];
allocatorLockTrancheId = tranche_ids[1];
flushLockTrancheId = tranche_ids[2];
LWLockRelease(AddinShmemInitLock);
/* Per-backend registration of the tranche IDs */
LWLockRegisterTranche(entryLockTrancheId, "pgvector entryLock");
LWLockRegisterTranche(allocatorLockTrancheId, "pgvector allocatorLock");
LWLockRegisterTranche(flushLockTrancheId, "pgvector flushLock");
HnswInitLockTranche();
hnsw_relopt_kind = add_reloption_kind();
add_int_reloption(hnsw_relopt_kind, "m", "Max number of connections",

View File

@@ -116,10 +116,8 @@
/* Variables */
extern int hnsw_ef_search;
/* These are initialized when the module is loaded */
extern int entryLockTrancheId;
extern int allocatorLockTrancheId;
extern int flushLockTrancheId;
/* This is initialized when the module is loaded */
extern int hnsw_lock_tranche_id;
typedef struct HnswElementData HnswElementData;
typedef struct HnswNeighborArray HnswNeighborArray;
@@ -149,7 +147,7 @@ typedef struct HnswElementData
OffsetNumber neighborOffno;
BlockNumber neighborPage;
DatumPtr value;
slock_t lock;
LWLock lock;
} HnswElementData;
typedef HnswElementData * HnswElement;

View File

@@ -297,17 +297,17 @@ FlushPages(HnswBuildState * buildstate)
static bool
HnswAddDuplicateInMemory(HnswElement element, HnswElement dup)
{
SpinLockAcquire(&dup->lock);
LWLockAcquire(&dup->lock, LW_EXCLUSIVE);
if (dup->heaptidsLength == HNSW_HEAPTIDS)
{
SpinLockRelease(&dup->lock);
LWLockRelease(&dup->lock);
return false;
}
HnswAddHeapTid(dup, &element->heaptids[0]);
SpinLockRelease(&dup->lock);
LWLockRelease(&dup->lock);
return true;
}
@@ -371,9 +371,9 @@ HnswUpdateNeighborPagesInMemory(char *base, FmgrInfo *procinfo, Oid collation, H
Assert(neighborElement);
/* Use element for lock instead of hc since hc can be replaced */
SpinLockAcquire(&neighborElement->lock);
LWLockAcquire(&neighborElement->lock, LW_EXCLUSIVE);
HnswUpdateConnection(base, e, hc, lm, lc, NULL, NULL, procinfo, collation);
SpinLockRelease(&neighborElement->lock);
LWLockRelease(&neighborElement->lock);
}
}
}
@@ -482,7 +482,7 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn
HnswPtrStore(base, element->value, valuePtr);
/* Create element lock */
SpinLockInit(&element->lock);
LWLockInitialize(&element->lock, hnsw_lock_tranche_id);
/* Get entry point */
LWLockAcquire(&graph->entryLock, LW_EXCLUSIVE);
@@ -558,9 +558,9 @@ InitGraph(HnswGraph * graph, char *base, long memoryTotal)
graph->flushed = false;
graph->indtuples = 0;
SpinLockInit(&graph->lock);
LWLockInitialize(&graph->entryLock, entryLockTrancheId);
LWLockInitialize(&graph->allocatorLock, allocatorLockTrancheId);
LWLockInitialize(&graph->flushLock, flushLockTrancheId);
LWLockInitialize(&graph->entryLock, hnsw_lock_tranche_id);
LWLockInitialize(&graph->allocatorLock, hnsw_lock_tranche_id);
LWLockInitialize(&graph->flushLock, hnsw_lock_tranche_id);
}
/*

View File

@@ -780,9 +780,9 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
/* Copy neighborhood to local memory if needed */
if (index == NULL)
{
SpinLockAcquire(&cElement->lock);
LWLockAcquire(&cElement->lock, LW_EXCLUSIVE);
memcpy(neighborhoodData, neighborhood, neighborhoodSize);
SpinLockRelease(&cElement->lock);
LWLockRelease(&cElement->lock);
neighborhood = neighborhoodData;
}