diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a1cea5..e86edc7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## 0.5.2 (unreleased) - Improved performance of HNSW -- Added support for on-disk parallel index builds for HNSW +- Added support for parallel index builds for HNSW - Reduced memory usage for HNSW index builds - Reduced WAL generation for HNSW index builds - Fixed error with logical replication diff --git a/src/hnsw.c b/src/hnsw.c index 1719820..708819a 100644 --- a/src/hnsw.c +++ b/src/hnsw.c @@ -14,15 +14,45 @@ #endif int hnsw_ef_search; -bool hnsw_enable_parallel_build; +int hnsw_lock_tranche_id; static relopt_kind hnsw_relopt_kind; +/* + * Assign a tranche ID for our LWLocks. This only needs to be done by one + * backend, as the tranche ID is remembered in shared memory. + * + * This shared memory area is very small, so we just allocate it from the + * "slop" that PostgreSQL reserves for small allocations like this. If + * this grows bigger, we should use a shmem_request_hook and + * RequestAddinShmemSpace() to pre-reserve space for this. + */ +static void +HnswInitLockTranche(void) +{ + int *tranche_ids; + bool found; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + tranche_ids = ShmemInitStruct("hnsw LWLock ids", + sizeof(int) * 1, + &found); + if (!found) + tranche_ids[0] = LWLockNewTrancheId(); + hnsw_lock_tranche_id = tranche_ids[0]; + LWLockRelease(AddinShmemInitLock); + + /* Per-backend registration of the tranche ID */ + LWLockRegisterTranche(hnsw_lock_tranche_id, "HnswBuild"); +} + /* * Initialize index options and variables */ void HnswInit(void) { + HnswInitLockTranche(); + hnsw_relopt_kind = add_reloption_kind(); add_int_reloption(hnsw_relopt_kind, "m", "Max number of connections", HNSW_DEFAULT_M, HNSW_MIN_M, HNSW_MAX_M @@ -40,11 +70,6 @@ HnswInit(void) DefineCustomIntVariable("hnsw.ef_search", "Sets the size of the dynamic candidate list for search", "Valid range is 1..1000.", &hnsw_ef_search, HNSW_DEFAULT_EF_SEARCH, HNSW_MIN_EF_SEARCH, HNSW_MAX_EF_SEARCH, PGC_USERSET, 0, NULL, NULL, NULL); - - /* Behind a variable for now since can be slower than building in memory */ - DefineCustomBoolVariable("hnsw.enable_parallel_build", "Enables or disables building indexes in parallel", - NULL, &hnsw_enable_parallel_build, - false, PGC_USERSET, 0, NULL, NULL, NULL); } /* diff --git a/src/hnsw.h b/src/hnsw.h index 2228664..b3281fe 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -6,9 +6,9 @@ #include "access/generic_xlog.h" #include "access/parallel.h" #include "access/reloptions.h" -#include "lib/ilist.h" #include "nodes/execnodes.h" #include "port.h" /* for random() */ +#include "utils/relptr.h" #include "utils/sampling.h" #include "vector.h" @@ -96,33 +96,62 @@ /* Ensure fits on page and in uint8 */ #define HnswGetMaxLevel(m) Min(((BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(HnswPageOpaqueData)) - offsetof(HnswNeighborTupleData, indextids) - sizeof(ItemIdData)) / (sizeof(ItemPointerData)) / (m)) - 2, 255) -#define HnswGetNeighbors(element, lc) (AssertMacro((element)->level >= (lc)), &(element)->neighbors[lc]) +#define HnswGetValue(base, element) PointerGetDatum(HnswPtrAccess(base, (element)->value)) + +#if PG_VERSION_NUM < 140005 +#define relptr_offset(rp) ((rp).relptr_off - 1) +#endif + +/* Pointer macros */ +#define HnswPtrAccess(base, hp) ((base) == NULL ? (hp).ptr : relptr_access(base, (hp).relptr)) +#define HnswPtrStore(base, hp, value) ((base) == NULL ? (void) ((hp).ptr = (value)) : (void) relptr_store(base, (hp).relptr, value)) +#define HnswPtrIsNull(base, hp) ((base) == NULL ? (hp).ptr == NULL : relptr_is_null((hp).relptr)) +#define HnswPtrEqual(base, hp1, hp2) ((base) == NULL ? (hp1).ptr == (hp2).ptr : relptr_offset((hp1).relptr) == relptr_offset((hp2).relptr)) + +/* For code paths dedicated to each type */ +#define HnswPtrPointer(hp) (hp).ptr +#define HnswPtrOffset(hp) relptr_offset((hp).relptr) /* Variables */ extern int hnsw_ef_search; -extern bool hnsw_enable_parallel_build; +extern int hnsw_lock_tranche_id; + +typedef struct HnswElementData HnswElementData; +typedef struct HnswNeighborArray HnswNeighborArray; + +#define HnswPtrDeclare(type, relptrtype, ptrtype) \ + relptr_declare(type, relptrtype); \ + typedef union { type *ptr; relptrtype relptr; } ptrtype; + +/* Pointers that can be absolute or relative */ +/* Use char for DatumPtr so works with Pointer */ +HnswPtrDeclare(HnswElementData, HnswElementRelptr, HnswElementPtr); +HnswPtrDeclare(HnswNeighborArray, HnswNeighborArrayRelptr, HnswNeighborArrayPtr); +HnswPtrDeclare(HnswNeighborArrayPtr, HnswNeighborsRelptr, HnswNeighborsPtr); +HnswPtrDeclare(char, DatumRelptr, DatumPtr); typedef struct HnswElementData { - slist_node next; + HnswElementPtr next; ItemPointerData heaptids[HNSW_HEAPTIDS]; uint8 heaptidsLength; uint8 level; uint8 deleted; uint32 hash; - struct HnswNeighborArray *neighbors; + HnswNeighborsPtr neighbors; BlockNumber blkno; OffsetNumber offno; OffsetNumber neighborOffno; BlockNumber neighborPage; - Datum value; + DatumPtr value; + LWLock lock; } HnswElementData; typedef HnswElementData * HnswElement; typedef struct HnswCandidate { - HnswElement element; + HnswElementPtr element; float distance; bool closer; } HnswCandidate; @@ -131,7 +160,7 @@ typedef struct HnswNeighborArray { int length; bool closerSet; - HnswCandidate *items; + HnswCandidate items[FLEXIBLE_ARRAY_MEMBER]; } HnswNeighborArray; typedef struct HnswPairingHeapNode @@ -150,12 +179,23 @@ typedef struct HnswOptions typedef struct HnswGraph { - slist_head elements; - HnswElement entryPoint; + /* Graph state */ + slock_t lock; + HnswElementPtr head; + double indtuples; + + /* Entry state */ + LWLock entryLock; + HnswElementPtr entryPoint; + + /* Allocations state */ + LWLock allocatorLock; long memoryUsed; long memoryTotal; + + /* Flushed state */ + LWLock flushLock; bool flushed; - double indtuples; } HnswGraph; typedef struct HnswShared @@ -192,8 +232,15 @@ typedef struct HnswLeader int nparticipanttuplesorts; HnswShared *hnswshared; Snapshot snapshot; + char *hnswarea; } HnswLeader; +typedef struct HnswAllocator +{ + void *(*alloc) (Size size, void *state); + void *state; +} HnswAllocator; + typedef struct HnswBuildState { /* Info */ @@ -226,10 +273,12 @@ typedef struct HnswBuildState /* Memory */ MemoryContext graphCtx; MemoryContext tmpCtx; + HnswAllocator allocator; /* Parallel builds */ HnswLeader *hnswleader; HnswShared *hnswshared; + char *hnswarea; } HnswBuildState; typedef struct HnswMetaPageData @@ -328,23 +377,24 @@ bool HnswNormValue(FmgrInfo *procinfo, Oid collation, Datum *value, Vector * re Buffer HnswNewBuffer(Relation index, ForkNumber forkNum); void HnswInitPage(Buffer buf, Page page); void HnswInit(void); -List *HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement); +List *HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement); HnswElement HnswGetEntryPoint(Relation index); void HnswGetMetaPageInfo(Relation index, int *m, HnswElement * entryPoint); -HnswElement HnswInitElement(ItemPointer tid, int m, double ml, int maxLevel); +void *HnswAlloc(HnswAllocator * allocator, Size size); +HnswElement HnswInitElement(char *base, ItemPointer tid, int m, double ml, int maxLevel, HnswAllocator * alloc); HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno); -void HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing); -HnswCandidate *HnswEntryCandidate(HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec); +void HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing); +HnswCandidate *HnswEntryCandidate(char *base, HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec); void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum, bool building); -void HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m); +void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m); void HnswAddHeapTid(HnswElement element, ItemPointer heaptid); -void HnswInitNeighbors(HnswElement element, int m); -bool HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building); -void HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building); +void HnswInitNeighbors(char *base, HnswElement element, int m, HnswAllocator * alloc); +bool HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building); +void HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building); void HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHeaptids, bool loadVec); void HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec); -void HnswSetElementTuple(HnswElementTuple etup, HnswElement element); -void HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation); +void HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element); +void HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc, int lm, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation); void HnswLoadNeighbors(HnswElement element, Relation index, int m); PGDLLEXPORT void HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc); @@ -364,6 +414,16 @@ void hnswrescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, bool hnswgettuple(IndexScanDesc scan, ScanDirection dir); void hnswendscan(IndexScanDesc scan); +static inline HnswNeighborArray * +HnswGetNeighbors(char *base, HnswElement element, int lc) +{ + HnswNeighborArrayPtr *neighborList = HnswPtrAccess(base, element->neighbors); + + Assert(element->level >= lc); + + return HnswPtrAccess(base, neighborList[lc]); +} + /* Hash tables */ typedef struct TidHashEntry { @@ -391,4 +451,17 @@ typedef struct PointerHashEntry #define SH_DECLARE #include "lib/simplehash.h" +typedef struct OffsetHashEntry +{ + Size offset; + char status; +} OffsetHashEntry; + +#define SH_PREFIX offsethash +#define SH_ELEMENT_TYPE OffsetHashEntry +#define SH_KEY_TYPE Size +#define SH_SCOPE extern +#define SH_DECLARE +#include "lib/simplehash.h" + #endif diff --git a/src/hnswbuild.c b/src/hnswbuild.c index fa8e650..ef08d5d 100644 --- a/src/hnswbuild.c +++ b/src/hnswbuild.c @@ -1,3 +1,39 @@ +/* + * The HNSW build happens in two phases: + * + * 1. In-memory phase + * + * In this first phase, the graph is held completely in memory. When the graph + * is fully built, or we run out of memory reserved for the build (determined + * by maintenance_work_mem), we materialize the graph to disk (see + * FlushPages()), and switch to the on-disk phase. + * + * In a parallel build, a large contiguous chunk of shared memory is allocated + * to hold the graph. Each worker process has its own HnswBuildState struct in + * private memory, which contains information that doesn't change throughout + * the build, and pointers to the shared structs in shared memory. The shared + * memory area is mapped to a different address in each worker process, and + * 'HnswBuildState.hnswarea' points to the beginning of the shared area in the + * worker process's address space. All pointers used in the graph are + * "relative pointers", stored as an offset from 'hnswarea'. + * + * Each element is protected by an LWLock. It must be held when reading or + * modifying the element's neighbors or 'heaptids'. + * + * In a non-parallel build, the graph is held in backend-private memory. All + * the elements are allocated in a dedicated memory context, 'graphCtx', and + * the pointers used in the graph are regular pointers. + * + * 2. On-disk phase + * + * In the on-disk phase, the index is built by inserting each vector to the + * index one by one, just like on INSERT. The only difference is that we don't + * WAL-log the individual inserts. If the graph fit completely in memory and + * was fully built in the in-memory phase, the on-disk phase is skipped. + * + * After we have finished building the graph, we perform one more scan through + * the index and write all the pages to the WAL. + */ #include "postgres.h" #include @@ -54,7 +90,8 @@ #endif #define PARALLEL_KEY_HNSW_SHARED UINT64CONST(0xA000000000000001) -#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000002) +#define PARALLEL_KEY_HNSW_AREA UINT64CONST(0xA000000000000002) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000003) #if PG_VERSION_NUM < 130000 #define GENERATIONCHUNK_RAWSIZE (SIZEOF_SIZE_T + SIZEOF_VOID_P * 2) @@ -135,9 +172,11 @@ CreateElementPages(HnswBuildState * buildstate) HnswElementTuple etup; HnswNeighborTuple ntup; BlockNumber insertPage; + HnswElement entryPoint; Buffer buf; Page page; - slist_iter iter; + HnswElementPtr iter = buildstate->graph->head; + char *base = buildstate->hnswarea; /* Calculate sizes */ etupAllocSize = BLCKSZ; @@ -152,18 +191,22 @@ CreateElementPages(HnswBuildState * buildstate) page = BufferGetPage(buf); HnswInitPage(buf, page); - slist_foreach(iter, &buildstate->graph->elements) + while (!HnswPtrIsNull(base, iter)) { - HnswElement element = slist_container(HnswElementData, next, iter.cur); + HnswElement element = HnswPtrAccess(base, iter); Size etupSize; Size ntupSize; Size combinedSize; + void *valuePtr = HnswPtrAccess(base, element->value); + + /* Update iterator */ + iter = element->next; /* Zero memory for each element */ MemSet(etup, 0, etupAllocSize); /* Calculate sizes */ - etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(DatumGetPointer(element->value))); + etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(valuePtr)); ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(element->level, buildstate->m); combinedSize = etupSize + ntupSize + sizeof(ItemIdData); @@ -171,7 +214,7 @@ CreateElementPages(HnswBuildState * buildstate) if (etupSize > etupAllocSize) elog(ERROR, "index tuple too large"); - HnswSetElementTuple(etup, element); + HnswSetElementTuple(base, etup, element); /* Keep element and neighbors on the same page if possible */ if (PageGetFreeSpace(page) < etupSize || (combinedSize <= maxSize && PageGetFreeSpace(page) < combinedSize)) @@ -212,7 +255,8 @@ CreateElementPages(HnswBuildState * buildstate) MarkBufferDirty(buf); UnlockReleaseBuffer(buf); - HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, buildstate->graph->entryPoint, insertPage, forkNum, true); + entryPoint = HnswPtrAccess(base, buildstate->graph->entryPoint); + HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, entryPoint, insertPage, forkNum, true); pfree(etup); pfree(ntup); @@ -227,19 +271,23 @@ CreateNeighborPages(HnswBuildState * buildstate) Relation index = buildstate->index; ForkNumber forkNum = buildstate->forkNum; int m = buildstate->m; - slist_iter iter; + HnswElementPtr iter = buildstate->graph->head; + char *base = buildstate->hnswarea; HnswNeighborTuple ntup; /* Allocate once */ ntup = palloc0(BLCKSZ); - slist_foreach(iter, &buildstate->graph->elements) + while (!HnswPtrIsNull(base, iter)) { - HnswElement e = slist_container(HnswElementData, next, iter.cur); + HnswElement e = HnswPtrAccess(base, iter); Buffer buf; Page page; Size ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m); + /* Update iterator */ + iter = e->next; + /* Can take a while, so ensure we can interrupt */ /* Needs to be called when no buffer locks are held */ CHECK_FOR_INTERRUPTS(); @@ -248,7 +296,7 @@ CreateNeighborPages(HnswBuildState * buildstate) LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buf); - HnswSetNeighborTuple(ntup, e, m); + HnswSetNeighborTuple(base, ntup, e, m); if (!PageIndexTupleOverwrite(page, e->neighborOffno, (Item) ntup, ntupSize)) elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index)); @@ -261,24 +309,6 @@ CreateNeighborPages(HnswBuildState * buildstate) pfree(ntup); } -#ifdef HNSW_MEMORY -/* - * Show memory usage - */ -static void -ShowMemoryUsage(HnswBuildState * buildstate) -{ -#if PG_VERSION_NUM >= 130000 - elog(INFO, "graph memory: %zu MB, total memory: %zu MB", - MemoryContextMemAllocated(buildstate->graphCtx, false) / (1024 * 1024), - MemoryContextMemAllocated(CurrentMemoryContext, true) / (1024 * 1024)); -#else - MemoryContextStats(CurrentMemoryContext); - elog(INFO, "estimated memory: %zu MB", buildstate->memoryUsed / (1024 * 1024)); -#endif -} -#endif - /* * Flush pages */ @@ -286,7 +316,7 @@ static void FlushPages(HnswBuildState * buildstate) { #ifdef HNSW_MEMORY - ShowMemoryUsage(buildstate); + elog(INFO, "memory: %zu MB", buildstate->graph->memoryUsed / (1024 * 1024)); #endif CreateMetaPage(buildstate); @@ -297,66 +327,172 @@ FlushPages(HnswBuildState * buildstate) MemoryContextReset(buildstate->graphCtx); } -#if PG_VERSION_NUM < 130000 /* - * Get the memory used by an element + * Add a heap TID to an existing element */ -static long -HnswElementMemory(HnswElement e, int m) +static bool +AddDuplicateInMemory(HnswElement element, HnswElement dup) { - long elementSize = sizeof(HnswElementData); + LWLockAcquire(&dup->lock, LW_EXCLUSIVE); - elementSize += sizeof(HnswNeighborArray) * (e->level + 1); - elementSize += sizeof(HnswCandidate) * (m * (e->level + 2)); - elementSize += VARSIZE_ANY(DatumGetPointer(e->value)); - /* Each allocation has a chunk header */ - elementSize += (e->level + 4) * GENERATIONCHUNK_RAWSIZE; - /* Add an extra 5% for alignment and other overhead */ - return elementSize * 1.05; + if (dup->heaptidsLength == HNSW_HEAPTIDS) + { + LWLockRelease(&dup->lock); + return false; + } + + HnswAddHeapTid(dup, &element->heaptids[0]); + + LWLockRelease(&dup->lock); + + return true; } -#endif /* * Find duplicate element */ static bool -HnswFindDuplicateInMemory(HnswElement element) +FindDuplicateInMemory(char *base, HnswElement element) { - HnswNeighborArray *neighbors = HnswGetNeighbors(element, 0); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0); + Datum value = HnswGetValue(base, element); for (int i = 0; i < neighbors->length; i++) { HnswCandidate *neighbor = &neighbors->items[i]; + HnswElement neighborElement = HnswPtrAccess(base, neighbor->element); + Datum neighborValue = HnswGetValue(base, neighborElement); /* Exit early since ordered by distance */ - if (!datumIsEqual(element->value, neighbor->element->value, false, -1)) + if (!datumIsEqual(value, neighborValue, false, -1)) return false; /* Check for space */ - if (neighbor->element->heaptidsLength < HNSW_HEAPTIDS) - { - HnswAddHeapTid(neighbor->element, &element->heaptids[0]); + if (AddDuplicateInMemory(element, neighborElement)) return true; - } } return false; } /* - * Insert tuple into in-memory graph + * Add to element list */ -static bool -InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuildState * buildstate) +static void +AddElementInMemory(char *base, HnswGraph * graph, HnswElement element) +{ + SpinLockAcquire(&graph->lock); + element->next = graph->head; + HnswPtrStore(base, graph->head, element); + SpinLockRelease(&graph->lock); +} + +/* + * Update neighbors + */ +static void +UpdateNeighborsInMemory(char *base, FmgrInfo *procinfo, Oid collation, HnswElement e, int m) +{ + for (int lc = e->level; lc >= 0; lc--) + { + int lm = HnswGetLayerM(m, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc); + + for (int i = 0; i < neighbors->length; i++) + { + HnswCandidate *hc = &neighbors->items[i]; + HnswElement neighborElement = HnswPtrAccess(base, hc->element); + + /* Keep scan-build happy on Mac x86-64 */ + Assert(neighborElement); + + /* Use element for lock instead of hc since hc can be replaced */ + LWLockAcquire(&neighborElement->lock, LW_EXCLUSIVE); + HnswUpdateConnection(base, e, hc, lm, lc, NULL, NULL, procinfo, collation); + LWLockRelease(&neighborElement->lock); + } + } +} + +/* + * Update graph in memory + */ +static void +UpdateGraphInMemory(FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, HnswBuildState * buildstate) +{ + HnswGraph *graph = buildstate->graph; + char *base = buildstate->hnswarea; + + /* Look for duplicate */ + if (FindDuplicateInMemory(base, element)) + return; + + /* Add element */ + AddElementInMemory(base, graph, element); + + /* Update neighbors */ + UpdateNeighborsInMemory(base, procinfo, collation, element, m); + + /* Update entry point if needed (already have lock) */ + if (entryPoint == NULL || element->level > entryPoint->level) + HnswPtrStore(base, graph->entryPoint, element); +} + +/* + * Insert tuple in memory + */ +static void +InsertTupleInMemory(HnswBuildState * buildstate, HnswElement element) { FmgrInfo *procinfo = buildstate->procinfo; Oid collation = buildstate->collation; HnswGraph *graph = buildstate->graph; - HnswElement entryPoint = graph->entryPoint; + HnswElement entryPoint; + LWLock *entryLock = &graph->entryLock; int efConstruction = buildstate->efConstruction; int m = buildstate->m; - MemoryContext oldCtx; + char *base = buildstate->hnswarea; + + /* Get entry point */ + LWLockAcquire(entryLock, LW_SHARED); + entryPoint = HnswPtrAccess(base, graph->entryPoint); + + /* Prevent concurrent inserts when likely updating entry point */ + if (entryPoint == NULL || element->level > entryPoint->level) + { + /* Release shared lock */ + LWLockRelease(entryLock); + + /* Get exclusive lock */ + LWLockAcquire(entryLock, LW_EXCLUSIVE); + + /* Get latest entry point after lock is acquired */ + entryPoint = HnswPtrAccess(base, graph->entryPoint); + } + + /* Find neighbors for element */ + HnswFindElementNeighbors(base, element, entryPoint, NULL, procinfo, collation, m, efConstruction, false); + + /* Update graph in memory */ + UpdateGraphInMemory(procinfo, collation, element, m, efConstruction, entryPoint, buildstate); + + /* Release entry lock */ + LWLockRelease(entryLock); +} + +/* + * Insert tuple + */ +static bool +InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, HnswBuildState * buildstate) +{ + HnswGraph *graph = buildstate->graph; HnswElement element; + HnswAllocator *allocator = &buildstate->allocator; + Size valueSize; + Pointer valuePtr; + LWLock *flushLock = &graph->flushLock; + char *base = buildstate->hnswarea; /* Detoast once for all calls */ Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); @@ -364,73 +500,83 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil /* Normalize if needed */ if (buildstate->normprocinfo != NULL) { - if (!HnswNormValue(buildstate->normprocinfo, collation, &value, buildstate->normvec)) + if (!HnswNormValue(buildstate->normprocinfo, buildstate->collation, &value, buildstate->normvec)) return false; } - /* Allocate element in graph memory context */ - oldCtx = MemoryContextSwitchTo(buildstate->graphCtx); - element = HnswInitElement(heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel); - element->value = datumCopy(value, false, -1); - MemoryContextSwitchTo(oldCtx); + /* Get datum size */ + valueSize = VARSIZE_ANY(DatumGetPointer(value)); - /* Update memory usage */ -#if PG_VERSION_NUM >= 130000 - graph->memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false); -#else - graph->memoryUsed += HnswElementMemory(element, buildstate->m); -#endif + /* Ensure graph not flushed when inserting */ + LWLockAcquire(flushLock, LW_SHARED); - /* Insert element in graph */ - HnswInsertElement(element, entryPoint, NULL, procinfo, collation, m, efConstruction, false); - - /* Look for duplicate */ - if (HnswFindDuplicateInMemory(element)) + /* Are we in the on-disk phase? */ + if (graph->flushed) { - /* No need to free element since memory unlikely to be reallocated */ - return true; + LWLockRelease(flushLock); + + return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true); } - /* Add element */ - slist_push_head(&graph->elements, &element->next); + /* + * In a parallel build, the HnswElement is allocated from the shared + * memory area, so we need to coordinate with other processes. + */ + LWLockAcquire(&graph->allocatorLock, LW_EXCLUSIVE); - /* Update neighbors */ - for (int lc = element->level; lc >= 0; lc--) + /* + * Check that we have enough memory available for the new element now that + * we have the allocator lock, and flush pages if needed. + */ + if (graph->memoryUsed >= graph->memoryTotal) { - int lm = HnswGetLayerM(m, lc); - HnswNeighborArray *neighbors = HnswGetNeighbors(element, lc); + LWLockRelease(&graph->allocatorLock); - for (int i = 0; i < neighbors->length; i++) - HnswUpdateConnection(element, &neighbors->items[i], lm, lc, NULL, NULL, procinfo, collation); + LWLockRelease(flushLock); + LWLockAcquire(flushLock, LW_EXCLUSIVE); + + if (!graph->flushed) + { + ereport(NOTICE, + (errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples), + errdetail("Building will take significantly more time."), + errhint("Increase maintenance_work_mem to speed up builds."))); + + FlushPages(buildstate); + } + + LWLockRelease(flushLock); + + return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true); } - /* Update entry point if needed */ - if (entryPoint == NULL || element->level > entryPoint->level) - graph->entryPoint = element; + /* Ok, we can proceed to allocate the element */ + element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel, allocator); + valuePtr = HnswAlloc(allocator, valueSize); + + /* + * We have now allocated the space needed for the element, so we don't + * need the allocator lock anymore. Release it and initialize the rest of + * the element. + */ + LWLockRelease(&graph->allocatorLock); + + /* Copy the datum */ + memcpy(valuePtr, DatumGetPointer(value), valueSize); + HnswPtrStore(base, element->value, valuePtr); + + /* Create a lock for the element */ + LWLockInitialize(&element->lock, hnsw_lock_tranche_id); + + /* Insert tuple */ + InsertTupleInMemory(buildstate, element); + + /* Release flush lock */ + LWLockRelease(flushLock); return true; } -/* - * Acquire a lock if needed - */ -static inline void -HnswLockAcquire(HnswShared * hnswshared) -{ - if (hnswshared) - SpinLockAcquire(&hnswshared->mutex); -} - -/* - * Release a lock if needed - */ -static inline void -HnswLockRelease(HnswShared * hnswshared) -{ - if (hnswshared) - SpinLockRelease(&hnswshared->mutex); -} - /* * Callback for table_index_build_scan */ @@ -440,9 +586,7 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, { HnswBuildState *buildstate = (HnswBuildState *) state; HnswGraph *graph = buildstate->graph; - HnswShared *hnswshared = buildstate->hnswshared; MemoryContext oldCtx; - bool inserted; #if PG_VERSION_NUM < 130000 ItemPointer tid = &hup->t_self; @@ -452,31 +596,16 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, if (isnull[0]) return; - /* Flush pages if needed */ - if (!graph->flushed && graph->memoryUsed >= graph->memoryTotal) - { - ereport(NOTICE, - (errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples), - errdetail("Building will take significantly more time."), - errhint("Increase maintenance_work_mem to speed up builds."))); - - FlushPages(buildstate); - } - + /* Use memory context */ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); /* Insert tuple */ - if (graph->flushed) - inserted = HnswInsertTuple(index, values, isnull, tid, buildstate->heap, true); - else - inserted = InsertTupleInMemory(index, values, tid, buildstate); - - /* Update progress */ - if (inserted) + if (InsertTuple(index, values, isnull, tid, buildstate)) { - HnswLockAcquire(hnswshared); + /* Update progress */ + SpinLockAcquire(&graph->lock); UpdateProgress(PROGRESS_CREATEIDX_TUPLES_DONE, ++graph->indtuples); - HnswLockRelease(hnswshared); + SpinLockRelease(&graph->lock); } /* Reset memory context */ @@ -488,14 +617,59 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, * Initialize the graph */ static void -InitGraph(HnswGraph * graph) +InitGraph(HnswGraph * graph, char *base, long memoryTotal) { - slist_init(&graph->elements); - graph->entryPoint = NULL; + HnswPtrStore(base, graph->head, (HnswElement) NULL); + HnswPtrStore(base, graph->entryPoint, (HnswElement) NULL); graph->memoryUsed = 0; - graph->memoryTotal = maintenance_work_mem * 1024L; + graph->memoryTotal = memoryTotal; graph->flushed = false; graph->indtuples = 0; + SpinLockInit(&graph->lock); + LWLockInitialize(&graph->entryLock, hnsw_lock_tranche_id); + LWLockInitialize(&graph->allocatorLock, hnsw_lock_tranche_id); + LWLockInitialize(&graph->flushLock, hnsw_lock_tranche_id); +} + +/* + * Initialize an allocator + */ +static void +InitAllocator(HnswAllocator * allocator, void *(*alloc) (Size size, void *state), void *state) +{ + allocator->alloc = alloc; + allocator->state = state; +} + +/* + * Memory context allocator + */ +static void * +HnswMemoryContextAlloc(Size size, void *state) +{ + HnswBuildState *buildstate = (HnswBuildState *) state; + void *chunk = MemoryContextAlloc(buildstate->graphCtx, size); + +#if PG_VERSION_NUM >= 130000 + buildstate->graphData.memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false); +#else + buildstate->graphData.memoryUsed += MAXALIGN(size); +#endif + + return chunk; +} + +/* + * Shared memory allocator + */ +static void * +HnswSharedMemoryAlloc(Size size, void *state) +{ + HnswBuildState *buildstate = (HnswBuildState *) state; + void *chunk = buildstate->hnswarea + buildstate->graph->memoryUsed; + + buildstate->graph->memoryUsed += MAXALIGN(size); + return chunk; } /* @@ -531,7 +705,7 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index buildstate->normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); buildstate->collation = index->rd_indcollation[0]; - InitGraph(&buildstate->graphData); + InitGraph(&buildstate->graphData, NULL, maintenance_work_mem * 1024L); buildstate->graph = &buildstate->graphData; buildstate->ml = HnswGetMl(buildstate->m); buildstate->maxLevel = HnswGetMaxLevel(buildstate->m); @@ -549,8 +723,11 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index "Hnsw build temporary context", ALLOCSET_DEFAULT_SIZES); + InitAllocator(&buildstate->allocator, &HnswMemoryContextAlloc, buildstate); + buildstate->hnswleader = NULL; buildstate->hnswshared = NULL; + buildstate->hnswarea = NULL; } /* @@ -581,6 +758,7 @@ ParallelHeapScan(HnswBuildState * buildstate) if (hnswshared->nparticipantsdone == nparticipanttuplesorts) { buildstate->graph = &hnswshared->graphData; + buildstate->hnswarea = buildstate->hnswleader->hnswarea; reltuples = hnswshared->reltuples; SpinLockRelease(&hnswshared->mutex); break; @@ -600,7 +778,7 @@ ParallelHeapScan(HnswBuildState * buildstate) * Perform a worker's portion of a parallel insert */ static void -HnswParallelScanAndInsert(Relation heapRel, Relation indexRel, HnswShared * hnswshared, bool progress) +HnswParallelScanAndInsert(Relation heapRel, Relation indexRel, HnswShared * hnswshared, char *hnswarea, bool progress) { HnswBuildState buildstate; #if PG_VERSION_NUM >= 120000 @@ -616,7 +794,8 @@ HnswParallelScanAndInsert(Relation heapRel, Relation indexRel, HnswShared * hnsw indexInfo->ii_Concurrent = hnswshared->isconcurrent; InitBuildState(&buildstate, heapRel, indexRel, indexInfo, MAIN_FORKNUM); buildstate.graph = &hnswshared->graphData; - buildstate.hnswshared = hnswshared; + buildstate.hnswarea = hnswarea; + InitAllocator(&buildstate.allocator, &HnswSharedMemoryAlloc, &buildstate); #if PG_VERSION_NUM >= 120000 scan = table_beginscan_parallel(heapRel, ParallelTableScanFromHnswShared(hnswshared)); @@ -656,6 +835,7 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc) { char *sharedquery; HnswShared *hnswshared; + char *hnswarea; Relation heapRel; Relation indexRel; LOCKMODE heapLockmode; @@ -691,8 +871,10 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc) #endif indexRel = index_open(hnswshared->indexrelid, indexLockmode); + hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false); + /* Perform inserts */ - HnswParallelScanAndInsert(heapRel, indexRel, hnswshared, false); + HnswParallelScanAndInsert(heapRel, indexRel, hnswshared, hnswarea, false); /* Close relations within worker */ index_close(indexRel, indexLockmode); @@ -749,7 +931,7 @@ HnswLeaderParticipateAsWorker(HnswBuildState * buildstate) HnswLeader *hnswleader = buildstate->hnswleader; /* Perform work common to all participants */ - HnswParallelScanAndInsert(buildstate->heap, buildstate->index, hnswleader->hnswshared, true); + HnswParallelScanAndInsert(buildstate->heap, buildstate->index, hnswleader->hnswshared, hnswleader->hnswarea, true); } /* @@ -761,7 +943,10 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) ParallelContext *pcxt; Snapshot snapshot; Size esthnswshared; + Size esthnswarea; + Size estother; HnswShared *hnswshared; + char *hnswarea; HnswLeader *hnswleader = (HnswLeader *) palloc0(sizeof(HnswLeader)); bool leaderparticipates = true; int querylen; @@ -788,7 +973,17 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) /* Estimate size of workspaces */ esthnswshared = ParallelEstimateShared(buildstate->heap, snapshot); shm_toc_estimate_chunk(&pcxt->estimator, esthnswshared); - shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Leave space for other objects in shared memory */ + /* Docker has a default limit of 64 MB for shm_size */ + /* which happens to be the default value of maintenance_work_mem */ + esthnswarea = maintenance_work_mem * 1024L; + estother = 2 * 1024 * 1024; + if (esthnswarea > estother) + esthnswarea -= estother; + + shm_toc_estimate_chunk(&pcxt->estimator, esthnswarea); + shm_toc_estimate_keys(&pcxt->estimator, 2); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -824,10 +1019,6 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) /* Initialize mutable state */ hnswshared->nparticipantsdone = 0; hnswshared->reltuples = 0; - InitGraph(&hnswshared->graphData); - /* TODO Support in-memory builds */ - hnswshared->graphData.memoryTotal = 0; - hnswshared->graphData.flushed = true; #if PG_VERSION_NUM >= 120000 table_parallelscan_initialize(buildstate->heap, ParallelTableScanFromHnswShared(hnswshared), @@ -836,7 +1027,12 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) heap_parallelscan_initialize(&hnswshared->heapdesc, buildstate->heap, snapshot); #endif + hnswarea = (char *) shm_toc_allocate(pcxt->toc, esthnswarea); + /* Report less than allocated so never fails */ + InitGraph(&hnswshared->graphData, hnswarea, esthnswarea - 1024 * 1024); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_SHARED, hnswshared); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_AREA, hnswarea); /* Store query string for workers */ if (debug_query_string) @@ -856,6 +1052,7 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) hnswleader->nparticipanttuplesorts++; hnswleader->hnswshared = hnswshared; hnswleader->snapshot = snapshot; + hnswleader->hnswarea = hnswarea; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -910,16 +1107,12 @@ BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum) UpdateProgress(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_HNSW_PHASE_LOAD); /* Calculate parallel workers */ - if (buildstate->heap != NULL && hnsw_enable_parallel_build) + if (buildstate->heap != NULL) parallel_workers = ComputeParallelWorkers(buildstate->heap, buildstate->index); /* Attempt to launch parallel worker scan when required */ if (parallel_workers > 0) - { - /* TODO Support in-memory builds */ - FlushPages(buildstate); HnswBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers); - } /* Add tuples to graph */ if (buildstate->heap != NULL) diff --git a/src/hnswinsert.c b/src/hnswinsert.c index 55c29fb..7d43230 100644 --- a/src/hnswinsert.c +++ b/src/hnswinsert.c @@ -116,7 +116,7 @@ HnswInsertAppendPage(Relation index, Buffer *nbuf, Page *npage, GenericXLogState * Add to element and neighbor pages */ static void -WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPage, BlockNumber *updatedInsertPage, bool building) +AddElementOnDisk(Relation index, HnswElement e, int m, BlockNumber insertPage, BlockNumber *updatedInsertPage, bool building) { Buffer buf; Page page; @@ -134,9 +134,10 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag OffsetNumber freeOffno = InvalidOffsetNumber; OffsetNumber freeNeighborOffno = InvalidOffsetNumber; BlockNumber newInsertPage = InvalidBlockNumber; + char *base = NULL; /* Calculate sizes */ - etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(DatumGetPointer(e->value))); + etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(HnswPtrAccess(base, e->value))); ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m); combinedSize = etupSize + ntupSize + sizeof(ItemIdData); maxSize = HNSW_MAX_SIZE; @@ -144,11 +145,11 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag /* Prepare element tuple */ etup = palloc0(etupSize); - HnswSetElementTuple(etup, e); + HnswSetElementTuple(base, etup, e); /* Prepare neighbor tuple */ ntup = palloc0(ntupSize); - HnswSetNeighborTuple(ntup, e, m); + HnswSetNeighborTuple(base, ntup, e, m); /* Find a page (or two if needed) to insert the tuples */ for (;;) @@ -338,12 +339,14 @@ ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm) * Update neighbors */ void -HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building) +HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building) { + char *base = NULL; + for (int lc = e->level; lc >= 0; lc--) { int lm = HnswGetLayerM(m, lc); - HnswNeighborArray *neighbors = HnswGetNeighbors(e, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc); for (int i = 0; i < neighbors->length; i++) { @@ -356,11 +359,12 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE Size ntupSize; int idx = -1; int startIdx; - OffsetNumber offno = hc->element->neighborOffno; + HnswElement neighborElement = HnswPtrAccess(base, hc->element); + OffsetNumber offno = neighborElement->neighborOffno; /* Get latest neighbors since they may have changed */ /* Do not lock yet since selecting neighbors can take time */ - HnswLoadNeighbors(hc->element, index, m); + HnswLoadNeighbors(neighborElement, index, m); /* * Could improve performance for vacuuming by checking neighbors @@ -370,14 +374,14 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE */ /* Select neighbors */ - HnswUpdateConnection(e, hc, lm, lc, &idx, index, procinfo, collation); + HnswUpdateConnection(NULL, e, hc, lm, lc, &idx, index, procinfo, collation); /* New element was not selected as a neighbor */ if (idx == -1) continue; /* Register page */ - buf = ReadBuffer(index, hc->element->neighborPage); + buf = ReadBuffer(index, neighborElement->neighborPage); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); if (building) { @@ -396,7 +400,7 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE ntupSize = ItemIdGetLength(itemid); /* Calculate index for update */ - startIdx = (hc->element->level - lc) * m; + startIdx = (neighborElement->level - lc) * m; /* Check for existing connection */ if (checkExisting && ConnectionExists(e, ntup, startIdx, lm)) @@ -447,7 +451,7 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE * Add a heap TID to an existing element */ static bool -HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup, bool building) +AddDuplicateOnDisk(Relation index, HnswElement element, HnswElement dup, bool building) { Buffer buf; Page page; @@ -511,19 +515,23 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup, bool buil * Find duplicate element */ static bool -HnswFindDuplicate(Relation index, HnswElement element, bool building) +FindDuplicateOnDisk(Relation index, HnswElement element, bool building) { - HnswNeighborArray *neighbors = HnswGetNeighbors(element, 0); + char *base = NULL; + HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0); + Datum value = HnswGetValue(base, element); for (int i = 0; i < neighbors->length; i++) { HnswCandidate *neighbor = &neighbors->items[i]; + HnswElement neighborElement = HnswPtrAccess(base, neighbor->element); + Datum neighborValue = HnswGetValue(base, neighborElement); /* Exit early since ordered by distance */ - if (!datumIsEqual(element->value, neighbor->element->value, false, -1)) + if (!datumIsEqual(value, neighborValue, false, -1)) return false; - if (HnswAddDuplicate(index, element, neighbor->element, building)) + if (AddDuplicateOnDisk(index, element, neighborElement, building)) return true; } @@ -531,26 +539,26 @@ HnswFindDuplicate(Relation index, HnswElement element, bool building) } /* - * Write changes to disk + * Update graph on disk */ static void -WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, bool building) +UpdateGraphOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, bool building) { BlockNumber newInsertPage = InvalidBlockNumber; /* Look for duplicate */ - if (HnswFindDuplicate(index, element, building)) + if (FindDuplicateOnDisk(index, element, building)) return; - /* Write element and neighbor tuples */ - WriteNewElementPages(index, element, m, GetInsertPage(index), &newInsertPage, building); + /* Add element */ + AddElementOnDisk(index, element, m, GetInsertPage(index), &newInsertPage, building); /* Update insert page if needed */ if (BlockNumberIsValid(newInsertPage)) HnswUpdateMetaPage(index, 0, NULL, newInsertPage, MAIN_FORKNUM, building); /* Update neighbors */ - HnswUpdateNeighborPages(index, procinfo, collation, element, m, false, building); + HnswUpdateNeighborsOnDisk(index, procinfo, collation, element, m, false, building); /* Update entry point if needed */ if (entryPoint == NULL || element->level > entryPoint->level) @@ -561,10 +569,8 @@ WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement elem * Insert a tuple into the index */ bool -HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building) +HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building) { - Datum value; - FmgrInfo *normprocinfo; HnswElement entryPoint; HnswElement element; int m; @@ -572,17 +578,7 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti FmgrInfo *procinfo = index_getprocinfo(index, 1, HNSW_DISTANCE_PROC); Oid collation = index->rd_indcollation[0]; LOCKMODE lockmode = ShareLock; - - /* Detoast once for all calls */ - value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); - - /* Normalize if needed */ - normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); - if (normprocinfo != NULL) - { - if (!HnswNormValue(normprocinfo, collation, &value, NULL)) - return false; - } + char *base = NULL; /* * Get a shared lock. This allows vacuum to ensure no in-flight inserts @@ -595,8 +591,8 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti HnswGetMetaPageInfo(index, &m, &entryPoint); /* Create an element */ - element = HnswInitElement(heap_tid, m, HnswGetMl(m), HnswGetMaxLevel(m)); - element->value = value; + element = HnswInitElement(base, heap_tid, m, HnswGetMl(m), HnswGetMaxLevel(m), NULL); + HnswPtrStore(base, element->value, DatumGetPointer(value)); /* Prevent concurrent inserts when likely updating entry point */ if (entryPoint == NULL || element->level > entryPoint->level) @@ -612,11 +608,11 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti entryPoint = HnswGetEntryPoint(index); } - /* Insert element in graph */ - HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, false); + /* Find neighbors for element */ + HnswFindElementNeighbors(base, element, entryPoint, index, procinfo, collation, m, efConstruction, false); - /* Write to disk */ - WriteElement(index, procinfo, collation, element, m, efConstruction, entryPoint, building); + /* Update graph on disk */ + UpdateGraphOnDisk(index, procinfo, collation, element, m, efConstruction, entryPoint, building); /* Release lock */ UnlockPage(index, HNSW_UPDATE_LOCK, lockmode); @@ -624,6 +620,30 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti return true; } +/* + * Insert a tuple into the index + */ +static void +HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel) +{ + Datum value; + FmgrInfo *normprocinfo; + Oid collation = index->rd_indcollation[0]; + + /* Detoast once for all calls */ + value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); + + /* Normalize if needed */ + normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); + if (normprocinfo != NULL) + { + if (!HnswNormValue(normprocinfo, collation, &value, NULL)) + return; + } + + HnswInsertTupleOnDisk(index, value, values, isnull, heap_tid, heapRel, false); +} + /* * Insert a tuple into the index */ @@ -650,7 +670,7 @@ hnswinsert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, oldCtx = MemoryContextSwitchTo(insertCtx); /* Insert tuple */ - HnswInsertTuple(index, values, isnull, heap_tid, heap, false); + HnswInsertTuple(index, values, isnull, heap_tid, heap); /* Delete memory context */ MemoryContextSwitchTo(oldCtx); diff --git a/src/hnswscan.c b/src/hnswscan.c index 48cd6ee..fb7954f 100644 --- a/src/hnswscan.c +++ b/src/hnswscan.c @@ -21,6 +21,7 @@ GetScanItems(IndexScanDesc scan, Datum q) List *w; int m; HnswElement entryPoint; + char *base = NULL; /* Get m and entry point */ HnswGetMetaPageInfo(index, &m, &entryPoint); @@ -28,15 +29,15 @@ GetScanItems(IndexScanDesc scan, Datum q) if (entryPoint == NULL) return NIL; - ep = list_make1(HnswEntryCandidate(entryPoint, q, index, procinfo, collation, false)); + ep = list_make1(HnswEntryCandidate(base, entryPoint, q, index, procinfo, collation, false)); for (int lc = entryPoint->level; lc >= 1; lc--) { - w = HnswSearchLayer(q, ep, 1, lc, index, procinfo, collation, m, false, NULL); + w = HnswSearchLayer(base, q, ep, 1, lc, index, procinfo, collation, m, false, NULL); ep = w; } - return HnswSearchLayer(q, ep, hnsw_ef_search, 0, index, procinfo, collation, m, false, NULL); + return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, procinfo, collation, m, false, NULL); } /* @@ -184,17 +185,19 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir) while (list_length(so->w) > 0) { + char *base = NULL; HnswCandidate *hc = llast(so->w); + HnswElement element = HnswPtrAccess(base, hc->element); ItemPointer heaptid; /* Move to next element if no valid heap TIDs */ - if (hc->element->heaptidsLength == 0) + if (element->heaptidsLength == 0) { so->w = list_delete_last(so->w); continue; } - heaptid = &hc->element->heaptids[--hc->element->heaptidsLength]; + heaptid = &element->heaptids[--element->heaptidsLength]; MemoryContextSwitchTo(oldCtx); diff --git a/src/hnswutils.c b/src/hnswutils.c index a53f606..e3fa0be 100644 --- a/src/hnswutils.c +++ b/src/hnswutils.c @@ -84,9 +84,40 @@ hash_pointer(uintptr_t ptr) #define SH_DEFINE #include "lib/simplehash.h" +/* Needed to include simplehash.h again */ +#if PG_VERSION_NUM < 120000 +#undef SH_EQUAL +#undef sh_log2 +#undef sh_pow2 +#define sh_log2 offsethash_sh_log2 +#define sh_pow2 offsethash_sh_pow2 +#endif + +/* Offset hash table */ +static uint32 +hash_offset(Size offset) +{ +#if SIZEOF_SIZE_T == 8 + return murmurhash64((uint64) offset); +#else + return murmurhash32((uint32) offset); +#endif +} + +#define SH_PREFIX offsethash +#define SH_ELEMENT_TYPE OffsetHashEntry +#define SH_KEY_TYPE Size +#define SH_KEY offset +#define SH_HASH_KEY(tb, key) hash_offset(key) +#define SH_EQUAL(tb, a, b) (a == b) +#define SH_SCOPE extern +#define SH_DEFINE +#include "lib/simplehash.h" + typedef union { pointerhash_hash *pointers; + offsethash_hash *offsets; tidhash_hash *tids; } visited_hash; @@ -188,31 +219,46 @@ HnswInitPage(Buffer buf, Page page) * Allocate neighbors */ void -HnswInitNeighbors(HnswElement element, int m) +HnswInitNeighbors(char *base, HnswElement element, int m, HnswAllocator * allocator) { int level = element->level; - element->neighbors = palloc(sizeof(HnswNeighborArray) * (level + 1)); + HnswNeighborArrayPtr *neighborList = (HnswNeighborArrayPtr *) HnswAlloc(allocator, sizeof(HnswNeighborArrayPtr) * (level + 1)); + + HnswPtrStore(base, element->neighbors, neighborList); for (int lc = 0; lc <= level; lc++) { HnswNeighborArray *a; int lm = HnswGetLayerM(m, lc); - a = &element->neighbors[lc]; + HnswPtrStore(base, neighborList[lc], (HnswNeighborArray *) HnswAlloc(allocator, offsetof(HnswNeighborArray, items) + sizeof(HnswCandidate) * lm)); + + a = HnswGetNeighbors(base, element, lc); a->length = 0; - a->items = palloc(sizeof(HnswCandidate) * lm); a->closerSet = false; } } +/* + * Allocate memory from the allocator + */ +void * +HnswAlloc(HnswAllocator * allocator, Size size) +{ + if (allocator) + return (*(allocator)->alloc) (size, (allocator)->state); + + return palloc(size); +} + /* * Allocate an element */ HnswElement -HnswInitElement(ItemPointer heaptid, int m, double ml, int maxLevel) +HnswInitElement(char *base, ItemPointer heaptid, int m, double ml, int maxLevel, HnswAllocator * allocator) { - HnswElement element = palloc(sizeof(HnswElementData)); + HnswElement element = HnswAlloc(allocator, sizeof(HnswElementData)); int level = (int) (-log(RandomDouble()) * ml); @@ -226,9 +272,9 @@ HnswInitElement(ItemPointer heaptid, int m, double ml, int maxLevel) element->level = level; element->deleted = 0; - HnswInitNeighbors(element, m); + HnswInitNeighbors(base, element, m, allocator); - element->value = PointerGetDatum(NULL); + HnswPtrStore(base, element->value, (Pointer) NULL); return element; } @@ -249,11 +295,12 @@ HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno) { HnswElement element = palloc(sizeof(HnswElementData)); + char *base = NULL; element->blkno = blkno; element->offno = offno; - element->neighbors = NULL; - element->value = PointerGetDatum(NULL); + HnswPtrStore(base, element->neighbors, (HnswNeighborArrayPtr *) NULL); + HnswPtrStore(base, element->value, (Pointer) NULL); return element; } @@ -363,8 +410,10 @@ HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, Bloc * Set element tuple, except for neighbor info */ void -HnswSetElementTuple(HnswElementTuple etup, HnswElement element) +HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element) { + Pointer valuePtr = HnswPtrAccess(base, element->value); + etup->type = HNSW_ELEMENT_TUPLE_TYPE; etup->level = element->level; etup->deleted = 0; @@ -375,14 +424,14 @@ HnswSetElementTuple(HnswElementTuple etup, HnswElement element) else ItemPointerSetInvalid(&etup->heaptids[i]); } - memcpy(&etup->data, DatumGetPointer(element->value), VARSIZE_ANY(DatumGetPointer(element->value))); + memcpy(&etup->data, valuePtr, VARSIZE_ANY(valuePtr)); } /* * Set neighbor tuple */ void -HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m) +HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m) { int idx = 0; @@ -390,7 +439,7 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m) for (int lc = e->level; lc >= 0; lc--) { - HnswNeighborArray *neighbors = HnswGetNeighbors(e, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc); int lm = HnswGetLayerM(m, lc); for (int i = 0; i < lm; i++) @@ -400,8 +449,9 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m) if (i < neighbors->length) { HnswCandidate *hc = &neighbors->items[i]; + HnswElement hce = HnswPtrAccess(base, hc->element); - ItemPointerSet(indextid, hc->element->blkno, hc->element->offno); + ItemPointerSet(indextid, hce->blkno, hce->offno); } else ItemPointerSetInvalid(indextid); @@ -417,12 +467,14 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m) static void LoadNeighborsFromPage(HnswElement element, Relation index, Page page, int m) { + char *base = NULL; + HnswNeighborTuple ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, element->neighborOffno)); int neighborCount = (element->level + 2) * m; Assert(HnswIsNeighborTuple(ntup)); - HnswInitNeighbors(element, m); + HnswInitNeighbors(base, element, m, NULL); /* Ensure expected neighbors */ if (ntup->count != neighborCount) @@ -448,9 +500,9 @@ LoadNeighborsFromPage(HnswElement element, Relation index, Page page, int m) if (level < 0) level = 0; - neighbors = HnswGetNeighbors(element, level); + neighbors = HnswGetNeighbors(base, element, level); hc = &neighbors->items[neighbors->length++]; - hc->element = e; + HnswPtrStore(base, hc->element, e); } } @@ -497,7 +549,12 @@ HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHe } if (loadVec) - element->value = datumCopy(PointerGetDatum(&etup->data), false, -1); + { + char *base = NULL; + Datum value = datumCopy(PointerGetDatum(&etup->data), false, -1); + + HnswPtrStore(base, element->value, DatumGetPointer(value)); + } } /* @@ -533,24 +590,27 @@ HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index, * Get the distance for a candidate */ static float -GetCandidateDistance(HnswCandidate * hc, Datum q, FmgrInfo *procinfo, Oid collation) +GetCandidateDistance(char *base, HnswCandidate * hc, Datum q, FmgrInfo *procinfo, Oid collation) { - return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, q, hc->element->value)); + HnswElement hce = HnswPtrAccess(base, hc->element); + Datum value = HnswGetValue(base, hce); + + return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, q, value)); } /* * Create a candidate for the entry point */ HnswCandidate * -HnswEntryCandidate(HnswElement entryPoint, Datum q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec) +HnswEntryCandidate(char *base, HnswElement entryPoint, Datum q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec) { HnswCandidate *hc = palloc(sizeof(HnswCandidate)); - hc->element = entryPoint; + HnswPtrStore(base, hc->element, entryPoint); if (index == NULL) - hc->distance = GetCandidateDistance(hc, q, procinfo, collation); + hc->distance = GetCandidateDistance(base, hc, q, procinfo, collation); else - HnswLoadElement(hc->element, &hc->distance, &q, index, procinfo, collation, loadVec); + HnswLoadElement(entryPoint, &hc->distance, &q, index, procinfo, collation, loadVec); return hc; } @@ -596,34 +656,79 @@ CreatePairingHeapNode(HnswCandidate * c) return node; } +/* + * Init visited + */ +static inline void +InitVisited(char *base, visited_hash * v, Relation index, int ef, int m) +{ + if (index != NULL) + v->tids = tidhash_create(CurrentMemoryContext, ef * m * 2, NULL); + else if (base != NULL) + v->offsets = offsethash_create(CurrentMemoryContext, ef * m * 2, NULL); + else + v->pointers = pointerhash_create(CurrentMemoryContext, ef * m * 2, NULL); +} + /* * Add to visited */ static inline void -AddToVisited(visited_hash * v, HnswCandidate * hc, Relation index, bool *found) +AddToVisited(char *base, visited_hash * v, HnswCandidate * hc, Relation index, bool *found) { - if (index == NULL) + if (index != NULL) + { + HnswElement element = HnswPtrAccess(base, hc->element); + ItemPointerData indextid; + + ItemPointerSet(&indextid, element->blkno, element->offno); + tidhash_insert(v->tids, indextid, found); + } + else if (base != NULL) { #if PG_VERSION_NUM >= 130000 - pointerhash_insert_hash(v->pointers, (uintptr_t) hc->element, hc->element->hash, found); + HnswElement element = HnswPtrAccess(base, hc->element); + + offsethash_insert_hash(v->offsets, HnswPtrOffset(hc->element), element->hash, found); #else - pointerhash_insert(v->pointers, (uintptr_t) hc->element, found); + offsethash_insert(v->offsets, HnswPtrOffset(hc->element), found); #endif } else { - ItemPointerData indextid; +#if PG_VERSION_NUM >= 130000 + HnswElement element = HnswPtrAccess(base, hc->element); - ItemPointerSet(&indextid, hc->element->blkno, hc->element->offno); - tidhash_insert(v->tids, indextid, found); + pointerhash_insert_hash(v->pointers, (uintptr_t) HnswPtrPointer(hc->element), element->hash, found); +#else + pointerhash_insert(v->pointers, (uintptr_t) HnswPtrPointer(hc->element), found); +#endif } } +/* + * Count element towards ef + */ +static inline bool +CountElement(char *base, HnswElement skipElement, HnswCandidate * hc) +{ + HnswElement e; + + if (skipElement == NULL) + return true; + + /* Ensure does not access heaptidsLength during in-memory build */ + pg_memory_barrier(); + + e = HnswPtrAccess(base, hc->element); + return e->heaptidsLength != 0; +} + /* * Algorithm 2 from paper */ List * -HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement) +HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement) { List *w = NIL; pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL); @@ -631,12 +736,17 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro int wlen = 0; visited_hash v; ListCell *lc2; + HnswNeighborArray *neighborhoodData = NULL; + Size neighborhoodSize; - /* Create hash table */ + InitVisited(base, &v, index, ef, m); + + /* Create local memory for neighborhood if needed */ if (index == NULL) - v.pointers = pointerhash_create(CurrentMemoryContext, ef * m * 2, NULL); - else - v.tids = tidhash_create(CurrentMemoryContext, ef * m * 2, NULL); + { + neighborhoodSize = offsetof(HnswNeighborArray, items) + sizeof(HnswCandidate) * HnswGetLayerM(m, lc); + neighborhoodData = palloc(neighborhoodSize); + } /* Add entry points to v, C, and W */ foreach(lc2, ep) @@ -644,7 +754,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro HnswCandidate *hc = (HnswCandidate *) lfirst(lc2); bool found; - AddToVisited(&v, hc, index, &found); + AddToVisited(base, &v, hc, index, &found); pairingheap_add(C, &(CreatePairingHeapNode(hc)->ph_node)); pairingheap_add(W, &(CreatePairingHeapNode(hc)->ph_node)); @@ -654,7 +764,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro * would be ideal to do this for inserts as well, but this could * affect insert performance. */ - if (skipElement == NULL || hc->element->heaptidsLength != 0) + if (CountElement(base, skipElement, hc)) wlen++; } @@ -663,38 +773,51 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro HnswNeighborArray *neighborhood; HnswCandidate *c = ((HnswPairingHeapNode *) pairingheap_remove_first(C))->inner; HnswCandidate *f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner; + HnswElement cElement; if (c->distance > f->distance) break; - if (c->element->neighbors == NULL) - HnswLoadNeighbors(c->element, index, m); + cElement = HnswPtrAccess(base, c->element); + + if (HnswPtrIsNull(base, cElement->neighbors)) + HnswLoadNeighbors(cElement, index, m); /* Get the neighborhood at layer lc */ - neighborhood = HnswGetNeighbors(c->element, lc); + neighborhood = HnswGetNeighbors(base, cElement, lc); + + /* Copy neighborhood to local memory if needed */ + if (index == NULL) + { + LWLockAcquire(&cElement->lock, LW_SHARED); + memcpy(neighborhoodData, neighborhood, neighborhoodSize); + LWLockRelease(&cElement->lock); + neighborhood = neighborhoodData; + } for (int i = 0; i < neighborhood->length; i++) { HnswCandidate *e = &neighborhood->items[i]; bool visited; - AddToVisited(&v, e, index, &visited); + AddToVisited(base, &v, e, index, &visited); if (!visited) { float eDistance; + HnswElement eElement = HnswPtrAccess(base, e->element); f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner; if (index == NULL) - eDistance = GetCandidateDistance(e, q, procinfo, collation); + eDistance = GetCandidateDistance(base, e, q, procinfo, collation); else - HnswLoadElement(e->element, &eDistance, &q, index, procinfo, collation, inserting); + HnswLoadElement(eElement, &eDistance, &q, index, procinfo, collation, inserting); - Assert(!e->element->deleted); + Assert(!eElement->deleted); /* Make robust to issues */ - if (e->element->level < lc) + if (eElement->level < lc) continue; if (eDistance < f->distance || wlen < ef) @@ -702,7 +825,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro /* Copy e */ HnswCandidate *ec = palloc(sizeof(HnswCandidate)); - ec->element = e->element; + HnswPtrStore(base, ec->element, eElement); ec->distance = eDistance; pairingheap_add(C, &(CreatePairingHeapNode(ec)->ph_node)); @@ -713,7 +836,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro * vacuuming. It would be ideal to do this for inserts as * well, but this could affect insert performance. */ - if (skipElement == NULL || e->element->heaptidsLength != 0) + if (CountElement(base, skipElement, e)) { wlen++; @@ -738,7 +861,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro } /* - * Compare candidate distances + * Compare candidate distances with pointer tie-breaker */ static int #if PG_VERSION_NUM >= 130000 @@ -756,10 +879,38 @@ CompareCandidateDistances(const void *a, const void *b) if (hca->distance > hcb->distance) return -1; - if (hca->element < hcb->element) + if (HnswPtrPointer(hca->element) < HnswPtrPointer(hcb->element)) return 1; - if (hca->element > hcb->element) + if (HnswPtrPointer(hca->element) > HnswPtrPointer(hcb->element)) + return -1; + + return 0; +} + +/* + * Compare candidate distances with offset tie-breaker + */ +static int +#if PG_VERSION_NUM >= 130000 +CompareCandidateDistancesOffset(const ListCell *a, const ListCell *b) +#else +CompareCandidateDistancesOffset(const void *a, const void *b) +#endif +{ + HnswCandidate *hca = lfirst((ListCell *) a); + HnswCandidate *hcb = lfirst((ListCell *) b); + + if (hca->distance < hcb->distance) + return 1; + + if (hca->distance > hcb->distance) + return -1; + + if (HnswPtrOffset(hca->element) < HnswPtrOffset(hcb->element)) + return 1; + + if (HnswPtrOffset(hca->element) > HnswPtrOffset(hcb->element)) return -1; return 0; @@ -769,46 +920,58 @@ CompareCandidateDistances(const void *a, const void *b) * Calculate the distance between elements */ static float -HnswGetDistance(HnswElement a, HnswElement b, int lc, FmgrInfo *procinfo, Oid collation) +HnswGetDistance(char *base, HnswElement a, HnswElement b, int lc, FmgrInfo *procinfo, Oid collation) { + Datum aValue; + Datum bValue; + /* Look for cached distance */ - if (a->neighbors != NULL) + if (!HnswPtrIsNull(base, a->neighbors)) { - HnswNeighborArray *neighbors = HnswGetNeighbors(a, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, a, lc); for (int i = 0; i < neighbors->length; i++) { - if (neighbors->items[i].element == b) + HnswElement element = HnswPtrAccess(base, neighbors->items[i].element); + + if (element == b) return neighbors->items[i].distance; } } - if (b->neighbors != NULL) + if (!HnswPtrIsNull(base, b->neighbors)) { - HnswNeighborArray *neighbors = HnswGetNeighbors(b, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, b, lc); for (int i = 0; i < neighbors->length; i++) { - if (neighbors->items[i].element == a) + HnswElement element = HnswPtrAccess(base, neighbors->items[i].element); + + if (element == a) return neighbors->items[i].distance; } } - return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, a->value, b->value)); + aValue = HnswGetValue(base, a); + bValue = HnswGetValue(base, b); + + return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, aValue, bValue)); } /* * Check if an element is closer to q than any element from R */ static bool -CheckElementCloser(HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid collation) +CheckElementCloser(char *base, HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid collation) { + HnswElement eElement = HnswPtrAccess(base, e->element); ListCell *lc2; foreach(lc2, r) { HnswCandidate *ri = lfirst(lc2); - float distance = HnswGetDistance(e->element, ri->element, lc, procinfo, collation); + HnswElement riElement = HnswPtrAccess(base, ri->element); + float distance = HnswGetDistance(base, eElement, riElement, lc, procinfo, collation); if (distance <= e->distance) return false; @@ -821,12 +984,12 @@ CheckElementCloser(HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid c * Algorithm 4 from paper */ static List * -SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, HnswElement e2, HnswCandidate * newCandidate, HnswCandidate * *pruned, bool sortCandidates) +SelectNeighbors(char *base, List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, HnswElement e2, HnswCandidate * newCandidate, HnswCandidate * *pruned, bool sortCandidates) { List *r = NIL; List *w = list_copy(c); pairingheap *wd; - HnswNeighborArray *neighbors = HnswGetNeighbors(e2, lc); + HnswNeighborArray *neighbors = HnswGetNeighbors(base, e2, lc); bool mustCalculate = !neighbors->closerSet; List *added = NIL; bool removedAny = false; @@ -838,7 +1001,12 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw /* Ensure order of candidates is deterministic for closer caching */ if (sortCandidates) - list_sort(w, CompareCandidateDistances); + { + if (base == NULL) + list_sort(w, CompareCandidateDistances); + else + list_sort(w, CompareCandidateDistancesOffset); + } while (list_length(w) > 0 && list_length(r) < lm) { @@ -849,7 +1017,7 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw /* Use previous state of r and wd to skip work when possible */ if (mustCalculate) - e->closer = CheckElementCloser(e, r, lc, procinfo, collation); + e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation); else if (list_length(added) > 0) { /* @@ -858,7 +1026,7 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw */ if (e->closer) { - e->closer = CheckElementCloser(e, added, lc, procinfo, collation); + e->closer = CheckElementCloser(base, e, added, lc, procinfo, collation); if (!e->closer) removedAny = true; @@ -871,7 +1039,7 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw */ if (removedAny) { - e->closer = CheckElementCloser(e, r, lc, procinfo, collation); + e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation); if (e->closer) added = lappend(added, e); } @@ -879,7 +1047,7 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw } else if (e == newCandidate) { - e->closer = CheckElementCloser(e, r, lc, procinfo, collation); + e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation); if (e->closer) added = lappend(added, e); } @@ -913,10 +1081,10 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw * Add connections */ static void -AddConnections(HnswElement element, List *neighbors, int lc) +AddConnections(char *base, HnswElement element, List *neighbors, int lc) { ListCell *lc2; - HnswNeighborArray *a = HnswGetNeighbors(element, lc); + HnswNeighborArray *a = HnswGetNeighbors(base, element, lc); foreach(lc2, neighbors) a->items[a->length++] = *((HnswCandidate *) lfirst(lc2)); @@ -926,13 +1094,13 @@ AddConnections(HnswElement element, List *neighbors, int lc) * Update connections */ void -HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation) +HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc, int lm, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation) { - HnswNeighborArray *currentNeighbors = HnswGetNeighbors(hc->element, lc); - + HnswElement hce = HnswPtrAccess(base, hc->element); + HnswNeighborArray *currentNeighbors = HnswGetNeighbors(base, hce, lc); HnswCandidate hc2; - hc2.element = element; + HnswPtrStore(base, hc2.element, element); hc2.distance = hc->distance; if (currentNeighbors->length < lm) @@ -951,19 +1119,20 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, in /* Load elements on insert */ if (index != NULL) { - Datum q = hc->element->value; + Datum q = HnswGetValue(base, hce); for (int i = 0; i < currentNeighbors->length; i++) { HnswCandidate *hc3 = ¤tNeighbors->items[i]; + HnswElement hc3Element = HnswPtrAccess(base, hc3->element); - if (DatumGetPointer(hc3->element->value) == NULL) - HnswLoadElement(hc3->element, &hc3->distance, &q, index, procinfo, collation, true); + if (HnswPtrIsNull(base, hc3Element->value)) + HnswLoadElement(hc3Element, &hc3->distance, &q, index, procinfo, collation, true); else - hc3->distance = GetCandidateDistance(hc3, q, procinfo, collation); + hc3->distance = GetCandidateDistance(base, hc3, q, procinfo, collation); /* Prune element if being deleted */ - if (hc3->element->heaptidsLength == 0) + if (hc3Element->heaptidsLength == 0) { pruned = ¤tNeighbors->items[i]; break; @@ -980,7 +1149,7 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, in c = lappend(c, ¤tNeighbors->items[i]); c = lappend(c, &hc2); - SelectNeighbors(c, lm, lc, procinfo, collation, hc->element, &hc2, &pruned, true); + SelectNeighbors(base, c, lm, lc, procinfo, collation, hce, &hc2, &pruned, true); /* Should not happen */ if (pruned == NULL) @@ -990,7 +1159,7 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, in /* Find and replace the pruned element */ for (int i = 0; i < currentNeighbors->length; i++) { - if (currentNeighbors->items[i].element == pruned->element) + if (HnswPtrEqual(base, currentNeighbors->items[i].element, pruned->element)) { currentNeighbors->items[i] = hc2; @@ -1008,43 +1177,65 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, in * Remove elements being deleted or skipped */ static List * -RemoveElements(List *w, HnswElement skipElement) +RemoveElements(char *base, List *w, HnswElement skipElement) { ListCell *lc2; List *w2 = NIL; + /* Ensure does not access heaptidsLength during in-memory build */ + pg_memory_barrier(); + foreach(lc2, w) { HnswCandidate *hc = (HnswCandidate *) lfirst(lc2); + HnswElement hce = HnswPtrAccess(base, hc->element); /* Skip self for vacuuming update */ - if (skipElement != NULL && hc->element->blkno == skipElement->blkno && hc->element->offno == skipElement->offno) + if (skipElement != NULL && hce->blkno == skipElement->blkno && hce->offno == skipElement->offno) continue; - if (hc->element->heaptidsLength != 0) + if (hce->heaptidsLength != 0) w2 = lappend(w2, hc); } return w2; } +#if PG_VERSION_NUM >= 130000 +/* + * Precompute hash + */ +static void +PrecomputeHash(char *base, HnswElement element) +{ + HnswElementPtr ptr; + + HnswPtrStore(base, ptr, element); + + if (base == NULL) + element->hash = hash_pointer((uintptr_t) HnswPtrPointer(ptr)); + else + element->hash = hash_offset(HnswPtrOffset(ptr)); +} +#endif + /* * Algorithm 1 from paper */ void -HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing) +HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing) { List *ep; List *w; int level = element->level; int entryLevel; - Datum q = element->value; + Datum q = HnswGetValue(base, element); HnswElement skipElement = existing ? element : NULL; #if PG_VERSION_NUM >= 130000 /* Precompute hash */ if (index == NULL) - element->hash = hash_pointer((uintptr_t) element); + PrecomputeHash(base, element); #endif /* No neighbors if no entry point */ @@ -1052,13 +1243,13 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F return; /* Get entry point and level */ - ep = list_make1(HnswEntryCandidate(entryPoint, q, index, procinfo, collation, true)); + ep = list_make1(HnswEntryCandidate(base, entryPoint, q, index, procinfo, collation, true)); entryLevel = entryPoint->level; /* 1st phase: greedy search to insert level */ for (int lc = entryLevel; lc >= level + 1; lc--) { - w = HnswSearchLayer(q, ep, 1, lc, index, procinfo, collation, m, true, skipElement); + w = HnswSearchLayer(base, q, ep, 1, lc, index, procinfo, collation, m, true, skipElement); ep = w; } @@ -1076,12 +1267,12 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F List *neighbors; List *lw; - w = HnswSearchLayer(q, ep, efConstruction, lc, index, procinfo, collation, m, true, skipElement); + w = HnswSearchLayer(base, q, ep, efConstruction, lc, index, procinfo, collation, m, true, skipElement); /* Elements being deleted or skipped can help with search */ /* but should be removed before selecting neighbors */ if (index != NULL) - lw = RemoveElements(w, skipElement); + lw = RemoveElements(base, w, skipElement); else lw = w; @@ -1090,9 +1281,9 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F * sortCandidates to true for in-memory builds to enable closer * caching, but there does not seem to be a difference in performance. */ - neighbors = SelectNeighbors(lw, lm, lc, procinfo, collation, element, NULL, NULL, false); + neighbors = SelectNeighbors(base, lw, lm, lc, procinfo, collation, element, NULL, NULL, false); - AddConnections(element, neighbors, lc); + AddConnections(base, element, neighbors, lc); ep = w; } diff --git a/src/hnswvacuum.c b/src/hnswvacuum.c index 96ac54c..8bf6ac7 100644 --- a/src/hnswvacuum.c +++ b/src/hnswvacuum.c @@ -199,21 +199,22 @@ RepairGraphElement(HnswVacuumState * vacuumstate, HnswElement element, HnswEleme BufferAccessStrategy bas = vacuumstate->bas; HnswNeighborTuple ntup = vacuumstate->ntup; Size ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(element->level, m); + char *base = NULL; /* Skip if element is entry point */ if (entryPoint != NULL && element->blkno == entryPoint->blkno && element->offno == entryPoint->offno) return; /* Init fields */ - HnswInitNeighbors(element, m); + HnswInitNeighbors(base, element, m, NULL); element->heaptidsLength = 0; - /* Add element to graph, skipping itself */ - HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, true); + /* Find neighbors for element, skipping itself */ + HnswFindElementNeighbors(base, element, entryPoint, index, procinfo, collation, m, efConstruction, true); /* Update neighbor tuple */ /* Do this before getting page to minimize locking */ - HnswSetNeighborTuple(ntup, element, m); + HnswSetNeighborTuple(base, ntup, element, m); /* Get neighbor page */ buf = ReadBufferExtended(index, MAIN_FORKNUM, element->neighborPage, RBM_NORMAL, bas); @@ -230,7 +231,7 @@ RepairGraphElement(HnswVacuumState * vacuumstate, HnswElement element, HnswEleme UnlockReleaseBuffer(buf); /* Update neighbors */ - HnswUpdateNeighborPages(index, procinfo, collation, element, m, true, false); + HnswUpdateNeighborsOnDisk(index, procinfo, collation, element, m, true, false); } /* @@ -301,7 +302,7 @@ RepairGraphEntryPoint(HnswVacuumState * vacuumstate) { /* Reset neighbors from previous update */ if (highestPoint != NULL) - highestPoint->neighbors = NULL; + HnswPtrStore((char *) NULL, highestPoint->neighbors, (HnswNeighborArrayPtr *) NULL); RepairGraphElement(vacuumstate, entryPoint, highestPoint); } diff --git a/test/t/012_hnsw_build_recall.pl b/test/t/012_hnsw_build_recall.pl index 51ef58d..163a472 100644 --- a/test/t/012_hnsw_build_recall.pl +++ b/test/t/012_hnsw_build_recall.pl @@ -95,11 +95,10 @@ for my $i (0 .. $#operators) $node->safe_psql("postgres", "DROP INDEX idx;"); - # Build index in parallel + # Build index in parallel in memory my ($ret, $stdout, $stderr) = $node->psql("postgres", qq( SET client_min_messages = DEBUG; SET min_parallel_table_scan_size = 1; - SET hnsw.enable_parallel_build = on; CREATE INDEX idx ON tst USING hnsw (v $opclass); )); is($ret, 0, $stderr); @@ -109,6 +108,21 @@ for my $i (0 .. $#operators) test_recall($min, $operator); $node->safe_psql("postgres", "DROP INDEX idx;"); + + # Build index in parallel on disk + # Set parallel_workers on table to use workers with low maintenance_work_mem + ($ret, $stdout, $stderr) = $node->psql("postgres", qq( + ALTER TABLE tst SET (parallel_workers = 2); + SET client_min_messages = DEBUG; + SET maintenance_work_mem = '4MB'; + CREATE INDEX idx ON tst USING hnsw (v $opclass); + ALTER TABLE tst RESET (parallel_workers); + )); + is($ret, 0, $stderr); + like($stderr, qr/using \d+ parallel workers/); + like($stderr, qr/hnsw graph no longer fits into maintenance_work_mem/); + + $node->safe_psql("postgres", "DROP INDEX idx;"); } done_testing();