Moved graph to separate struct

This commit is contained in:
Andrew Kane
2024-01-07 20:15:30 -08:00
parent c7fe1571ee
commit 19a0e1b341
2 changed files with 60 additions and 36 deletions

View File

@@ -148,6 +148,15 @@ typedef struct HnswOptions
int efConstruction; /* size of dynamic candidate list */
} HnswOptions;
/*
 * Graph state tracked during an index build. An instance is embedded in
 * HnswBuildState and another in HnswShared; build code reaches whichever
 * one is active through the HnswBuildState graph pointer.
 */
typedef struct HnswGraph
{
slist_head elements; /* list of elements added by in-memory inserts */
HnswElement entryPoint; /* replaced when a new element has a higher level; NULL while empty */
long memoryUsed; /* memory consumed by the graph so far */
long memoryTotal; /* limit that memoryUsed is compared against before flushing */
bool flushed; /* set by FlushPages; once true, tuples are inserted with HnswInsertTuple */
} HnswGraph;
typedef struct HnswSpool
{
Relation heap;
@@ -172,6 +181,7 @@ typedef struct HnswShared
int nparticipantsdone;
double reltuples;
double indtuples;
HnswGraph graphData;
#if PG_VERSION_NUM < 120000
ParallelHeapScanDescData heapdesc; /* must come last */
@@ -214,13 +224,10 @@ typedef struct HnswBuildState
Oid collation;
/* Variables */
slist_head elements;
HnswElement entryPoint;
HnswGraph graphData;
HnswGraph *graph;
double ml;
int maxLevel;
long memoryUsed;
long memoryTotal;
bool flushed;
Vector *normvec;
/* Memory */

View File

@@ -152,7 +152,7 @@ CreateElementPages(HnswBuildState * buildstate)
page = BufferGetPage(buf);
HnswInitPage(buf, page);
slist_foreach(iter, &buildstate->elements)
slist_foreach(iter, &buildstate->graph->elements)
{
HnswElement element = slist_container(HnswElementData, next, iter.cur);
Size etupSize;
@@ -212,7 +212,7 @@ CreateElementPages(HnswBuildState * buildstate)
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, buildstate->entryPoint, insertPage, forkNum, true);
HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, buildstate->graph->entryPoint, insertPage, forkNum, true);
pfree(etup);
pfree(ntup);
@@ -233,7 +233,7 @@ CreateNeighborPages(HnswBuildState * buildstate)
/* Allocate once */
ntup = palloc0(BLCKSZ);
slist_foreach(iter, &buildstate->elements)
slist_foreach(iter, &buildstate->graph->elements)
{
HnswElement e = slist_container(HnswElementData, next, iter.cur);
Buffer buf;
@@ -293,7 +293,7 @@ FlushPages(HnswBuildState * buildstate)
CreateElementPages(buildstate);
CreateNeighborPages(buildstate);
buildstate->flushed = true;
buildstate->graph->flushed = true;
MemoryContextReset(buildstate->graphCtx);
}
@@ -324,7 +324,8 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil
{
FmgrInfo *procinfo = buildstate->procinfo;
Oid collation = buildstate->collation;
HnswElement entryPoint = buildstate->entryPoint;
HnswGraph *graph = buildstate->graph;
HnswElement entryPoint = graph->entryPoint;
int efConstruction = buildstate->efConstruction;
int m = buildstate->m;
MemoryContext oldCtx;
@@ -356,7 +357,7 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil
if (dup == NULL)
{
/* Add element */
slist_push_head(&buildstate->elements, &element->next);
slist_push_head(&graph->elements, &element->next);
/* Update neighbors */
for (int lc = element->level; lc >= 0; lc--)
@@ -370,7 +371,7 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil
/* Update entry point if needed */
if (entryPoint == NULL || element->level > entryPoint->level)
buildstate->entryPoint = element;
graph->entryPoint = element;
}
else
{
@@ -381,9 +382,9 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil
/* Update memory usage */
#if PG_VERSION_NUM >= 130000
buildstate->memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false);
graph->memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false);
#else
buildstate->memoryUsed += HnswElementMemory(element, buildstate->m);
graph->memoryUsed += HnswElementMemory(element, buildstate->m);
#endif
return true;
@@ -397,6 +398,8 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
bool *isnull, bool tupleIsAlive, void *state)
{
HnswBuildState *buildstate = (HnswBuildState *) state;
HnswGraph *graph = buildstate->graph;
HnswShared *hnswshared = buildstate->hnswshared;
MemoryContext oldCtx;
bool inserted;
@@ -409,20 +412,28 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
return;
/* Flush pages if needed */
if (!buildstate->flushed && buildstate->memoryUsed >= buildstate->memoryTotal)
if (!graph->flushed && graph->memoryUsed >= graph->memoryTotal)
{
ereport(NOTICE,
(errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) buildstate->indtuples),
errdetail("Building will take significantly more time."),
errhint("Increase maintenance_work_mem to speed up builds.")));
if (hnswshared)
SpinLockAcquire(&hnswshared->mutex);
FlushPages(buildstate);
if (!hnswshared)
ereport(NOTICE,
(errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) buildstate->indtuples),
errdetail("Building will take significantly more time."),
errhint("Increase maintenance_work_mem to speed up builds.")));
if (!graph->flushed)
FlushPages(buildstate);
if (hnswshared)
SpinLockRelease(&hnswshared->mutex);
}
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
/* Insert tuple */
if (buildstate->flushed)
if (graph->flushed)
inserted = HnswInsertTuple(index, values, isnull, tid, buildstate->heap, true);
else
inserted = InsertTupleInMemory(index, values, tid, buildstate);
@@ -430,8 +441,6 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
/* Update progress */
if (inserted)
{
HnswShared *hnswshared = buildstate->hnswshared;
if (hnswshared)
{
SpinLockAcquire(&hnswshared->mutex);
@@ -447,6 +456,19 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
MemoryContextReset(buildstate->tmpCtx);
}
/*
 * Put a graph into its starting state: no elements, no entry point,
 * nothing counted as used, and not yet flushed
 */
static void
InitGraph(HnswGraph * graph)
{
	/* Limit later compared against memoryUsed to decide when to flush */
	long		budget = maintenance_work_mem * 1024L;

	graph->flushed = false;
	graph->memoryTotal = budget;
	graph->memoryUsed = 0;
	graph->entryPoint = NULL;
	slist_init(&graph->elements);
}
/*
* Initialize the build state
*/
@@ -480,13 +502,10 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index
buildstate->normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC);
buildstate->collation = index->rd_indcollation[0];
slist_init(&buildstate->elements);
buildstate->entryPoint = NULL;
InitGraph(&buildstate->graphData);
buildstate->graph = &buildstate->graphData;
buildstate->ml = HnswGetMl(buildstate->m);
buildstate->maxLevel = HnswGetMaxLevel(buildstate->m);
buildstate->memoryUsed = 0;
buildstate->memoryTotal = maintenance_work_mem * 1024L;
buildstate->flushed = false;
/* Reuse for each tuple */
buildstate->normvec = InitVector(buildstate->dimensions);
@@ -532,6 +551,7 @@ ParallelHeapScan(HnswBuildState * buildstate)
SpinLockAcquire(&hnswshared->mutex);
if (hnswshared->nparticipantsdone == nparticipanttuplesorts)
{
buildstate->graph = &hnswshared->graphData;
buildstate->indtuples = hnswshared->indtuples;
reltuples = hnswshared->reltuples;
SpinLockRelease(&hnswshared->mutex);
@@ -567,9 +587,7 @@ HnswParallelScanAndInsert(HnswSpool * hnswspool, HnswShared * hnswshared, bool p
indexInfo = BuildIndexInfo(hnswspool->index);
indexInfo->ii_Concurrent = hnswshared->isconcurrent;
InitBuildState(&buildstate, hnswspool->heap, hnswspool->index, indexInfo, MAIN_FORKNUM);
/* TODO Support in-memory builds */
buildstate.memoryTotal = 0;
buildstate.flushed = true;
buildstate.graph = &hnswshared->graphData;
buildstate.hnswshared = hnswshared;
#if PG_VERSION_NUM >= 120000
scan = table_beginscan_parallel(hnswspool->heap,
@@ -795,6 +813,9 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
hnswshared->nparticipantsdone = 0;
hnswshared->reltuples = 0;
hnswshared->indtuples = 0;
InitGraph(&hnswshared->graphData);
/* TODO Support in-memory builds */
hnswshared->graphData.memoryTotal = 0;
#if PG_VERSION_NUM >= 120000
table_parallelscan_initialize(buildstate->heap,
ParallelTableScanFromHnswShared(hnswshared),
@@ -882,11 +903,7 @@ BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum)
/* Attempt to launch parallel worker scan when required */
if (parallel_workers > 0)
{
/* TODO Support in-memory builds */
FlushPages(buildstate);
HnswBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers);
}
/* Add tuples to graph */
if (buildstate->heap != NULL)
@@ -906,7 +923,7 @@ BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum)
}
/* Flush pages */
if (!buildstate->flushed)
if (!buildstate->graph->flushed)
FlushPages(buildstate);
/* End parallel build */