Use LWLocks instead of SpinLocks (#410)

Spinlocks should be held only for a few instructions, for multiple
reasons:

- You have to be very careful not to elog() out while holding a
  spinlock, because there is no mechanism to release the spinlock on
  error.

- Waiters can waste a lot of cycles spinning if the lock is
  contended. If you wait on a spinlock for too long, the PostgreSQL
  implementation will actually PANIC, see s_lock_stuck().

The flushLock is particularly problematic. It is held in exclusive
mode, which means it holds a spinlock, over the call to
FlushPages(). FlushPages() performs lots of I/O so it can take a very
long time (>= minutes), and can also easily error out for various
reasons.

allocatorLock would perhaps be OK as a spinlock, but even that feels
a bit heavy, so I converted that to an LWLock, too.

entryLock is usually held for a very short time, in shared mode, so
that would be fine as a spinlock. However, in the rare case that the
entry point is updated, it's held for a very long time. An LWLock used
in shared mode is about as fast as a spinlock; that path is pretty
heavily optimized.

I think we have some problems with the per-element spinlocks too. In
HnswUpdateNeighborPagesInMemory(), it's held over a call to
HnswUpdateConnection(), but HnswUpdateConnection() can error out at
least in case of an out-of-memory error (it uses lappend(), which
calls palloc()). It also calls the distance function, and I don't
think they are guaranteed to be ereport-free either. However, I didn't
address that in this PR, it needs a bit more thinking.
This commit is contained in:
Heikki Linnakangas
2024-01-16 23:25:03 +02:00
committed by GitHub
parent fa0acbf62d
commit 719b4b7436
4 changed files with 60 additions and 83 deletions

View File

@@ -741,7 +741,6 @@ Thanks to:
- [k-means++: The Advantage of Careful Seeding](https://theory.stanford.edu/~sergei/papers/kMeansPP-soda.pdf)
- [Concept Decompositions for Large Sparse Text Data using Clustering](https://www.cs.utexas.edu/users/inderjit/public_papers/concept_mlj.pdf)
- [Efficient and Robust Approximate Nearest Neighbor Search using Hierarchical Navigable Small World Graphs](https://arxiv.org/ftp/arxiv/papers/1603/1603.09320.pdf)
- [Concurrent Programming: Algorithms, Principles, and Foundations](https://doi.org/10.1007/978-3-642-32027-9)
## History

View File

@@ -16,12 +16,48 @@
int hnsw_ef_search;
static relopt_kind hnsw_relopt_kind;
int entryLockTrancheId;
int allocatorLockTrancheId;
int flushLockTrancheId;
/*
* Initialize index options and variables
*/
void
HnswInit(void)
{
int *tranche_ids;
bool found;
/*
* Assign tranche IDs for our LWLocks. This only needs to be done by one
* backend, the tranche IDs are remembered in shared memory.
*
* This shared memory area is very small, so we just allocate it from the
* "slop" that PostgreSQL reserves for small allocations like this. If
* this grows bigger, we should use a shmem_request_hook and
* RequestAddinShmemSpace() to pre-reserve space for this.
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
tranche_ids = ShmemInitStruct("pgvector LWLock ids",
sizeof(int) * 3,
&found);
if (!found)
{
tranche_ids[0] = LWLockNewTrancheId();
tranche_ids[1] = LWLockNewTrancheId();
tranche_ids[2] = LWLockNewTrancheId();
}
entryLockTrancheId = tranche_ids[0];
allocatorLockTrancheId = tranche_ids[1];
flushLockTrancheId = tranche_ids[2];
LWLockRelease(AddinShmemInitLock);
/* Per-backend registration of the tranche IDs */
LWLockRegisterTranche(entryLockTrancheId, "pgvector entryLock");
LWLockRegisterTranche(allocatorLockTrancheId, "pgvector allocatorLock");
LWLockRegisterTranche(flushLockTrancheId, "pgvector flushLock");
hnsw_relopt_kind = add_reloption_kind();
add_int_reloption(hnsw_relopt_kind, "m", "Max number of connections",
HNSW_DEFAULT_M, HNSW_MIN_M, HNSW_MAX_M

View File

@@ -116,6 +116,11 @@
/* Variables */
extern int hnsw_ef_search;
/* These are initialized when the module is loaded */
extern int entryLockTrancheId;
extern int allocatorLockTrancheId;
extern int flushLockTrancheId;
typedef struct HnswElementData HnswElementData;
typedef struct HnswNeighborArray HnswNeighborArray;
@@ -177,24 +182,6 @@ typedef struct HnswOptions
int efConstruction; /* size of dynamic candidate list */
} HnswOptions;
/*
 * Mode passed to HnswRWLockAcquire() and HnswRWLockRelease().
 *
 * RW_EXCLUSIVE takes the lock's global mutex directly; RW_SHARED goes
 * through the reader-counting path. A release must use the same mode as
 * the matching acquire.
 */
typedef enum HnswLWLockMode
{
RW_EXCLUSIVE,
RW_SHARED
} HnswLWLockMode;
/*
 * Readers-writers with weak priority to the readers
 *
 * Built from two spinlocks and a reader counter. An exclusive holder owns
 * globalMutex. Shared holders are counted in "readers": the first one to
 * arrive acquires globalMutex on behalf of all readers and the last one to
 * leave releases it.
 *
 * https://doi.org/10.1007/978-3-642-32027-9
 */
typedef struct HnswRWLock
{
volatile int readers; /* number of current shared holders */
slock_t readersMutex; /* held while "readers" is being updated */
slock_t globalMutex; /* held by the exclusive holder, or by the readers as a group */
} HnswRWLock;
typedef struct HnswGraph
{
/* Graph state */
@@ -203,16 +190,16 @@ typedef struct HnswGraph
double indtuples;
/* Entry state */
slock_t entryLock;
LWLock entryLock;
HnswElementPtr entryPoint;
/* Allocations state */
slock_t allocatorLock;
LWLock allocatorLock;
long memoryUsed;
long memoryTotal;
/* Flushed state */
HnswRWLock flushLock;
LWLock flushLock;
bool flushed;
} HnswGraph;

View File

@@ -291,51 +291,6 @@ FlushPages(HnswBuildState * buildstate)
MemoryContextReset(buildstate->graphCtx);
}
/*
 * Prepare a readers-writer lock for first use: both spinlocks are
 * initialized and the reader count starts at zero
 */
static void
HnswRWLockInitialize(HnswRWLock * lock)
{
	SpinLockInit(&lock->globalMutex);
	SpinLockInit(&lock->readersMutex);
	lock->readers = 0;
}
/*
 * Take a readers-writer lock in the requested mode
 *
 * RW_EXCLUSIVE grabs the global mutex directly. Any other mode registers
 * the caller as a reader; the first reader also grabs the global mutex so
 * that exclusive lockers are kept out while readers are present.
 */
static void
HnswRWLockAcquire(HnswRWLock * lock, HnswLWLockMode lockmode)
{
	int			readerCount;

	/* Writer path: only the global mutex is involved */
	if (lockmode == RW_EXCLUSIVE)
	{
		SpinLockAcquire(&lock->globalMutex);
		return;
	}

	/* Reader path: the counter is only touched under readersMutex */
	SpinLockAcquire(&lock->readersMutex);

	readerCount = lock->readers + 1;
	lock->readers = readerCount;

	/* First reader in takes the global mutex for the whole group */
	if (readerCount == 1)
		SpinLockAcquire(&lock->globalMutex);

	SpinLockRelease(&lock->readersMutex);
}
/*
 * Give up a readers-writer lock previously taken in the given mode
 *
 * RW_EXCLUSIVE drops the global mutex directly. Any other mode
 * unregisters the caller as a reader; the last reader to leave also drops
 * the global mutex.
 */
static void
HnswRWLockRelease(HnswRWLock * lock, HnswLWLockMode lockmode)
{
	int			readerCount;

	/* Writer path: only the global mutex is involved */
	if (lockmode == RW_EXCLUSIVE)
	{
		SpinLockRelease(&lock->globalMutex);
		return;
	}

	/* Reader path: the counter is only touched under readersMutex */
	SpinLockAcquire(&lock->readersMutex);

	readerCount = lock->readers - 1;
	lock->readers = readerCount;

	/* Last reader out hands the global mutex back */
	if (readerCount == 0)
		SpinLockRelease(&lock->globalMutex);

	SpinLockRelease(&lock->readersMutex);
}
/*
* Add a heap TID to an existing element
*/
@@ -463,7 +418,7 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn
Size valueSize;
Pointer valuePtr;
bool updateEntryPoint;
HnswRWLock *flushLock = &graph->flushLock;
LWLock *flushLock = &graph->flushLock;
char *base = buildstate->hnswarea;
/* Detoast once for all calls */
@@ -480,25 +435,25 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn
valueSize = VARSIZE_ANY(DatumGetPointer(value));
/* Ensure graph not flushed when inserting */
HnswRWLockAcquire(flushLock, RW_SHARED);
LWLockAcquire(flushLock, LW_SHARED);
if (graph->flushed)
{
HnswRWLockRelease(flushLock, RW_SHARED);
LWLockRelease(flushLock);
return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true);
}
/* Get lock for allocator */
SpinLockAcquire(&graph->allocatorLock);
LWLockAcquire(&graph->allocatorLock, LW_EXCLUSIVE);
/* Flush pages if needed */
if (graph->memoryUsed >= graph->memoryTotal)
{
SpinLockRelease(&graph->allocatorLock);
LWLockRelease(&graph->allocatorLock);
HnswRWLockRelease(flushLock, RW_SHARED);
HnswRWLockAcquire(flushLock, RW_EXCLUSIVE);
LWLockRelease(flushLock);
LWLockAcquire(flushLock, LW_EXCLUSIVE);
if (!graph->flushed)
{
@@ -510,7 +465,7 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn
FlushPages(buildstate);
}
HnswRWLockRelease(flushLock, RW_EXCLUSIVE);
LWLockRelease(flushLock);
return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true);
}
@@ -520,7 +475,7 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn
valuePtr = HnswAlloc(allocator, valueSize);
/* Release allocator lock */
SpinLockRelease(&graph->allocatorLock);
LWLockRelease(&graph->allocatorLock);
/* Copy datum */
memcpy(valuePtr, DatumGetPointer(value), valueSize);
@@ -530,13 +485,13 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn
SpinLockInit(&element->lock);
/* Get entry point */
SpinLockAcquire(&graph->entryLock);
LWLockAcquire(&graph->entryLock, LW_EXCLUSIVE);
entryPoint = HnswPtrAccess(base, graph->entryPoint);
updateEntryPoint = entryPoint == NULL || element->level > entryPoint->level;
/* Release lock if not updating entry point */
if (!updateEntryPoint)
SpinLockRelease(&graph->entryLock);
LWLockRelease(&graph->entryLock);
/* Insert element in graph */
HnswInsertElement(base, element, entryPoint, NULL, procinfo, collation, m, efConstruction, false);
@@ -546,10 +501,10 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn
/* Release lock if needed */
if (updateEntryPoint)
SpinLockRelease(&graph->entryLock);
LWLockRelease(&graph->entryLock);
/* Release flush lock */
HnswRWLockRelease(flushLock, RW_SHARED);
LWLockRelease(flushLock);
return true;
}
@@ -603,9 +558,9 @@ InitGraph(HnswGraph * graph, char *base, long memoryTotal)
graph->flushed = false;
graph->indtuples = 0;
SpinLockInit(&graph->lock);
SpinLockInit(&graph->entryLock);
SpinLockInit(&graph->allocatorLock);
HnswRWLockInitialize(&graph->flushLock);
LWLockInitialize(&graph->entryLock, entryLockTrancheId);
LWLockInitialize(&graph->allocatorLock, allocatorLockTrancheId);
LWLockInitialize(&graph->flushLock, flushLockTrancheId);
}
/*