From 19a0e1b341ef5f063bf988512c877c868f31431f Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Sun, 7 Jan 2024 20:15:30 -0800 Subject: [PATCH] Moved graph to separate struct --- src/hnsw.h | 17 +++++++---- src/hnswbuild.c | 79 ++++++++++++++++++++++++++++++------------------- 2 files changed, 60 insertions(+), 36 deletions(-) diff --git a/src/hnsw.h b/src/hnsw.h index 5a5eef9..bf287f3 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -148,6 +148,15 @@ typedef struct HnswOptions int efConstruction; /* size of dynamic candidate list */ } HnswOptions; +typedef struct HnswGraph +{ + slist_head elements; + HnswElement entryPoint; + long memoryUsed; + long memoryTotal; + bool flushed; +} HnswGraph; + typedef struct HnswSpool { Relation heap; @@ -172,6 +181,7 @@ typedef struct HnswShared int nparticipantsdone; double reltuples; double indtuples; + HnswGraph graphData; #if PG_VERSION_NUM < 120000 ParallelHeapScanDescData heapdesc; /* must come last */ @@ -214,13 +224,10 @@ typedef struct HnswBuildState Oid collation; /* Variables */ - slist_head elements; - HnswElement entryPoint; + HnswGraph graphData; + HnswGraph *graph; double ml; int maxLevel; - long memoryUsed; - long memoryTotal; - bool flushed; Vector *normvec; /* Memory */ diff --git a/src/hnswbuild.c b/src/hnswbuild.c index 476dd0a..349c576 100644 --- a/src/hnswbuild.c +++ b/src/hnswbuild.c @@ -152,7 +152,7 @@ CreateElementPages(HnswBuildState * buildstate) page = BufferGetPage(buf); HnswInitPage(buf, page); - slist_foreach(iter, &buildstate->elements) + slist_foreach(iter, &buildstate->graph->elements) { HnswElement element = slist_container(HnswElementData, next, iter.cur); Size etupSize; @@ -212,7 +212,7 @@ CreateElementPages(HnswBuildState * buildstate) MarkBufferDirty(buf); UnlockReleaseBuffer(buf); - HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, buildstate->entryPoint, insertPage, forkNum, true); + HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, buildstate->graph->entryPoint, insertPage, forkNum, true); pfree(etup); pfree(ntup); @@ -233,7 +233,7 @@ CreateNeighborPages(HnswBuildState * buildstate) /* Allocate once */ ntup = palloc0(BLCKSZ); - slist_foreach(iter, &buildstate->elements) + slist_foreach(iter, &buildstate->graph->elements) { HnswElement e = slist_container(HnswElementData, next, iter.cur); Buffer buf; @@ -293,7 +293,7 @@ FlushPages(HnswBuildState * buildstate) CreateElementPages(buildstate); CreateNeighborPages(buildstate); - buildstate->flushed = true; + buildstate->graph->flushed = true; MemoryContextReset(buildstate->graphCtx); } @@ -324,7 +324,8 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil { FmgrInfo *procinfo = buildstate->procinfo; Oid collation = buildstate->collation; - HnswElement entryPoint = buildstate->entryPoint; + HnswGraph *graph = buildstate->graph; + HnswElement entryPoint = graph->entryPoint; int efConstruction = buildstate->efConstruction; int m = buildstate->m; MemoryContext oldCtx; @@ -356,7 +357,7 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil if (dup == NULL) { /* Add element */ - slist_push_head(&buildstate->elements, &element->next); + slist_push_head(&graph->elements, &element->next); /* Update neighbors */ for (int lc = element->level; lc >= 0; lc--) @@ -370,7 +371,7 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil /* Update entry point if needed */ if (entryPoint == NULL || element->level > entryPoint->level) - buildstate->entryPoint = element; + graph->entryPoint = element; } else { @@ -381,9 +382,9 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil /* Update memory usage */ #if PG_VERSION_NUM >= 130000 - buildstate->memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false); + graph->memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false); #else - buildstate->memoryUsed += HnswElementMemory(element, buildstate->m); + graph->memoryUsed += HnswElementMemory(element, buildstate->m); #endif return true; @@ -397,6 +398,8 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, bool *isnull, bool tupleIsAlive, void *state) { HnswBuildState *buildstate = (HnswBuildState *) state; + HnswGraph *graph = buildstate->graph; + HnswShared *hnswshared = buildstate->hnswshared; MemoryContext oldCtx; bool inserted; @@ -409,20 +412,28 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, return; /* Flush pages if needed */ - if (!buildstate->flushed && buildstate->memoryUsed >= buildstate->memoryTotal) + if (!graph->flushed && graph->memoryUsed >= graph->memoryTotal) { - ereport(NOTICE, - (errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) buildstate->indtuples), - errdetail("Building will take significantly more time."), - errhint("Increase maintenance_work_mem to speed up builds."))); + if (hnswshared) + SpinLockAcquire(&hnswshared->mutex); - FlushPages(buildstate); + if (!hnswshared) + ereport(NOTICE, + (errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) buildstate->indtuples), + errdetail("Building will take significantly more time."), + errhint("Increase maintenance_work_mem to speed up builds."))); + + if (!graph->flushed) + FlushPages(buildstate); + + if (hnswshared) + SpinLockRelease(&hnswshared->mutex); } oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); /* Insert tuple */ - if (buildstate->flushed) + if (graph->flushed) inserted = HnswInsertTuple(index, values, isnull, tid, buildstate->heap, true); else inserted = InsertTupleInMemory(index, values, tid, buildstate); @@ -430,8 +441,6 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, /* Update progress */ if (inserted) { - HnswShared *hnswshared = buildstate->hnswshared; - if (hnswshared) { SpinLockAcquire(&hnswshared->mutex); @@ -447,6 +456,19 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values, MemoryContextReset(buildstate->tmpCtx); } +/* + * Initialize the graph + */ +static void +InitGraph(HnswGraph * graph) +{ + slist_init(&graph->elements); + graph->entryPoint = NULL; + graph->memoryUsed = 0; + graph->memoryTotal = maintenance_work_mem * 1024L; + graph->flushed = false; +} + /* * Initialize the build state */ @@ -480,13 +502,10 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index buildstate->normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); buildstate->collation = index->rd_indcollation[0]; - slist_init(&buildstate->elements); - buildstate->entryPoint = NULL; + InitGraph(&buildstate->graphData); + buildstate->graph = &buildstate->graphData; buildstate->ml = HnswGetMl(buildstate->m); buildstate->maxLevel = HnswGetMaxLevel(buildstate->m); - buildstate->memoryUsed = 0; - buildstate->memoryTotal = maintenance_work_mem * 1024L; - buildstate->flushed = false; /* Reuse for each tuple */ buildstate->normvec = InitVector(buildstate->dimensions); @@ -532,6 +551,7 @@ ParallelHeapScan(HnswBuildState * buildstate) SpinLockAcquire(&hnswshared->mutex); if (hnswshared->nparticipantsdone == nparticipanttuplesorts) { + buildstate->graph = &hnswshared->graphData; buildstate->indtuples = hnswshared->indtuples; reltuples = hnswshared->reltuples; SpinLockRelease(&hnswshared->mutex); @@ -567,9 +587,7 @@ HnswParallelScanAndInsert(HnswSpool * hnswspool, HnswShared * hnswshared, bool p indexInfo = BuildIndexInfo(hnswspool->index); indexInfo->ii_Concurrent = hnswshared->isconcurrent; InitBuildState(&buildstate, hnswspool->heap, hnswspool->index, indexInfo, MAIN_FORKNUM); - /* TODO Support in-memory builds */ - buildstate.memoryTotal = 0; - buildstate.flushed = true; + buildstate.graph = &hnswshared->graphData; buildstate.hnswshared = hnswshared; #if PG_VERSION_NUM >= 120000 scan = table_beginscan_parallel(hnswspool->heap, @@ -795,6 +813,9 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) hnswshared->nparticipantsdone = 0; hnswshared->reltuples = 0; hnswshared->indtuples = 0; + InitGraph(&hnswshared->graphData); + /* TODO Support in-memory builds */ + hnswshared->graphData.memoryTotal = 0; #if PG_VERSION_NUM >= 120000 table_parallelscan_initialize(buildstate->heap, ParallelTableScanFromHnswShared(hnswshared), @@ -882,11 +903,7 @@ BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum) /* Attempt to launch parallel worker scan when required */ if (parallel_workers > 0) - { - /* TODO Support in-memory builds */ - FlushPages(buildstate); HnswBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers); - } /* Add tuples to graph */ if (buildstate->heap != NULL) @@ -906,7 +923,7 @@ BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum) } /* Flush pages */ - if (!buildstate->flushed) + if (!buildstate->graph->flushed) FlushPages(buildstate); /* End parallel build */