From 9d3e4e74df43440dc0bf1acd6d477614a4572a74 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Mon, 15 Jan 2024 15:07:31 -0800 Subject: [PATCH] Added support for in-memory parallel index builds for HNSW --- CHANGELOG.md | 2 +- README.md | 1 + src/hnsw.c | 6 - src/hnsw.h | 131 +++++++-- src/hnswbuild.c | 456 +++++++++++++++++++++----------- src/hnswinsert.c | 80 +++--- src/hnswscan.c | 13 +- src/hnswutils.c | 366 ++++++++++++++++++------- src/hnswvacuum.c | 8 +- test/t/012_hnsw_build_recall.pl | 18 +- 10 files changed, 771 insertions(+), 310 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7076d49..f523c8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## 0.5.2 (unreleased) - Improved performance of HNSW -- Added support for on-disk parallel index builds for HNSW +- Added support for parallel index builds for HNSW - Reduced memory usage for HNSW index builds - Reduced WAL generation for HNSW index builds - Fixed `invalid memory alloc request size` error with HNSW index build diff --git a/README.md b/README.md index d09c4fd..e163886 100644 --- a/README.md +++ b/README.md @@ -741,6 +741,7 @@ Thanks to: - [k-means++: The Advantage of Careful Seeding](https://theory.stanford.edu/~sergei/papers/kMeansPP-soda.pdf) - [Concept Decompositions for Large Sparse Text Data using Clustering](https://www.cs.utexas.edu/users/inderjit/public_papers/concept_mlj.pdf) - [Efficient and Robust Approximate Nearest Neighbor Search using Hierarchical Navigable Small World Graphs](https://arxiv.org/ftp/arxiv/papers/1603/1603.09320.pdf) +- [Concurrent Programming: Algorithms, Principles, and Foundations](https://doi.org/10.1007/978-3-642-32027-9) ## History diff --git a/src/hnsw.c b/src/hnsw.c index 1719820..758e418 100644 --- a/src/hnsw.c +++ b/src/hnsw.c @@ -14,7 +14,6 @@ #endif int hnsw_ef_search; -bool hnsw_enable_parallel_build; static relopt_kind hnsw_relopt_kind; /* @@ -40,11 +39,6 @@ HnswInit(void) DefineCustomIntVariable("hnsw.ef_search", "Sets the size of the dynamic candidate list for search", "Valid range is 1..1000.", &hnsw_ef_search, HNSW_DEFAULT_EF_SEARCH, HNSW_MIN_EF_SEARCH, HNSW_MAX_EF_SEARCH, PGC_USERSET, 0, NULL, NULL, NULL); - - /* Behind a variable for now since can be slower than building in memory */ - DefineCustomBoolVariable("hnsw.enable_parallel_build", "Enables or disables building indexes in parallel", - NULL, &hnsw_enable_parallel_build, - false, PGC_USERSET, 0, NULL, NULL, NULL); } /* diff --git a/src/hnsw.h b/src/hnsw.h index 783a345..b89efc4 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -6,9 +6,9 @@ #include "access/generic_xlog.h" #include "access/parallel.h" #include "access/reloptions.h" -#include "lib/ilist.h" #include "nodes/execnodes.h" #include "port.h" /* for random() */ +#include "utils/relptr.h" #include "utils/sampling.h" #include "vector.h" @@ -96,33 +96,62 @@ /* Ensure fits on page and in uint8 */ #define HnswGetMaxLevel(m) Min(((BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(HnswPageOpaqueData)) - offsetof(HnswNeighborTupleData, indextids) - sizeof(ItemIdData)) / (sizeof(ItemPointerData)) / (m)) - 2, 255) -#define HnswGetNeighbors(element, lc) (AssertMacro((element)->level >= (lc)), &(element)->neighbors[lc]) +#define HnswGetValue(base, element) PointerGetDatum(HnswPtrAccess(base, (element)->value)) + +#if PG_VERSION_NUM < 140005 +#define relptr_offset(rp) ((rp).relptr_off - 1) +#endif + +/* Pointer macros */ +#define HnswPtrAccess(base, hp) ((base) == NULL ? (hp).ptr : relptr_access(base, (hp).relptr)) +#define HnswPtrStore(base, hp, value) ((base) == NULL ? (hp).ptr = (value) : (void) relptr_store(base, (hp).relptr, value)) +#define HnswPtrIsNull(base, hp) ((base) == NULL ? (hp).ptr == NULL : relptr_is_null((hp).relptr)) +#define HnswPtrSetNull(base, hp) ((base) == NULL ? (hp).ptr = NULL : (void) ((hp).relptr.relptr_off = 0)) +#define HnswPtrEqual(base, hp1, hp2) ((base) == NULL ? (hp1).ptr == (hp2).ptr : relptr_offset((hp1).relptr) == relptr_offset((hp2).relptr)) + +/* For code paths dedicated to each type */ +#define HnswPtrPointer(hp) (hp).ptr +#define HnswPtrOffset(hp) relptr_offset((hp).relptr) /* Variables */ extern int hnsw_ef_search; -extern bool hnsw_enable_parallel_build; + +typedef struct HnswElementData HnswElementData; +typedef struct HnswNeighborArray HnswNeighborArray; + +#define HnswPtrDeclare(type, relptrtype, ptrtype) \ + relptr_declare(type, relptrtype); \ + typedef union { type *ptr; relptrtype relptr; } ptrtype; + +/* Pointers that can be absolute or relative */ +/* Use char for DatumPtr so works with Pointer */ +HnswPtrDeclare(HnswElementData, HnswElementRelptr, HnswElementPtr); +HnswPtrDeclare(HnswNeighborArray, HnswNeighborArrayRelptr, HnswNeighborArrayPtr); +HnswPtrDeclare(HnswNeighborArrayPtr, HnswNeighborsRelptr, HnswNeighborsPtr); +HnswPtrDeclare(char, DatumRelptr, DatumPtr); typedef struct HnswElementData { - slist_node next; + HnswElementPtr next; ItemPointerData heaptids[HNSW_HEAPTIDS]; uint8 heaptidsLength; uint8 level; uint8 deleted; uint32 hash; - struct HnswNeighborArray *neighbors; + HnswNeighborsPtr neighbors; BlockNumber blkno; OffsetNumber offno; OffsetNumber neighborOffno; BlockNumber neighborPage; - Datum value; + DatumPtr value; + slock_t lock; } HnswElementData; typedef HnswElementData * HnswElement; typedef struct HnswCandidate { - HnswElement element; + HnswElementPtr element; float distance; bool closer; } HnswCandidate; @@ -131,7 +160,7 @@ typedef struct HnswNeighborArray { int length; bool closerSet; - HnswCandidate *items; + HnswCandidate items[FLEXIBLE_ARRAY_MEMBER]; } HnswNeighborArray; typedef struct HnswPairingHeapNode @@ -148,14 +177,43 @@ typedef struct HnswOptions int efConstruction; /* size of dynamic candidate list */ } HnswOptions; +typedef enum HnswLWLockMode +{ + RW_EXCLUSIVE, + RW_SHARED +} HnswLWLockMode; + +/* + * Readers-writers with weak priority to the readers + * + * https://doi.org/10.1007/978-3-642-32027-9 + */ +typedef struct HnswRWLock +{ + volatile int readers; + slock_t readersMutex; + slock_t globalMutex; +} HnswRWLock; + typedef struct HnswGraph { - slist_head elements; - HnswElement entryPoint; + /* Graph state */ + slock_t lock; + HnswElementPtr head; + double indtuples; + + /* Entry state */ + slock_t entryLock; + HnswElementPtr entryPoint; + + /* Allocations state */ + slock_t allocatorLock; long memoryUsed; long memoryTotal; + + /* Flushed state */ + HnswRWLock flushLock; bool flushed; - double indtuples; } HnswGraph; typedef struct HnswSpool @@ -199,8 +257,15 @@ typedef struct HnswLeader int nparticipanttuplesorts; HnswShared *hnswshared; Snapshot snapshot; + char *hnswarea; } HnswLeader; +typedef struct HnswAllocator +{ + void *(*alloc) (Size size, void *state); + void *state; +} HnswAllocator; + typedef struct HnswBuildState { /* Info */ @@ -233,10 +298,12 @@ typedef struct HnswBuildState /* Memory */ MemoryContext graphCtx; MemoryContext tmpCtx; + HnswAllocator allocator; /* Parallel builds */ HnswLeader *hnswleader; HnswShared *hnswshared; + char *hnswarea; } HnswBuildState; typedef struct HnswMetaPageData @@ -335,23 +402,24 @@ bool HnswNormValue(FmgrInfo *procinfo, Oid collation, Datum *value, Vector * re Buffer HnswNewBuffer(Relation index, ForkNumber forkNum); void HnswInitPage(Buffer buf, Page page); void HnswInit(void); -List *HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement); +List *HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement); HnswElement HnswGetEntryPoint(Relation index); void HnswGetMetaPageInfo(Relation index, int *m, HnswElement * entryPoint); -HnswElement HnswInitElement(ItemPointer tid, int m, double ml, int maxLevel); +void *HnswAlloc(HnswAllocator * allocator, Size size); +HnswElement HnswInitElement(char *base, ItemPointer tid, int m, double ml, int maxLevel, HnswAllocator * alloc); HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno); -void HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing); -HnswCandidate *HnswEntryCandidate(HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec); +void HnswInsertElement(char *base, HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing); +HnswCandidate *HnswEntryCandidate(char *base, HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec); void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum, bool building); -void HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m); +void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m); void HnswAddHeapTid(HnswElement element, ItemPointer heaptid); -void HnswInitNeighbors(HnswElement element, int m); -bool HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building); +void HnswInitNeighbors(char *base, HnswElement element, int m, HnswAllocator * alloc); +bool HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building); void HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building); void HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHeaptids, bool loadVec); void HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec); -void HnswSetElementTuple(HnswElementTuple etup, HnswElement element); -void HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int m, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation); +void HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element); +void HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc, int m, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation); void HnswLoadNeighbors(HnswElement element, Relation index, int m); PGDLLEXPORT void HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc); @@ -371,6 +439,16 @@ void hnswrescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, bool hnswgettuple(IndexScanDesc scan, ScanDirection dir); void hnswendscan(IndexScanDesc scan); +static inline +HnswNeighborArray * HnswGetNeighbors(char *base, HnswElement element, int lc) +{ + HnswNeighborArrayPtr *neighborList = HnswPtrAccess(base, element->neighbors); + + Assert(element->level >= lc); + + return HnswPtrAccess(base, neighborList[lc]); +} + /* Hash tables */ typedef struct TidHashEntry { @@ -398,4 +476,17 @@ typedef struct PointerHashEntry #define SH_DECLARE #include "lib/simplehash.h" +typedef struct OffsetHashEntry +{ + Size offset; + char status; +} OffsetHashEntry; + +#define SH_PREFIX offsethash +#define SH_ELEMENT_TYPE OffsetHashEntry +#define SH_KEY_TYPE Size +#define SH_SCOPE extern +#define SH_DECLARE +#include "lib/simplehash.h" + #endif diff --git a/src/hnswbuild.c b/src/hnswbuild.c index 35c03d0..b92bdc9 100644 --- a/src/hnswbuild.c +++ b/src/hnswbuild.c @@ -54,7 +54,8 @@ #endif #define PARALLEL_KEY_HNSW_SHARED UINT64CONST(0xA000000000000001) -#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000002) +#define PARALLEL_KEY_HNSW_AREA UINT64CONST(0xA000000000000002) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000003) #if PG_VERSION_NUM < 130000 #define GENERATIONCHUNK_RAWSIZE (SIZEOF_SIZE_T + SIZEOF_VOID_P * 2) @@ -135,9 +136,11 @@ CreateElementPages(HnswBuildState * buildstate) HnswElementTuple etup; HnswNeighborTuple ntup; BlockNumber insertPage; + HnswElement entryPoint; Buffer buf; Page page; - slist_iter iter; + HnswElementPtr iter = buildstate->graph->head; + char *base = buildstate->hnswarea; /* Calculate sizes */ etupAllocSize = BLCKSZ; @@ -152,18 +155,22 @@ CreateElementPages(HnswBuildState * buildstate) page = BufferGetPage(buf); HnswInitPage(buf, page); - slist_foreach(iter, &buildstate->graph->elements) + while (!HnswPtrIsNull(base, iter)) { - HnswElement element = slist_container(HnswElementData, next, iter.cur); + HnswElement element = HnswPtrAccess(base, iter); Size etupSize; Size ntupSize; Size combinedSize; + void *valuePtr = HnswPtrAccess(base, element->value); + + /* Update iterator */ + iter = element->next; /* Zero memory for each element */ MemSet(etup, 0, etupAllocSize); /* Calculate sizes */ - etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(DatumGetPointer(element->value))); + etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(valuePtr)); ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(element->level, buildstate->m); combinedSize = etupSize + ntupSize + sizeof(ItemIdData); @@ -171,7 +178,7 @@ CreateElementPages(HnswBuildState * buildstate) if (etupSize > etupAllocSize) elog(ERROR, "index tuple too large"); - HnswSetElementTuple(etup, element); + HnswSetElementTuple(base, etup, element); /* Keep element and neighbors on the same page if possible */ if (PageGetFreeSpace(page) < etupSize || (combinedSize <= maxSize && PageGetFreeSpace(page) < combinedSize)) @@ -212,7 +219,8 @@ CreateElementPages(HnswBuildState * buildstate) MarkBufferDirty(buf); UnlockReleaseBuffer(buf); - HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, buildstate->graph->entryPoint, insertPage, forkNum, true); + entryPoint = HnswPtrAccess(base, buildstate->graph->entryPoint); + HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, entryPoint, insertPage, forkNum, true); pfree(etup); pfree(ntup); @@ -227,19 +235,23 @@ CreateNeighborPages(HnswBuildState * buildstate) Relation index = buildstate->index; ForkNumber forkNum = buildstate->forkNum; int m = buildstate->m; - slist_iter iter; + HnswElementPtr iter = buildstate->graph->head; + char *base = buildstate->hnswarea; HnswNeighborTuple ntup; /* Allocate once */ ntup = palloc0(BLCKSZ); - slist_foreach(iter, &buildstate->graph->elements) + while (!HnswPtrIsNull(base, iter)) { - HnswElement e = slist_container(HnswElementData, next, iter.cur); + HnswElement e = HnswPtrAccess(base, iter); Buffer buf; Page page; Size ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m); + /* Update iterator */ + iter = e->next; + /* Can take a while, so ensure we can interrupt */ /* Needs to be called when no buffer locks are held */ CHECK_FOR_INTERRUPTS(); @@ -248,7 +260,7 @@ CreateNeighborPages(HnswBuildState * buildstate) LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buf); - HnswSetNeighborTuple(ntup, e, m); + HnswSetNeighborTuple(base, ntup, e, m); if (!PageIndexTupleOverwrite(page, e->neighborOffno, (Item) ntup, ntupSize)) elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index)); @@ -261,24 +273,6 @@ CreateNeighborPages(HnswBuildState * buildstate) pfree(ntup); } -#ifdef HNSW_MEMORY -/* - * Show memory usage - */ -static void -ShowMemoryUsage(HnswBuildState * buildstate) -{ -#if PG_VERSION_NUM >= 130000 - elog(INFO, "graph memory: %zu MB, total memory: %zu MB", - MemoryContextMemAllocated(buildstate->graphCtx, false) / (1024 * 1024), - MemoryContextMemAllocated(CurrentMemoryContext, true) / (1024 * 1024)); -#else - MemoryContextStats(CurrentMemoryContext); - elog(INFO, "estimated memory: %zu MB", buildstate->memoryUsed / (1024 * 1024)); -#endif -} -#endif - /* * Flush pages */ @@ -286,7 +280,7 @@ static void FlushPages(HnswBuildState * buildstate) { #ifdef HNSW_MEMORY - ShowMemoryUsage(buildstate); + elog(INFO, "memory: %zu MB", buildstate->graph->memoryUsed / (1024 * 1024)); #endif CreateMetaPage(buildstate); @@ -297,66 +291,177 @@ FlushPages(HnswBuildState * buildstate) MemoryContextReset(buildstate->graphCtx); } -#if PG_VERSION_NUM < 130000 /* - * Get the memory used by an element + * Initialize a readers-writer lock */ -static long -HnswElementMemory(HnswElement e, int m) +static void +HnswRWLockInitialize(HnswRWLock * lock) { - long elementSize = sizeof(HnswElementData); - - elementSize += sizeof(HnswNeighborArray) * (e->level + 1); - elementSize += sizeof(HnswCandidate) * (m * (e->level + 2)); - elementSize += VARSIZE_ANY(DatumGetPointer(e->value)); - /* Each allocation has a chunk header */ - elementSize += (e->level + 4) * GENERATIONCHUNK_RAWSIZE; - /* Add an extra 5% for alignment and other overhead */ - return elementSize * 1.05; + lock->readers = 0; + SpinLockInit(&lock->readersMutex); + SpinLockInit(&lock->globalMutex); +} + +/* + * Acquire a readers-writer lock + */ +static void +HnswRWLockAcquire(HnswRWLock * lock, HnswLWLockMode lockmode) +{ + if (lockmode == RW_EXCLUSIVE) + SpinLockAcquire(&lock->globalMutex); + else + { + SpinLockAcquire(&lock->readersMutex); + if (++lock->readers == 1) + SpinLockAcquire(&lock->globalMutex); + SpinLockRelease(&lock->readersMutex); + } +} + +/* + * Release a readers-writer lock + */ +static void +HnswRWLockRelease(HnswRWLock * lock, HnswLWLockMode lockmode) +{ + if (lockmode == RW_EXCLUSIVE) + SpinLockRelease(&lock->globalMutex); + else + { + SpinLockAcquire(&lock->readersMutex); + if (--lock->readers == 0) + SpinLockRelease(&lock->globalMutex); + SpinLockRelease(&lock->readersMutex); + } +} + +/* + * Add a heap TID to an existing element + */ +static bool +HnswAddDuplicateInMemory(HnswElement element, HnswElement dup) +{ + SpinLockAcquire(&dup->lock); + + if (dup->heaptidsLength == HNSW_HEAPTIDS) + { + SpinLockRelease(&dup->lock); + return false; + } + + HnswAddHeapTid(dup, &element->heaptids[0]); + + SpinLockRelease(&dup->lock); + + return true; } -#endif /* * Find duplicate element */ static bool -HnswFindDuplicateInMemory(HnswElement element) +HnswFindDuplicateInMemory(char *base, HnswElement element) { - HnswNeighborArray *neighbors = HnswGetNeighbors(element, 0); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0); for (int i = 0; i < neighbors->length; i++) { HnswCandidate *neighbor = &neighbors->items[i]; + HnswElement neighborElement = HnswPtrAccess(base, neighbor->element); + Datum value = HnswGetValue(base, element); + Datum neighborValue = HnswGetValue(base, neighborElement); /* Exit early since ordered by distance */ - if (!datumIsEqual(element->value, neighbor->element->value, false, -1)) + if (!datumIsEqual(value, neighborValue, false, -1)) return false; /* Check for space */ - if (neighbor->element->heaptidsLength < HNSW_HEAPTIDS) - { - HnswAddHeapTid(neighbor->element, &element->heaptids[0]); + if (HnswAddDuplicateInMemory(element, neighborElement)) return true; - } } return false; } /* - * Insert tuple into in-memory graph + * Add to element and neighbor pages + */ +static void +WriteNewElementPagesInMemory(char *base, HnswGraph * graph, HnswElement element) +{ + SpinLockAcquire(&graph->lock); + element->next = graph->head; + HnswPtrStore(base, graph->head, element); + SpinLockRelease(&graph->lock); +} + +/* + * Update neighbors + */ +static void +HnswUpdateNeighborPagesInMemory(char *base, FmgrInfo *procinfo, Oid collation, HnswElement e, int m) +{ + for (int lc = e->level; lc >= 0; lc--) + { + int lm = HnswGetLayerM(m, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc); + + for (int i = 0; i < neighbors->length; i++) + { + HnswCandidate *hc = &neighbors->items[i]; + HnswElement neighborElement = HnswPtrAccess(base, hc->element); + + /* Use element for lock instead of hc since hc can be replaced */ + SpinLockAcquire(&neighborElement->lock); + HnswUpdateConnection(base, e, hc, lm, lc, NULL, NULL, procinfo, collation); + SpinLockRelease(&neighborElement->lock); + } + } +} + +/* + * Write changes in memory + */ +static void +WriteElementInMemory(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, HnswBuildState * buildstate, HnswGraph * graph, bool updateEntryPoint) +{ + char *base = buildstate->hnswarea; + + /* Try to add to existing page */ + if (HnswFindDuplicateInMemory(base, element)) + return; + + /* Write element and neighbor tuples */ + WriteNewElementPagesInMemory(base, graph, element); + + /* Update neighbors */ + HnswUpdateNeighborPagesInMemory(base, procinfo, collation, element, m); + + /* Update entry point if needed (already have lock) */ + if (updateEntryPoint) + HnswPtrStore(base, graph->entryPoint, element); +} + +/* + * Insert tuple */ static bool -InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuildState * buildstate) +InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, HnswBuildState * buildstate) { FmgrInfo *procinfo = buildstate->procinfo; Oid collation = buildstate->collation; HnswGraph *graph = buildstate->graph; - HnswElement entryPoint = graph->entryPoint; + HnswElement entryPoint; int efConstruction = buildstate->efConstruction; int m = buildstate->m; - MemoryContext oldCtx; HnswElement element; + HnswAllocator *allocator = &buildstate->allocator; + Size valueSize; + Pointer valuePtr; + bool updateEntryPoint; + HnswRWLock *flushLock = &graph->flushLock; + char *base = buildstate->hnswarea; /* Detoast once for all calls */ Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); @@ -368,69 +473,84 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil return false; } - /* Allocate element in graph memory context */ - oldCtx = MemoryContextSwitchTo(buildstate->graphCtx); - element = HnswInitElement(heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel); - element->value = datumCopy(value, false, -1); - MemoryContextSwitchTo(oldCtx); + /* Get datum size */ + valueSize = VARSIZE_ANY(DatumGetPointer(value)); - /* Update memory usage */ -#if PG_VERSION_NUM >= 130000 - graph->memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false); -#else - graph->memoryUsed += HnswElementMemory(element, buildstate->m); -#endif + /* Ensure graph not flushed when inserting */ + HnswRWLockAcquire(flushLock, RW_SHARED); + + if (graph->flushed) + { + HnswRWLockRelease(flushLock, RW_SHARED); + + return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true); + } + + /* Get lock for allocator */ + SpinLockAcquire(&graph->allocatorLock); + + /* Flush pages if needed */ + if (graph->memoryUsed >= graph->memoryTotal) + { + SpinLockRelease(&graph->allocatorLock); + + HnswRWLockRelease(flushLock, RW_SHARED); + HnswRWLockAcquire(flushLock, RW_EXCLUSIVE); + + if (!graph->flushed) + { + ereport(NOTICE, + (errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples), + errdetail("Building will take significantly more time."), + errhint("Increase maintenance_work_mem to speed up builds."))); + + FlushPages(buildstate); + } + + HnswRWLockRelease(flushLock, RW_EXCLUSIVE); + + return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true); + } + + /* Create an element */ + element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel, allocator); + valuePtr = HnswAlloc(allocator, valueSize); + + /* Release allocator lock */ + SpinLockRelease(&graph->allocatorLock); + + /* Copy datum */ + memcpy(valuePtr, DatumGetPointer(value), valueSize); + HnswPtrStore(base, element->value, valuePtr); + + /* Create element lock */ + SpinLockInit(&element->lock); + + /* Get entry point */ + SpinLockAcquire(&graph->entryLock); + entryPoint = HnswPtrAccess(base, graph->entryPoint); + updateEntryPoint = entryPoint == NULL || element->level > entryPoint->level; + + /* Release lock if not updating entry point */ + if (!updateEntryPoint) + SpinLockRelease(&graph->entryLock); /* Insert element in graph */ - HnswInsertElement(element, entryPoint, NULL, procinfo, collation, m, efConstruction, false); + HnswInsertElement(base, element, entryPoint, NULL, procinfo, collation, m, efConstruction, false); - /* Look for duplicate */ - if (HnswFindDuplicateInMemory(element)) - { - /* No need to free element since memory unlikely to be reallocated */ - return true; - } + /* Write to memory */ + WriteElementInMemory(index, procinfo, collation, element, m, efConstruction, entryPoint, buildstate, graph, updateEntryPoint); - /* Add element */ - slist_push_head(&graph->elements, &element->next); + /* Release lock if needed */ + if (updateEntryPoint) + SpinLockRelease(&graph->entryLock); - /* Update neighbors */ - for (int lc = element->level; lc >= 0; lc--) - { - int lm = HnswGetLayerM(m, lc); - HnswNeighborArray *neighbors = HnswGetNeighbors(element, lc); - - for (int i = 0; i < neighbors->length; i++) - HnswUpdateConnection(element, &neighbors->items[i], lm, lc, NULL, NULL, procinfo, collation); - } - - /* Update entry point if needed */ - if (entryPoint == NULL || element->level > entryPoint->level) - graph->entryPoint = element; + /* Release flush lock */ + HnswRWLockRelease(flushLock, RW_SHARED); return true; } -/* - * Acquire a lock if needed - */ -static inline void -HnswLockAcquire(HnswShared * hnswshared) -{ - if (hnswshared) - SpinLockAcquire(&hnswshared->mutex); -} - -/* - * Release a lock if needed - */ -static inline void -HnswLockRelease(HnswShared * hnswshared) -{ - if (hnswshared) - SpinLockRelease(&hnswshared->mutex); -} - /* * Callback for table_index_build_scan */ @@ -440,9 +560,7 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, { HnswBuildState *buildstate = (HnswBuildState *) state; HnswGraph *graph = buildstate->graph; - HnswShared *hnswshared = buildstate->hnswshared; MemoryContext oldCtx; - bool inserted; #if PG_VERSION_NUM < 130000 ItemPointer tid = &hup->t_self; @@ -452,31 +570,16 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, if (isnull[0]) return; - /* Flush pages if needed */ - if (!graph->flushed && graph->memoryUsed >= graph->memoryTotal) - { - ereport(NOTICE, - (errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples), - errdetail("Building will take significantly more time."), - errhint("Increase maintenance_work_mem to speed up builds."))); - - FlushPages(buildstate); - } - + /* Use memory context */ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); /* Insert tuple */ - if (graph->flushed) - inserted = HnswInsertTuple(index, values, isnull, tid, buildstate->heap, true); - else - inserted = InsertTupleInMemory(index, values, tid, buildstate); - - /* Update progress */ - if (inserted) + if (InsertTuple(index, values, isnull, tid, buildstate)) { - HnswLockAcquire(hnswshared); + /* Update progress */ + SpinLockAcquire(&graph->lock); UpdateProgress(PROGRESS_CREATEIDX_TUPLES_DONE, ++graph->indtuples); - HnswLockRelease(hnswshared); + SpinLockRelease(&graph->lock); } /* Reset memory context */ @@ -488,14 +591,59 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, * Initialize the graph */ static void -InitGraph(HnswGraph * graph) +InitGraph(HnswGraph * graph, char *base, long memoryTotal) { - slist_init(&graph->elements); - graph->entryPoint = NULL; + HnswPtrSetNull(base, graph->head); + HnswPtrSetNull(base, graph->entryPoint); graph->memoryUsed = 0; - graph->memoryTotal = maintenance_work_mem * 1024L; + graph->memoryTotal = memoryTotal; graph->flushed = false; graph->indtuples = 0; + SpinLockInit(&graph->lock); + SpinLockInit(&graph->entryLock); + SpinLockInit(&graph->allocatorLock); + HnswRWLockInitialize(&graph->flushLock); +} + +/* + * Initialize an allocator + */ +static void +InitAllocator(HnswAllocator * allocator, void *(*alloc) (Size size, void *state), void *state) +{ + allocator->alloc = alloc; + allocator->state = state; +} + +/* + * Memory context allocator + */ +static void * +HnswMemoryContextAlloc(Size size, void *state) +{ + HnswBuildState *buildstate = (HnswBuildState *) state; + void *chunk = MemoryContextAlloc(buildstate->graphCtx, size); + +#if PG_VERSION_NUM >= 130000 + buildstate->graphData.memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false); +#else + buildstate->graphData.memoryUsed += MAXALIGN(size); +#endif + + return chunk; +} + +/* + * Shared memory allocator + */ +static void * +HnswSharedMemoryAlloc(Size size, void *state) +{ + HnswBuildState *buildstate = (HnswBuildState *) state; + void *chunk = buildstate->hnswarea + buildstate->graph->memoryUsed; + + buildstate->graph->memoryUsed += MAXALIGN(size); + return chunk; } /* @@ -531,7 +679,7 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index buildstate->normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); buildstate->collation = index->rd_indcollation[0]; - InitGraph(&buildstate->graphData); + InitGraph(&buildstate->graphData, NULL, maintenance_work_mem * 1024L); buildstate->graph = &buildstate->graphData; buildstate->ml = HnswGetMl(buildstate->m); buildstate->maxLevel = HnswGetMaxLevel(buildstate->m); @@ -549,8 +697,11 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index "Hnsw build temporary context", ALLOCSET_DEFAULT_SIZES); + InitAllocator(&buildstate->allocator, &HnswMemoryContextAlloc, buildstate); + buildstate->hnswleader = NULL; buildstate->hnswshared = NULL; + buildstate->hnswarea = NULL; } /* @@ -581,6 +732,7 @@ ParallelHeapScan(HnswBuildState * buildstate) if (hnswshared->nparticipantsdone == nparticipanttuplesorts) { buildstate->graph = &hnswshared->graphData; + buildstate->hnswarea = buildstate->hnswleader->hnswarea; reltuples = hnswshared->reltuples; SpinLockRelease(&hnswshared->mutex); break; @@ -600,7 +752,7 @@ ParallelHeapScan(HnswBuildState * buildstate) * Perform a worker's portion of a parallel insert */ static void -HnswParallelScanAndInsert(HnswSpool * hnswspool, HnswShared * hnswshared, bool progress) +HnswParallelScanAndInsert(HnswSpool * hnswspool, HnswShared * hnswshared, char *hnswarea, bool progress) { HnswBuildState buildstate; #if PG_VERSION_NUM >= 120000 @@ -616,7 +768,8 @@ HnswParallelScanAndInsert(HnswSpool * hnswspool, HnswShared * hnswshared, bool p indexInfo->ii_Concurrent = hnswshared->isconcurrent; InitBuildState(&buildstate, hnswspool->heap, hnswspool->index, indexInfo, MAIN_FORKNUM); buildstate.graph = &hnswshared->graphData; - buildstate.hnswshared = hnswshared; + buildstate.hnswarea = hnswarea; + InitAllocator(&buildstate.allocator, &HnswSharedMemoryAlloc, &buildstate); #if PG_VERSION_NUM >= 120000 scan = table_beginscan_parallel(hnswspool->heap, ParallelTableScanFromHnswShared(hnswshared)); @@ -657,6 +810,7 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc) char *sharedquery; HnswSpool *hnswspool; HnswShared *hnswshared; + char *hnswarea; Relation heapRel; Relation indexRel; LOCKMODE heapLockmode; @@ -697,8 +851,10 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc) hnswspool->heap = heapRel; hnswspool->index = indexRel; + hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false); + /* Perform inserts */ - HnswParallelScanAndInsert(hnswspool, hnswshared, false); + HnswParallelScanAndInsert(hnswspool, hnswshared, hnswarea, false); /* Close relations within worker */ index_close(indexRel, indexLockmode); @@ -761,7 +917,7 @@ HnswLeaderParticipateAsWorker(HnswBuildState * buildstate) leaderworker->index = buildstate->index; /* Perform work common to all participants */ - HnswParallelScanAndInsert(leaderworker, hnswleader->hnswshared, true); + HnswParallelScanAndInsert(leaderworker, hnswleader->hnswshared, hnswleader->hnswarea, true); } /* @@ -774,7 +930,9 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) int scantuplesortstates; Snapshot snapshot; Size esthnswshared; + Size esthnswarea; HnswShared *hnswshared; + char *hnswarea; HnswLeader *hnswleader = (HnswLeader *) palloc0(sizeof(HnswLeader)); bool leaderparticipates = true; int querylen; @@ -803,7 +961,9 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) /* Estimate size of workspaces */ esthnswshared = ParallelEstimateShared(buildstate->heap, snapshot); shm_toc_estimate_chunk(&pcxt->estimator, esthnswshared); - shm_toc_estimate_keys(&pcxt->estimator, 1); + esthnswarea = maintenance_work_mem * 1024L; + shm_toc_estimate_chunk(&pcxt->estimator, esthnswarea); + shm_toc_estimate_keys(&pcxt->estimator, 2); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -840,10 +1000,6 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) /* Initialize mutable state */ hnswshared->nparticipantsdone = 0; hnswshared->reltuples = 0; - InitGraph(&hnswshared->graphData); - /* TODO Support in-memory builds */ - hnswshared->graphData.memoryTotal = 0; - hnswshared->graphData.flushed = true; #if PG_VERSION_NUM >= 120000 table_parallelscan_initialize(buildstate->heap, ParallelTableScanFromHnswShared(hnswshared), @@ -852,7 +1008,12 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) heap_parallelscan_initialize(&hnswshared->heapdesc, buildstate->heap, snapshot); #endif + hnswarea = (char *) shm_toc_allocate(pcxt->toc, esthnswarea); + /* Report less than allocated so never fails */ + InitGraph(&hnswshared->graphData, hnswarea, esthnswarea - 1024 * 1024); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_SHARED, hnswshared); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_AREA, hnswarea); /* Store query string for workers */ if (debug_query_string) @@ -872,6 +1033,7 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) hnswleader->nparticipanttuplesorts++; hnswleader->hnswshared = hnswshared; hnswleader->snapshot = snapshot; + hnswleader->hnswarea = hnswarea; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -926,16 +1088,12 @@ BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum) UpdateProgress(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_HNSW_PHASE_LOAD); /* Calculate parallel workers */ - if (buildstate->heap != NULL && hnsw_enable_parallel_build) + if (buildstate->heap != NULL) parallel_workers = ComputeParallelWorkers(buildstate->heap, buildstate->index); /* Attempt to launch parallel worker scan when required */ if (parallel_workers > 0) - { - /* TODO Support in-memory builds */ - FlushPages(buildstate); HnswBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers); - } /* Add tuples to graph */ if (buildstate->heap != NULL) diff --git a/src/hnswinsert.c b/src/hnswinsert.c index 55c29fb..624a40e 100644 --- a/src/hnswinsert.c +++ b/src/hnswinsert.c @@ -134,9 +134,10 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag OffsetNumber freeOffno = InvalidOffsetNumber; OffsetNumber freeNeighborOffno = InvalidOffsetNumber; BlockNumber newInsertPage = InvalidBlockNumber; + char *base = NULL; /* Calculate sizes */ - etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(DatumGetPointer(e->value))); + etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(HnswPtrAccess(base, e->value))); ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m); combinedSize = etupSize + ntupSize + sizeof(ItemIdData); maxSize = HNSW_MAX_SIZE; @@ -144,11 +145,11 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag /* Prepare element tuple */ etup = palloc0(etupSize); - HnswSetElementTuple(etup, e); + HnswSetElementTuple(base, etup, e); /* Prepare neighbor tuple */ ntup = palloc0(ntupSize); - HnswSetNeighborTuple(ntup, e, m); + HnswSetNeighborTuple(base, ntup, e, m); /* Find a page (or two if needed) to insert the tuples */ for (;;) @@ -340,10 +341,12 @@ ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm) void HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building) { + char *base = NULL; + for (int lc = e->level; lc >= 0; lc--) { int lm = HnswGetLayerM(m, lc); - HnswNeighborArray *neighbors = HnswGetNeighbors(e, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc); for (int i = 0; i < neighbors->length; i++) { @@ -356,11 +359,12 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE Size ntupSize; int idx = -1; int startIdx; - OffsetNumber offno = hc->element->neighborOffno; + HnswElement neighborElement = HnswPtrAccess(base, hc->element); + OffsetNumber offno = neighborElement->neighborOffno; /* Get latest neighbors since they may have changed */ /* Do not lock yet since selecting neighbors can take time */ - HnswLoadNeighbors(hc->element, index, m); + HnswLoadNeighbors(neighborElement, index, m); /* * Could improve performance for vacuuming by checking neighbors @@ -370,14 +374,14 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE */ /* Select neighbors */ - HnswUpdateConnection(e, hc, lm, lc, &idx, index, procinfo, collation); + HnswUpdateConnection(NULL, e, hc, lm, lc, &idx, index, procinfo, collation); /* New element was not selected as a neighbor */ if (idx == -1) continue; /* Register page */ - buf = ReadBuffer(index, hc->element->neighborPage); + buf = ReadBuffer(index, neighborElement->neighborPage); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); if (building) { @@ -396,7 +400,7 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE ntupSize = ItemIdGetLength(itemid); /* Calculate index for update */ - startIdx = (hc->element->level - lc) * m; + startIdx = (neighborElement->level - lc) * m; /* Check for existing connection */ if (checkExisting && ConnectionExists(e, ntup, startIdx, lm)) @@ -513,17 +517,21 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup, bool buil static bool HnswFindDuplicate(Relation index, HnswElement element, bool building) { - HnswNeighborArray *neighbors = HnswGetNeighbors(element, 0); + char *base = NULL; + HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0); for (int i = 0; i < neighbors->length; i++) { HnswCandidate *neighbor = &neighbors->items[i]; + HnswElement neighborElement = HnswPtrAccess(base, neighbor->element); + Datum value = HnswGetValue(base, element); + Datum neighborValue = HnswGetValue(base, neighborElement); /* Exit early since ordered by distance */ - if (!datumIsEqual(element->value, neighbor->element->value, false, -1)) + if (!datumIsEqual(value, neighborValue, false, -1)) return false; - if (HnswAddDuplicate(index, element, neighbor->element, building)) + if (HnswAddDuplicate(index, element, neighborElement, building)) return true; } @@ -561,10 +569,8 @@ WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement elem * Insert a tuple into the index */ bool -HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building) +HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building) { - Datum value; - FmgrInfo *normprocinfo; HnswElement entryPoint; HnswElement element; int m; @@ -572,17 +578,7 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti FmgrInfo *procinfo = index_getprocinfo(index, 1, HNSW_DISTANCE_PROC); Oid collation = index->rd_indcollation[0]; LOCKMODE lockmode = ShareLock; - - /* Detoast once for all calls */ - value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); - - /* Normalize if needed */ - normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); - if (normprocinfo != NULL) - { - if (!HnswNormValue(normprocinfo, collation, &value, NULL)) - return false; - } + char *base = NULL; /* * Get a shared lock. This allows vacuum to ensure no in-flight inserts @@ -595,8 +591,8 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti HnswGetMetaPageInfo(index, &m, &entryPoint); /* Create an element */ - element = HnswInitElement(heap_tid, m, HnswGetMl(m), HnswGetMaxLevel(m)); - element->value = value; + element = HnswInitElement(base, heap_tid, m, HnswGetMl(m), HnswGetMaxLevel(m), NULL); + HnswPtrStore(base, element->value, DatumGetPointer(value)); /* Prevent concurrent inserts when likely updating entry point */ if (entryPoint == NULL || element->level > entryPoint->level) @@ -613,7 +609,7 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti } /* Insert element in graph */ - HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, false); + HnswInsertElement(base, element, entryPoint, index, procinfo, collation, m, efConstruction, false); /* Write to disk */ WriteElement(index, procinfo, collation, element, m, efConstruction, entryPoint, building); @@ -624,6 +620,30 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti return true; } +/* + * Insert a tuple into the index + */ +static void +HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel) +{ + Datum value; + FmgrInfo *normprocinfo; + Oid collation = index->rd_indcollation[0]; + + /* Detoast once for all calls */ + value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); + + /* Normalize if needed */ + normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); + if (normprocinfo != NULL) + { + if (!HnswNormValue(normprocinfo, collation, &value, NULL)) + return; + } + + HnswInsertTupleOnDisk(index, value, values, isnull, heap_tid, heapRel, false); +} + /* * Insert a tuple into the index */ @@ -650,7 +670,7 @@ hnswinsert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, oldCtx = MemoryContextSwitchTo(insertCtx); /* Insert tuple */ - HnswInsertTuple(index, values, isnull, heap_tid, heap, false); + HnswInsertTuple(index, values, isnull, heap_tid, heap); /* Delete memory context */ MemoryContextSwitchTo(oldCtx); diff --git a/src/hnswscan.c b/src/hnswscan.c index 48cd6ee..fb7954f 100644 --- a/src/hnswscan.c +++ b/src/hnswscan.c @@ -21,6 +21,7 @@ GetScanItems(IndexScanDesc scan, Datum q) List *w; int m; HnswElement entryPoint; + char *base = NULL; /* Get m and entry point */ HnswGetMetaPageInfo(index, &m, &entryPoint); @@ -28,15 +29,15 @@ GetScanItems(IndexScanDesc scan, Datum q) if (entryPoint == NULL) return NIL; - ep = list_make1(HnswEntryCandidate(entryPoint, q, index, procinfo, collation, false)); + ep = list_make1(HnswEntryCandidate(base, entryPoint, q, index, procinfo, collation, false)); for (int lc = entryPoint->level; lc >= 1; lc--) { - w = HnswSearchLayer(q, ep, 1, lc, index, procinfo, collation, m, false, NULL); + w = HnswSearchLayer(base, q, ep, 1, lc, index, procinfo, collation, m, false, NULL); ep = w; } - return HnswSearchLayer(q, ep, hnsw_ef_search, 0, index, procinfo, collation, m, false, NULL); + return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, procinfo, collation, m, false, NULL); } /* @@ -184,17 +185,19 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir) while (list_length(so->w) > 0) { + char *base = NULL; HnswCandidate *hc = llast(so->w); + HnswElement element = HnswPtrAccess(base, hc->element); ItemPointer heaptid; /* Move to next element if no valid heap TIDs */ - if (hc->element->heaptidsLength == 0) + if (element->heaptidsLength == 0) { so->w = list_delete_last(so->w); continue; } - heaptid = &hc->element->heaptids[--hc->element->heaptidsLength]; + heaptid = &element->heaptids[--element->heaptidsLength]; MemoryContextSwitchTo(oldCtx); diff --git a/src/hnswutils.c b/src/hnswutils.c index 7bee1fb..8c29cd8 100644 --- a/src/hnswutils.c +++ b/src/hnswutils.c @@ -84,9 +84,40 @@ hash_pointer(uintptr_t ptr) #define SH_DEFINE #include "lib/simplehash.h" +/* Needed to include simplehash.h again */ +#if PG_VERSION_NUM < 120000 +#undef SH_EQUAL +#undef sh_log2 +#undef sh_pow2 +#define sh_log2 offsethash_sh_log2 +#define sh_pow2 offsethash_sh_pow2 +#endif + +/* Offset hash table */ +static uint32 +hash_offset(Size offset) +{ +#if SIZEOF_SIZE_T == 8 + return murmurhash64((uint64) offset); +#else + return murmurhash32((uint32) offset); +#endif +} + +#define SH_PREFIX offsethash +#define SH_ELEMENT_TYPE OffsetHashEntry +#define SH_KEY_TYPE Size +#define SH_KEY offset +#define SH_HASH_KEY(tb, key) hash_offset(key) +#define SH_EQUAL(tb, a, b) (a == b) +#define SH_SCOPE extern +#define SH_DEFINE +#include "lib/simplehash.h" + typedef union { pointerhash_hash *pointers; + offsethash_hash *offsets; tidhash_hash *tids; } visited_hash; @@ -188,31 +219,46 @@ HnswInitPage(Buffer buf, Page page) * Allocate neighbors */ void -HnswInitNeighbors(HnswElement element, int m) +HnswInitNeighbors(char *base, HnswElement element, int m, HnswAllocator * allocator) { int level = element->level; - element->neighbors = palloc(sizeof(HnswNeighborArray) * (level + 1)); + HnswNeighborArrayPtr *neighborList = (HnswNeighborArrayPtr *) HnswAlloc(allocator, sizeof(HnswNeighborArrayPtr) * (level + 1)); + + HnswPtrStore(base, element->neighbors, neighborList); for (int lc = 0; lc <= level; lc++) { HnswNeighborArray *a; int lm = HnswGetLayerM(m, lc); - a = &element->neighbors[lc]; + HnswPtrStore(base, neighborList[lc], (HnswNeighborArray *) HnswAlloc(allocator, offsetof(HnswNeighborArray, items) + sizeof(HnswCandidate) * lm)); + + a = HnswGetNeighbors(base, element, lc); a->length = 0; - a->items = palloc(sizeof(HnswCandidate) * lm); a->closerSet = false; } } +/* + * Allocate memory from the allocator + */ +void * +HnswAlloc(HnswAllocator * allocator, Size size) +{ + if (allocator) + return (*(allocator)->alloc) (size, (allocator)->state); + + return palloc(size); +} + /* * Allocate an element */ HnswElement -HnswInitElement(ItemPointer heaptid, int m, double ml, int maxLevel) +HnswInitElement(char *base, ItemPointer heaptid, int m, double ml, int maxLevel, HnswAllocator * allocator) { - HnswElement element = palloc(sizeof(HnswElementData)); + HnswElement element = HnswAlloc(allocator, sizeof(HnswElementData)); int level = (int) (-log(RandomDouble()) * ml); @@ -226,9 +272,9 @@ HnswInitElement(ItemPointer heaptid, int m, double ml, int maxLevel) element->level = level; element->deleted = 0; - HnswInitNeighbors(element, m); + HnswInitNeighbors(base, element, m, allocator); - element->value = PointerGetDatum(NULL); + HnswPtrSetNull(base, element->value); return element; } @@ -252,8 +298,8 @@ HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno) element->blkno = blkno; element->offno = offno; - element->neighbors = NULL; - element->value = PointerGetDatum(NULL); + HnswPtrSetNull(NULL, element->neighbors); + HnswPtrSetNull(NULL, element->value); return element; } @@ -363,8 +409,10 @@ HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, Bloc * Set element tuple, except for neighbor info */ void -HnswSetElementTuple(HnswElementTuple etup, HnswElement element) +HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element) { + Pointer valuePtr = HnswPtrAccess(base, element->value); + etup->type = HNSW_ELEMENT_TUPLE_TYPE; etup->level = element->level; etup->deleted = 0; @@ -375,14 +423,14 @@ HnswSetElementTuple(HnswElementTuple etup, HnswElement element) else ItemPointerSetInvalid(&etup->heaptids[i]); } - memcpy(&etup->data, DatumGetPointer(element->value), VARSIZE_ANY(DatumGetPointer(element->value))); + memcpy(&etup->data, valuePtr, VARSIZE_ANY(valuePtr)); } /* * Set neighbor tuple */ void -HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m) +HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m) { int idx = 0; @@ -390,7 +438,7 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m) for (int lc = e->level; lc >= 0; lc--) { - HnswNeighborArray *neighbors = HnswGetNeighbors(e, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc); int lm = HnswGetLayerM(m, lc); for (int i = 0; i < lm; i++) @@ -400,8 +448,9 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m) if (i < neighbors->length) { HnswCandidate *hc = &neighbors->items[i]; + HnswElement hce = HnswPtrAccess(base, hc->element); - ItemPointerSet(indextid, hc->element->blkno, hc->element->offno); + ItemPointerSet(indextid, hce->blkno, hce->offno); } else ItemPointerSetInvalid(indextid); @@ -417,12 +466,14 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m) static void LoadNeighborsFromPage(HnswElement element, Relation index, Page page, int m) { + char *base = NULL; + HnswNeighborTuple ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, element->neighborOffno)); int neighborCount = (element->level + 2) * m; Assert(HnswIsNeighborTuple(ntup)); - HnswInitNeighbors(element, m); + HnswInitNeighbors(base, element, m, NULL); /* Ensure expected neighbors */ if (ntup->count != neighborCount) @@ -448,9 +499,9 @@ LoadNeighborsFromPage(HnswElement element, Relation index, Page page, int m) if (level < 0) level = 0; - neighbors = HnswGetNeighbors(element, level); + neighbors = HnswGetNeighbors(base, element, level); hc = &neighbors->items[neighbors->length++]; - hc->element = e; + HnswPtrStore(base, hc->element, e); } } @@ -497,7 +548,12 @@ HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHe } if (loadVec) - element->value = datumCopy(PointerGetDatum(&etup->data), false, -1); + { + char *base = NULL; + Datum value = datumCopy(PointerGetDatum(&etup->data), false, -1); + + HnswPtrStore(base, element->value, DatumGetPointer(value)); + } } /* @@ -533,24 +589,27 @@ HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index, * Get the distance for a candidate */ static float -GetCandidateDistance(HnswCandidate * hc, Datum q, FmgrInfo *procinfo, Oid collation) +GetCandidateDistance(char *base, HnswCandidate * hc, Datum q, FmgrInfo *procinfo, Oid collation) { - return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, q, hc->element->value)); + HnswElement hce = HnswPtrAccess(base, hc->element); + Datum value = HnswGetValue(base, hce); + + return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, q, value)); } /* * Create a candidate for the entry point */ HnswCandidate * -HnswEntryCandidate(HnswElement entryPoint, Datum q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec) +HnswEntryCandidate(char *base, HnswElement entryPoint, Datum q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec) { HnswCandidate *hc = palloc(sizeof(HnswCandidate)); - hc->element = entryPoint; + HnswPtrStore(base, hc->element, entryPoint); if (index == NULL) - hc->distance = GetCandidateDistance(hc, q, procinfo, collation); + hc->distance = GetCandidateDistance(base, hc, q, procinfo, collation); else - HnswLoadElement(hc->element, &hc->distance, &q, index, procinfo, collation, loadVec); + HnswLoadElement(entryPoint, &hc->distance, &q, index, procinfo, collation, loadVec); return hc; } @@ -600,30 +659,58 @@ CreatePairingHeapNode(HnswCandidate * c) * Add to visited */ static inline void -AddToVisited(visited_hash v, HnswCandidate * hc, Relation index, bool *found) +AddToVisited(char *base, visited_hash v, HnswCandidate * hc, Relation index, bool *found) { - if (index == NULL) + if (index != NULL) + { + HnswElement element = HnswPtrAccess(base, hc->element); + ItemPointerData indextid; + + ItemPointerSet(&indextid, element->blkno, element->offno); + tidhash_insert(v.tids, indextid, found); + } + else if (base != NULL) { #if PG_VERSION_NUM >= 130000 - pointerhash_insert_hash(v.pointers, (uintptr_t) hc->element, hc->element->hash, found); + HnswElement element = HnswPtrAccess(base, hc->element); + + offsethash_insert_hash(v.offsets, HnswPtrOffset(hc->element), element->hash, found); #else - pointerhash_insert(v.pointers, (uintptr_t) hc->element, found); + offsethash_insert(v.offsets, HnswPtrOffset(hc->element), found); #endif } else { - ItemPointerData indextid; +#if PG_VERSION_NUM >= 130000 + HnswElement element = HnswPtrAccess(base, hc->element); - ItemPointerSet(&indextid, hc->element->blkno, hc->element->offno); - tidhash_insert(v.tids, indextid, found); + pointerhash_insert_hash(v.pointers, (uintptr_t) HnswPtrPointer(hc->element), element->hash, found); +#else + pointerhash_insert(v.pointers, (uintptr_t) HnswPtrPointer(hc->element), found); +#endif } } +/* + * Count element towards ef + */ +static inline bool +CountElement(char *base, HnswElement skipElement, HnswCandidate * hc) +{ + if (skipElement == NULL) + return true; + + /* Ensure does not access heaptidsLength during in-memory build */ + pg_memory_barrier(); + + return HnswPtrAccess(base, hc->element)->heaptidsLength != 0; +} + /* * Algorithm 2 from paper */ List * -HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement) +HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement) { List *w = NIL; pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL); @@ -631,12 +718,23 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro int wlen = 0; visited_hash v; ListCell *lc2; + HnswNeighborArray *neighborhoodData = NULL; + Size neighborhoodSize; /* Create hash table */ - if (index == NULL) - v.pointers = pointerhash_create(CurrentMemoryContext, ef * m * 2, NULL); - else + if (index != NULL) v.tids = tidhash_create(CurrentMemoryContext, ef * m * 2, NULL); + else if (base != NULL) + v.offsets = offsethash_create(CurrentMemoryContext, ef * m * 2, NULL); + else + v.pointers = pointerhash_create(CurrentMemoryContext, ef * m * 2, NULL); + + /* Create local memory for neighborhood if needed */ + if (index == NULL) + { + neighborhoodSize = offsetof(HnswNeighborArray, items) + sizeof(HnswCandidate) * HnswGetLayerM(m, lc); + neighborhoodData = palloc(neighborhoodSize); + } /* Add entry points to v, C, and W */ foreach(lc2, ep) @@ -644,7 +742,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro HnswCandidate *hc = (HnswCandidate *) lfirst(lc2); bool found; - AddToVisited(v, hc, index, &found); + AddToVisited(base, v, hc, index, &found); pairingheap_add(C, &(CreatePairingHeapNode(hc)->ph_node)); pairingheap_add(W, &(CreatePairingHeapNode(hc)->ph_node)); @@ -654,7 +752,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro * would be ideal to do this for inserts as well, but this could * affect insert performance. */ - if (skipElement == NULL || hc->element->heaptidsLength != 0) + if (CountElement(base, skipElement, hc)) wlen++; } @@ -663,38 +761,51 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro HnswNeighborArray *neighborhood; HnswCandidate *c = ((HnswPairingHeapNode *) pairingheap_remove_first(C))->inner; HnswCandidate *f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner; + HnswElement cElement; if (c->distance > f->distance) break; - if (c->element->neighbors == NULL) - HnswLoadNeighbors(c->element, index, m); + cElement = HnswPtrAccess(base, c->element); + + if (HnswPtrIsNull(base, cElement->neighbors)) + HnswLoadNeighbors(cElement, index, m); /* Get the neighborhood at layer lc */ - neighborhood = HnswGetNeighbors(c->element, lc); + neighborhood = HnswGetNeighbors(base, cElement, lc); + + /* Copy neighborhood to local memory if needed */ + if (index == NULL) + { + SpinLockAcquire(&cElement->lock); + memcpy(neighborhoodData, neighborhood, neighborhoodSize); + SpinLockRelease(&cElement->lock); + neighborhood = neighborhoodData; + } for (int i = 0; i < neighborhood->length; i++) { HnswCandidate *e = &neighborhood->items[i]; bool visited; - AddToVisited(v, e, index, &visited); + AddToVisited(base, v, e, index, &visited); if (!visited) { float eDistance; + HnswElement eElement = HnswPtrAccess(base, e->element); f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner; if (index == NULL) - eDistance = GetCandidateDistance(e, q, procinfo, collation); + eDistance = GetCandidateDistance(base, e, q, procinfo, collation); else - HnswLoadElement(e->element, &eDistance, &q, index, procinfo, collation, inserting); + HnswLoadElement(eElement, &eDistance, &q, index, procinfo, collation, inserting); - Assert(!e->element->deleted); + Assert(!eElement->deleted); /* Make robust to issues */ - if (e->element->level < lc) + if (eElement->level < lc) continue; if (eDistance < f->distance || wlen < ef) @@ -702,7 +813,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro /* Copy e */ HnswCandidate *ec = palloc(sizeof(HnswCandidate)); - ec->element = e->element; + HnswPtrStore(base, ec->element, eElement); ec->distance = eDistance; pairingheap_add(C, &(CreatePairingHeapNode(ec)->ph_node)); @@ -713,7 +824,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro * vacuuming. It would be ideal to do this for inserts as * well, but this could affect insert performance. */ - if (skipElement == NULL || e->element->heaptidsLength != 0) + if (CountElement(base, skipElement, e)) { wlen++; @@ -738,7 +849,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro } /* - * Compare candidate distances + * Compare candidate distances with pointer tie-breaker */ static int #if PG_VERSION_NUM >= 130000 @@ -756,10 +867,38 @@ CompareCandidateDistances(const void *a, const void *b) if (hca->distance > hcb->distance) return -1; - if (hca->element < hcb->element) + if (HnswPtrPointer(hca->element) < HnswPtrPointer(hcb->element)) return 1; - if (hca->element > hcb->element) + if (HnswPtrPointer(hca->element) > HnswPtrPointer(hcb->element)) + return -1; + + return 0; +} + +/* + * Compare candidate distances with offset tie-breaker + */ +static int +#if PG_VERSION_NUM >= 130000 +CompareCandidateDistancesOffset(const ListCell *a, const ListCell *b) +#else +CompareCandidateDistancesOffset(const void *a, const void *b) +#endif +{ + HnswCandidate *hca = lfirst((ListCell *) a); + HnswCandidate *hcb = lfirst((ListCell *) b); + + if (hca->distance < hcb->distance) + return 1; + + if (hca->distance > hcb->distance) + return -1; + + if (HnswPtrOffset(hca->element) < HnswPtrOffset(hcb->element)) + return 1; + + if (HnswPtrOffset(hca->element) > HnswPtrOffset(hcb->element)) return -1; return 0; @@ -769,46 +908,59 @@ CompareCandidateDistances(const void *a, const void *b) * Calculate the distance between elements */ static float -HnswGetDistance(HnswElement a, HnswElement b, int lc, FmgrInfo *procinfo, Oid collation) +HnswGetDistance(char *base, HnswElement a, HnswElement b, int lc, FmgrInfo *procinfo, Oid collation) { + Datum aValue; + Datum bValue; + /* Look for cached distance */ - if (a->neighbors != NULL) + if (!HnswPtrIsNull(base, a->neighbors)) { - HnswNeighborArray *neighbors = HnswGetNeighbors(a, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, a, lc); for (int i = 0; i < neighbors->length; i++) { - if (neighbors->items[i].element == b) + HnswElement element = HnswPtrAccess(base, neighbors->items[i].element); + + if (element == b) return neighbors->items[i].distance; } } - if (b->neighbors != NULL) + if (!HnswPtrIsNull(base, b->neighbors)) { - HnswNeighborArray *neighbors = HnswGetNeighbors(b, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, b, lc); for (int i = 0; i < neighbors->length; i++) { - if (neighbors->items[i].element == a) + HnswElement element = HnswPtrAccess(base, neighbors->items[i].element); + + if (element == a) return neighbors->items[i].distance; } } - return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, a->value, b->value)); + aValue = HnswGetValue(base, a); + bValue = HnswGetValue(base, b); + + return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, aValue, bValue)); } /* * Check if an element is closer to q than any element from R */ static bool -CheckElementCloser(HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid collation) +CheckElementCloser(char *base, HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid collation) { + HnswElement eElement = HnswPtrAccess(base, e->element); ListCell *lc2; foreach(lc2, r) { HnswCandidate *ri = lfirst(lc2); - float distance = HnswGetDistance(e->element, ri->element, lc, procinfo, collation); + HnswElement riElement = HnswPtrAccess(base, ri->element); + + float distance = HnswGetDistance(base, eElement, riElement, lc, procinfo, collation); if (distance <= e->distance) return false; @@ -821,12 +973,12 @@ CheckElementCloser(HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid c * Algorithm 4 from paper */ static List * -SelectNeighbors(List *c, int m, int lc, FmgrInfo *procinfo, Oid collation, HnswElement e2, HnswCandidate * newCandidate, HnswCandidate * *pruned, bool sortCandidates) +SelectNeighbors(char *base, List *c, int m, int lc, FmgrInfo *procinfo, Oid collation, HnswElement e2, HnswCandidate * newCandidate, HnswCandidate * *pruned, bool sortCandidates) { List *r = NIL; List *w = list_copy(c); pairingheap *wd; - HnswNeighborArray *neighbors = HnswGetNeighbors(e2, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, e2, lc); bool mustCalculate = !neighbors->closerSet; List *added = NIL; bool removedAny = false; @@ -838,7 +990,12 @@ SelectNeighbors(List *c, int m, int lc, FmgrInfo *procinfo, Oid collation, HnswE /* Ensure order of candidates is deterministic for closer caching */ if (sortCandidates) - list_sort(w, CompareCandidateDistances); + { + if (base == NULL) + list_sort(w, CompareCandidateDistances); + else + list_sort(w, CompareCandidateDistancesOffset); + } while (list_length(w) > 0 && list_length(r) < m) { @@ -849,7 +1006,7 @@ SelectNeighbors(List *c, int m, int lc, FmgrInfo *procinfo, Oid collation, HnswE /* Use previous state of r and wd to skip work when possible */ if (mustCalculate) - e->closer = CheckElementCloser(e, r, lc, procinfo, collation); + e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation); else if (list_length(added) > 0) { /* @@ -858,7 +1015,7 @@ SelectNeighbors(List *c, int m, int lc, FmgrInfo *procinfo, Oid collation, HnswE */ if (e->closer) { - e->closer = CheckElementCloser(e, added, lc, procinfo, collation); + e->closer = CheckElementCloser(base, e, added, lc, procinfo, collation); if (!e->closer) removedAny = true; @@ -871,7 +1028,7 @@ SelectNeighbors(List *c, int m, int lc, FmgrInfo *procinfo, Oid collation, HnswE */ if (removedAny) { - e->closer = CheckElementCloser(e, r, lc, procinfo, collation); + e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation); if (e->closer) added = lappend(added, e); } @@ -879,7 +1036,7 @@ SelectNeighbors(List *c, int m, int lc, FmgrInfo *procinfo, Oid collation, HnswE } else if (e == newCandidate) { - e->closer = CheckElementCloser(e, r, lc, procinfo, collation); + e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation); if (e->closer) added = lappend(added, e); } @@ -913,10 +1070,10 @@ SelectNeighbors(List *c, int m, int lc, FmgrInfo *procinfo, Oid collation, HnswE * Add connections */ static void -AddConnections(HnswElement element, List *neighbors, int m, int lc) +AddConnections(char *base, HnswElement element, List *neighbors, int m, int lc) { ListCell *lc2; - HnswNeighborArray *a = HnswGetNeighbors(element, lc); + HnswNeighborArray *a = HnswGetNeighbors(base, element, lc); foreach(lc2, neighbors) a->items[a->length++] = *((HnswCandidate *) lfirst(lc2)); @@ -926,13 +1083,13 @@ AddConnections(HnswElement element, List *neighbors, int m, int lc) * Update connections */ void -HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int m, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation) +HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc, int m, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation) { - HnswNeighborArray *currentNeighbors = HnswGetNeighbors(hc->element, lc); - + HnswElement hce = HnswPtrAccess(base, hc->element); + HnswNeighborArray *currentNeighbors = HnswGetNeighbors(base, hce, lc); HnswCandidate hc2; - hc2.element = element; + HnswPtrStore(base, hc2.element, element); hc2.distance = hc->distance; if (currentNeighbors->length < m) @@ -951,19 +1108,20 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int m, int lc, int /* Load elements on insert */ if (index != NULL) { - Datum q = hc->element->value; + Datum q = HnswGetValue(base, hce); for (int i = 0; i < currentNeighbors->length; i++) { HnswCandidate *hc3 = ¤tNeighbors->items[i]; + HnswElement hc3Element = HnswPtrAccess(base, hc3->element); - if (DatumGetPointer(hc3->element->value) == NULL) - HnswLoadElement(hc3->element, &hc3->distance, &q, index, procinfo, collation, true); + if (HnswPtrIsNull(base, hc3Element->value)) + HnswLoadElement(hc3Element, &hc3->distance, &q, index, procinfo, collation, true); else - hc3->distance = GetCandidateDistance(hc3, q, procinfo, collation); + hc3->distance = GetCandidateDistance(base, hc3, q, procinfo, collation); /* Prune element if being deleted */ - if (hc3->element->heaptidsLength == 0) + if (hc3Element->heaptidsLength == 0) { pruned = ¤tNeighbors->items[i]; break; @@ -980,7 +1138,7 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int m, int lc, int c = lappend(c, ¤tNeighbors->items[i]); c = lappend(c, &hc2); - SelectNeighbors(c, m, lc, procinfo, collation, hc->element, &hc2, &pruned, true); + SelectNeighbors(base, c, m, lc, procinfo, collation, hce, &hc2, &pruned, true); /* Should not happen */ if (pruned == NULL) @@ -990,7 +1148,7 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int m, int lc, int /* Find and replace the pruned element */ for (int i = 0; i < currentNeighbors->length; i++) { - if (currentNeighbors->items[i].element == pruned->element) + if (HnswPtrEqual(base, currentNeighbors->items[i].element, pruned->element)) { currentNeighbors->items[i] = hc2; @@ -1008,43 +1166,65 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int m, int lc, int * Remove elements being deleted or skipped */ static List * -RemoveElements(List *w, HnswElement skipElement) +RemoveElements(char *base, List *w, HnswElement skipElement) { ListCell *lc2; List *w2 = NIL; + /* Ensure does not access heaptidsLength during in-memory build */ + pg_memory_barrier(); + foreach(lc2, w) { HnswCandidate *hc = (HnswCandidate *) lfirst(lc2); + HnswElement hce = HnswPtrAccess(base, hc->element); /* Skip self for vacuuming update */ - if (skipElement != NULL && hc->element->blkno == skipElement->blkno && hc->element->offno == skipElement->offno) + if (skipElement != NULL && hce->blkno == skipElement->blkno && hce->offno == skipElement->offno) continue; - if (hc->element->heaptidsLength != 0) + if (hce->heaptidsLength != 0) w2 = lappend(w2, hc); } return w2; } +#if PG_VERSION_NUM >= 130000 +/* + * Precompute hash + */ +static void +PrecomputeHash(char *base, HnswElement element) +{ + HnswElementPtr ptr; + + HnswPtrStore(base, ptr, element); + + if (base == NULL) + element->hash = hash_pointer((uintptr_t) HnswPtrPointer(ptr)); + else + element->hash = hash_offset(HnswPtrOffset(ptr)); +} +#endif + /* * Algorithm 1 from paper */ void -HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing) +HnswInsertElement(char *base, HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing) { List *ep; List *w; int level = element->level; int entryLevel; - Datum q = element->value; + Datum q = HnswGetValue(base, element); HnswElement skipElement = existing ? element : NULL; #if PG_VERSION_NUM >= 130000 /* Precompute hash */ if (index == NULL) - element->hash = hash_pointer((uintptr_t) element); + PrecomputeHash(base, element); #endif /* No neighbors if no entry point */ @@ -1052,13 +1232,13 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F return; /* Get entry point and level */ - ep = list_make1(HnswEntryCandidate(entryPoint, q, index, procinfo, collation, true)); + ep = list_make1(HnswEntryCandidate(base, entryPoint, q, index, procinfo, collation, true)); entryLevel = entryPoint->level; /* 1st phase: greedy search to insert level */ for (int lc = entryLevel; lc >= level + 1; lc--) { - w = HnswSearchLayer(q, ep, 1, lc, index, procinfo, collation, m, true, skipElement); + w = HnswSearchLayer(base, q, ep, 1, lc, index, procinfo, collation, m, true, skipElement); ep = w; } @@ -1076,12 +1256,12 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F List *neighbors; List *lw; - w = HnswSearchLayer(q, ep, efConstruction, lc, index, procinfo, collation, m, true, skipElement); + w = HnswSearchLayer(base, q, ep, efConstruction, lc, index, procinfo, collation, m, true, skipElement); /* Elements being deleted or skipped can help with search */ /* but should be removed before selecting neighbors */ if (index != NULL) - lw = RemoveElements(w, skipElement); + lw = RemoveElements(base, w, skipElement); else lw = w; @@ -1090,9 +1270,9 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F * sortCandidates to true for in-memory builds to enable closer * caching, but there does not seem to be a difference in performance. */ - neighbors = SelectNeighbors(lw, lm, lc, procinfo, collation, element, NULL, NULL, false); + neighbors = SelectNeighbors(base, lw, lm, lc, procinfo, collation, element, NULL, NULL, false); - AddConnections(element, neighbors, lm, lc); + AddConnections(base, element, neighbors, lm, lc); ep = w; } diff --git a/src/hnswvacuum.c b/src/hnswvacuum.c index 96ac54c..b55e53d 100644 --- a/src/hnswvacuum.c +++ b/src/hnswvacuum.c @@ -205,15 +205,15 @@ RepairGraphElement(HnswVacuumState * vacuumstate, HnswElement element, HnswEleme return; /* Init fields */ - HnswInitNeighbors(element, m); + HnswInitNeighbors(NULL, element, m, NULL); element->heaptidsLength = 0; /* Add element to graph, skipping itself */ - HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, true); + HnswInsertElement(NULL, element, entryPoint, index, procinfo, collation, m, efConstruction, true); /* Update neighbor tuple */ /* Do this before getting page to minimize locking */ - HnswSetNeighborTuple(ntup, element, m); + HnswSetNeighborTuple(NULL, ntup, element, m); /* Get neighbor page */ buf = ReadBufferExtended(index, MAIN_FORKNUM, element->neighborPage, RBM_NORMAL, bas); @@ -301,7 +301,7 @@ RepairGraphEntryPoint(HnswVacuumState * vacuumstate) { /* Reset neighbors from previous update */ if (highestPoint != NULL) - highestPoint->neighbors = NULL; + HnswPtrSetNull(NULL, highestPoint->neighbors); RepairGraphElement(vacuumstate, entryPoint, highestPoint); } diff --git a/test/t/012_hnsw_build_recall.pl b/test/t/012_hnsw_build_recall.pl index 51ef58d..163a472 100644 --- a/test/t/012_hnsw_build_recall.pl +++ b/test/t/012_hnsw_build_recall.pl @@ -95,11 +95,10 @@ for my $i (0 .. $#operators) $node->safe_psql("postgres", "DROP INDEX idx;"); - # Build index in parallel + # Build index in parallel in memory my ($ret, $stdout, $stderr) = $node->psql("postgres", qq( SET client_min_messages = DEBUG; SET min_parallel_table_scan_size = 1; - SET hnsw.enable_parallel_build = on; CREATE INDEX idx ON tst USING hnsw (v $opclass); )); is($ret, 0, $stderr); @@ -109,6 +108,21 @@ for my $i (0 .. $#operators) test_recall($min, $operator); $node->safe_psql("postgres", "DROP INDEX idx;"); + + # Build index in parallel on disk + # Set parallel_workers on table to use workers with low maintenance_work_mem + ($ret, $stdout, $stderr) = $node->psql("postgres", qq( + ALTER TABLE tst SET (parallel_workers = 2); + SET client_min_messages = DEBUG; + SET maintenance_work_mem = '4MB'; + CREATE INDEX idx ON tst USING hnsw (v $opclass); + ALTER TABLE tst RESET (parallel_workers); + )); + is($ret, 0, $stderr); + like($stderr, qr/using \d+ parallel workers/); + like($stderr, qr/hnsw graph no longer fits into maintenance_work_mem/); + + $node->safe_psql("postgres", "DROP INDEX idx;"); } done_testing();