/*
 * Mirror of https://github.com/pgvector/pgvector.git
 * (synced 2026-07-02 18:50:56 +08:00; 1165 lines, 31 KiB, C)
 */
#include "postgres.h"
|
|
|
|
#include <math.h>
|
|
|
|
#include "access/parallel.h"
|
|
#include "access/xact.h"
|
|
#include "catalog/index.h"
|
|
#include "hnsw.h"
|
|
#include "miscadmin.h"
|
|
#include "lib/pairingheap.h"
|
|
#include "nodes/pg_list.h"
|
|
#include "storage/bufmgr.h"
|
|
#include "tcop/tcopprot.h"
|
|
#include "utils/datum.h"
|
|
#include "utils/memutils.h"
|
|
|
|
#if PG_VERSION_NUM >= 140000
|
|
#include "utils/backend_progress.h"
|
|
#elif PG_VERSION_NUM >= 120000
|
|
#include "pgstat.h"
|
|
#endif
|
|
|
|
#if PG_VERSION_NUM >= 120000
|
|
#include "access/tableam.h"
|
|
#include "commands/progress.h"
|
|
#else
|
|
#define PROGRESS_CREATEIDX_TUPLES_DONE 0
|
|
#endif
|
|
|
|
#if PG_VERSION_NUM >= 130000
|
|
#define CALLBACK_ITEM_POINTER ItemPointer tid
|
|
#else
|
|
#define CALLBACK_ITEM_POINTER HeapTuple hup
|
|
#endif
|
|
|
|
/*
 * Report build progress.
 *
 * On PostgreSQL 12 and later this forwards to pgstat_progress_update_param.
 * On older versions it only evaluates val and discards the result; index is
 * not evaluated at all.  The argument is parenthesized so that any
 * expression (for instance an assignment) can be passed safely.
 */
#if PG_VERSION_NUM >= 120000
#define UpdateProgress(index, val) pgstat_progress_update_param(index, val)
#else
#define UpdateProgress(index, val) ((void) (val))
#endif
|
|
|
|
#if PG_VERSION_NUM >= 140000
|
|
#include "utils/backend_status.h"
|
|
#include "utils/wait_event.h"
|
|
#endif
|
|
|
|
#if PG_VERSION_NUM >= 120000
|
|
#include "access/table.h"
|
|
#include "optimizer/optimizer.h"
|
|
#else
|
|
#include "access/heapam.h"
|
|
#include "optimizer/planner.h"
|
|
#include "pgstat.h"
|
|
#endif
|
|
|
|
#define PARALLEL_KEY_HNSW_SHARED UINT64CONST(0xA000000000000001)
|
|
#define PARALLEL_KEY_HNSW_AREA UINT64CONST(0xA000000000000002)
|
|
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000003)
|
|
|
|
#if PG_VERSION_NUM < 130000
|
|
#define GENERATIONCHUNK_RAWSIZE (SIZEOF_SIZE_T + SIZEOF_VOID_P * 2)
|
|
#endif
|
|
|
|
/*
|
|
* Create the metapage
|
|
*/
|
|
static void
|
|
CreateMetaPage(HnswBuildState * buildstate)
|
|
{
|
|
Relation index = buildstate->index;
|
|
ForkNumber forkNum = buildstate->forkNum;
|
|
Buffer buf;
|
|
Page page;
|
|
HnswMetaPage metap;
|
|
|
|
buf = HnswNewBuffer(index, forkNum);
|
|
page = BufferGetPage(buf);
|
|
HnswInitPage(buf, page);
|
|
|
|
/* Set metapage data */
|
|
metap = HnswPageGetMeta(page);
|
|
metap->magicNumber = HNSW_MAGIC_NUMBER;
|
|
metap->version = HNSW_VERSION;
|
|
metap->dimensions = buildstate->dimensions;
|
|
metap->m = buildstate->m;
|
|
metap->efConstruction = buildstate->efConstruction;
|
|
metap->entryBlkno = InvalidBlockNumber;
|
|
metap->entryOffno = InvalidOffsetNumber;
|
|
metap->entryLevel = -1;
|
|
metap->insertPage = InvalidBlockNumber;
|
|
((PageHeader) page)->pd_lower =
|
|
((char *) metap + sizeof(HnswMetaPageData)) - (char *) page;
|
|
|
|
MarkBufferDirty(buf);
|
|
UnlockReleaseBuffer(buf);
|
|
}
|
|
|
|
/*
|
|
* Add a new page
|
|
*/
|
|
static void
|
|
HnswBuildAppendPage(Relation index, Buffer *buf, Page *page, ForkNumber forkNum)
|
|
{
|
|
/* Add a new page */
|
|
Buffer newbuf = HnswNewBuffer(index, forkNum);
|
|
|
|
/* Update previous page */
|
|
HnswPageGetOpaque(*page)->nextblkno = BufferGetBlockNumber(newbuf);
|
|
|
|
/* Commit */
|
|
MarkBufferDirty(*buf);
|
|
UnlockReleaseBuffer(*buf);
|
|
|
|
/* Can take a while, so ensure we can interrupt */
|
|
/* Needs to be called when no buffer locks are held */
|
|
LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
|
|
CHECK_FOR_INTERRUPTS();
|
|
LockBuffer(newbuf, BUFFER_LOCK_EXCLUSIVE);
|
|
|
|
/* Prepare new page */
|
|
*buf = newbuf;
|
|
*page = BufferGetPage(*buf);
|
|
HnswInitPage(*buf, *page);
|
|
}
|
|
|
|
/*
|
|
* Create element pages
|
|
*/
|
|
static void
|
|
CreateElementPages(HnswBuildState * buildstate)
|
|
{
|
|
Relation index = buildstate->index;
|
|
ForkNumber forkNum = buildstate->forkNum;
|
|
Size etupAllocSize;
|
|
Size maxSize;
|
|
HnswElementTuple etup;
|
|
HnswNeighborTuple ntup;
|
|
BlockNumber insertPage;
|
|
HnswElement entryPoint;
|
|
Buffer buf;
|
|
Page page;
|
|
HnswElementPtr iter = buildstate->graph->head;
|
|
char *base = buildstate->hnswarea;
|
|
|
|
/* Calculate sizes */
|
|
etupAllocSize = BLCKSZ;
|
|
maxSize = HNSW_MAX_SIZE;
|
|
|
|
/* Allocate once */
|
|
etup = palloc0(etupAllocSize);
|
|
ntup = palloc0(BLCKSZ);
|
|
|
|
/* Prepare first page */
|
|
buf = HnswNewBuffer(index, forkNum);
|
|
page = BufferGetPage(buf);
|
|
HnswInitPage(buf, page);
|
|
|
|
while (!HnswPtrIsNull(base, iter))
|
|
{
|
|
HnswElement element = HnswPtrAccess(base, iter);
|
|
Size etupSize;
|
|
Size ntupSize;
|
|
Size combinedSize;
|
|
void *valuePtr = HnswPtrAccess(base, element->value);
|
|
|
|
/* Update iterator */
|
|
iter = element->next;
|
|
|
|
/* Zero memory for each element */
|
|
MemSet(etup, 0, etupAllocSize);
|
|
|
|
/* Calculate sizes */
|
|
etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(valuePtr));
|
|
ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(element->level, buildstate->m);
|
|
combinedSize = etupSize + ntupSize + sizeof(ItemIdData);
|
|
|
|
/* Initial size check */
|
|
if (etupSize > etupAllocSize)
|
|
elog(ERROR, "index tuple too large");
|
|
|
|
HnswSetElementTuple(base, etup, element);
|
|
|
|
/* Keep element and neighbors on the same page if possible */
|
|
if (PageGetFreeSpace(page) < etupSize || (combinedSize <= maxSize && PageGetFreeSpace(page) < combinedSize))
|
|
HnswBuildAppendPage(index, &buf, &page, forkNum);
|
|
|
|
/* Calculate offsets */
|
|
element->blkno = BufferGetBlockNumber(buf);
|
|
element->offno = OffsetNumberNext(PageGetMaxOffsetNumber(page));
|
|
if (combinedSize <= maxSize)
|
|
{
|
|
element->neighborPage = element->blkno;
|
|
element->neighborOffno = OffsetNumberNext(element->offno);
|
|
}
|
|
else
|
|
{
|
|
element->neighborPage = element->blkno + 1;
|
|
element->neighborOffno = FirstOffsetNumber;
|
|
}
|
|
|
|
ItemPointerSet(&etup->neighbortid, element->neighborPage, element->neighborOffno);
|
|
|
|
/* Add element */
|
|
if (PageAddItem(page, (Item) etup, etupSize, InvalidOffsetNumber, false, false) != element->offno)
|
|
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
|
|
|
/* Add new page if needed */
|
|
if (PageGetFreeSpace(page) < ntupSize)
|
|
HnswBuildAppendPage(index, &buf, &page, forkNum);
|
|
|
|
/* Add placeholder for neighbors */
|
|
if (PageAddItem(page, (Item) ntup, ntupSize, InvalidOffsetNumber, false, false) != element->neighborOffno)
|
|
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
|
}
|
|
|
|
insertPage = BufferGetBlockNumber(buf);
|
|
|
|
/* Commit */
|
|
MarkBufferDirty(buf);
|
|
UnlockReleaseBuffer(buf);
|
|
|
|
entryPoint = HnswPtrAccess(base, buildstate->graph->entryPoint);
|
|
HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, entryPoint, insertPage, forkNum, true);
|
|
|
|
pfree(etup);
|
|
pfree(ntup);
|
|
}
|
|
|
|
/*
|
|
* Create neighbor pages
|
|
*/
|
|
static void
|
|
CreateNeighborPages(HnswBuildState * buildstate)
|
|
{
|
|
Relation index = buildstate->index;
|
|
ForkNumber forkNum = buildstate->forkNum;
|
|
int m = buildstate->m;
|
|
HnswElementPtr iter = buildstate->graph->head;
|
|
char *base = buildstate->hnswarea;
|
|
HnswNeighborTuple ntup;
|
|
|
|
/* Allocate once */
|
|
ntup = palloc0(BLCKSZ);
|
|
|
|
while (!HnswPtrIsNull(base, iter))
|
|
{
|
|
HnswElement e = HnswPtrAccess(base, iter);
|
|
Buffer buf;
|
|
Page page;
|
|
Size ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m);
|
|
|
|
/* Update iterator */
|
|
iter = e->next;
|
|
|
|
/* Can take a while, so ensure we can interrupt */
|
|
/* Needs to be called when no buffer locks are held */
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
buf = ReadBufferExtended(index, forkNum, e->neighborPage, RBM_NORMAL, NULL);
|
|
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
|
page = BufferGetPage(buf);
|
|
|
|
HnswSetNeighborTuple(base, ntup, e, m);
|
|
|
|
if (!PageIndexTupleOverwrite(page, e->neighborOffno, (Item) ntup, ntupSize))
|
|
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
|
|
|
/* Commit */
|
|
MarkBufferDirty(buf);
|
|
UnlockReleaseBuffer(buf);
|
|
}
|
|
|
|
pfree(ntup);
|
|
}
|
|
|
|
/*
|
|
* Flush pages
|
|
*/
|
|
static void
|
|
FlushPages(HnswBuildState * buildstate)
|
|
{
|
|
#ifdef HNSW_MEMORY
|
|
elog(INFO, "memory: %zu MB", buildstate->graph->memoryUsed / (1024 * 1024));
|
|
#endif
|
|
|
|
CreateMetaPage(buildstate);
|
|
CreateElementPages(buildstate);
|
|
CreateNeighborPages(buildstate);
|
|
|
|
buildstate->graph->flushed = true;
|
|
MemoryContextReset(buildstate->graphCtx);
|
|
}
|
|
|
|
/*
|
|
* Add a heap TID to an existing element
|
|
*/
|
|
static bool
|
|
HnswAddDuplicateInMemory(HnswElement element, HnswElement dup)
|
|
{
|
|
LWLockAcquire(&dup->lock, LW_EXCLUSIVE);
|
|
|
|
if (dup->heaptidsLength == HNSW_HEAPTIDS)
|
|
{
|
|
LWLockRelease(&dup->lock);
|
|
return false;
|
|
}
|
|
|
|
HnswAddHeapTid(dup, &element->heaptids[0]);
|
|
|
|
LWLockRelease(&dup->lock);
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Find duplicate element
|
|
*/
|
|
static bool
|
|
HnswFindDuplicateInMemory(char *base, HnswElement element)
|
|
{
|
|
HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0);
|
|
|
|
for (int i = 0; i < neighbors->length; i++)
|
|
{
|
|
HnswCandidate *neighbor = &neighbors->items[i];
|
|
HnswElement neighborElement = HnswPtrAccess(base, neighbor->element);
|
|
Datum value = HnswGetValue(base, element);
|
|
Datum neighborValue = HnswGetValue(base, neighborElement);
|
|
|
|
/* Exit early since ordered by distance */
|
|
if (!datumIsEqual(value, neighborValue, false, -1))
|
|
return false;
|
|
|
|
/* Check for space */
|
|
if (HnswAddDuplicateInMemory(element, neighborElement))
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* Add to element and neighbor pages
|
|
*/
|
|
static void
|
|
WriteNewElementPagesInMemory(char *base, HnswGraph * graph, HnswElement element)
|
|
{
|
|
SpinLockAcquire(&graph->lock);
|
|
element->next = graph->head;
|
|
HnswPtrStore(base, graph->head, element);
|
|
SpinLockRelease(&graph->lock);
|
|
}
|
|
|
|
/*
|
|
* Update neighbors
|
|
*/
|
|
static void
|
|
HnswUpdateNeighborPagesInMemory(char *base, FmgrInfo *procinfo, Oid collation, HnswElement e, int m)
|
|
{
|
|
for (int lc = e->level; lc >= 0; lc--)
|
|
{
|
|
int lm = HnswGetLayerM(m, lc);
|
|
HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc);
|
|
|
|
for (int i = 0; i < neighbors->length; i++)
|
|
{
|
|
HnswCandidate *hc = &neighbors->items[i];
|
|
HnswElement neighborElement = HnswPtrAccess(base, hc->element);
|
|
|
|
/* Keep scan-build happy on Mac x86-64 */
|
|
Assert(neighborElement);
|
|
|
|
/* Use element for lock instead of hc since hc can be replaced */
|
|
LWLockAcquire(&neighborElement->lock, LW_EXCLUSIVE);
|
|
HnswUpdateConnection(base, e, hc, lm, lc, NULL, NULL, procinfo, collation);
|
|
LWLockRelease(&neighborElement->lock);
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Write changes in memory
|
|
*/
|
|
static void
|
|
WriteElementInMemory(FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, HnswBuildState * buildstate, HnswGraph * graph, bool updateEntryPoint)
|
|
{
|
|
char *base = buildstate->hnswarea;
|
|
|
|
/* Try to add to existing page */
|
|
if (HnswFindDuplicateInMemory(base, element))
|
|
return;
|
|
|
|
/* Write element and neighbor tuples */
|
|
WriteNewElementPagesInMemory(base, graph, element);
|
|
|
|
/* Update neighbors */
|
|
HnswUpdateNeighborPagesInMemory(base, procinfo, collation, element, m);
|
|
|
|
/* Update entry point if needed (already have lock) */
|
|
if (updateEntryPoint)
|
|
HnswPtrStore(base, graph->entryPoint, element);
|
|
}
|
|
|
|
/*
|
|
* Insert tuple in memory
|
|
*/
|
|
static void
|
|
InsertTupleInMemory(HnswBuildState * buildstate, HnswElement element)
|
|
{
|
|
FmgrInfo *procinfo = buildstate->procinfo;
|
|
Oid collation = buildstate->collation;
|
|
HnswGraph *graph = buildstate->graph;
|
|
HnswElement entryPoint;
|
|
LWLock *entryLock = &graph->entryLock;
|
|
bool updateEntryPoint = false;
|
|
int efConstruction = buildstate->efConstruction;
|
|
int m = buildstate->m;
|
|
char *base = buildstate->hnswarea;
|
|
|
|
/* Get entry point */
|
|
LWLockAcquire(entryLock, LW_SHARED);
|
|
entryPoint = HnswPtrAccess(base, graph->entryPoint);
|
|
|
|
/* Prevent concurrent inserts when likely updating entry point */
|
|
if (entryPoint == NULL || element->level > entryPoint->level)
|
|
{
|
|
/* Release shared lock */
|
|
LWLockRelease(entryLock);
|
|
|
|
/* Get exclusive lock */
|
|
LWLockAcquire(entryLock, LW_EXCLUSIVE);
|
|
|
|
/* Get latest entry point after lock is acquired */
|
|
entryPoint = HnswPtrAccess(base, graph->entryPoint);
|
|
updateEntryPoint = entryPoint == NULL || element->level > entryPoint->level;
|
|
}
|
|
|
|
/* Insert element in graph */
|
|
HnswInsertElement(base, element, entryPoint, NULL, procinfo, collation, m, efConstruction, false);
|
|
|
|
/* Write to memory */
|
|
WriteElementInMemory(procinfo, collation, element, m, efConstruction, entryPoint, buildstate, graph, updateEntryPoint);
|
|
|
|
/* Release entry lock */
|
|
LWLockRelease(entryLock);
|
|
}
|
|
|
|
/*
|
|
* Insert tuple
|
|
*/
|
|
static bool
|
|
InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, HnswBuildState * buildstate)
|
|
{
|
|
HnswGraph *graph = buildstate->graph;
|
|
HnswElement element;
|
|
HnswAllocator *allocator = &buildstate->allocator;
|
|
Size valueSize;
|
|
Pointer valuePtr;
|
|
LWLock *flushLock = &graph->flushLock;
|
|
char *base = buildstate->hnswarea;
|
|
|
|
/* Detoast once for all calls */
|
|
Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
|
|
|
|
/* Normalize if needed */
|
|
if (buildstate->normprocinfo != NULL)
|
|
{
|
|
if (!HnswNormValue(buildstate->normprocinfo, buildstate->collation, &value, buildstate->normvec))
|
|
return false;
|
|
}
|
|
|
|
/* Get datum size */
|
|
valueSize = VARSIZE_ANY(DatumGetPointer(value));
|
|
|
|
/* Ensure graph not flushed when inserting */
|
|
LWLockAcquire(flushLock, LW_SHARED);
|
|
|
|
if (graph->flushed)
|
|
{
|
|
LWLockRelease(flushLock);
|
|
|
|
return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true);
|
|
}
|
|
|
|
/* Get lock for allocator */
|
|
LWLockAcquire(&graph->allocatorLock, LW_EXCLUSIVE);
|
|
|
|
/* Flush pages if needed */
|
|
if (graph->memoryUsed >= graph->memoryTotal)
|
|
{
|
|
LWLockRelease(&graph->allocatorLock);
|
|
|
|
LWLockRelease(flushLock);
|
|
LWLockAcquire(flushLock, LW_EXCLUSIVE);
|
|
|
|
if (!graph->flushed)
|
|
{
|
|
ereport(NOTICE,
|
|
(errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples),
|
|
errdetail("Building will take significantly more time."),
|
|
errhint("Increase maintenance_work_mem to speed up builds.")));
|
|
|
|
FlushPages(buildstate);
|
|
}
|
|
|
|
LWLockRelease(flushLock);
|
|
|
|
return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true);
|
|
}
|
|
|
|
/* Create an element */
|
|
element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel, allocator);
|
|
valuePtr = HnswAlloc(allocator, valueSize);
|
|
|
|
/* Release allocator lock */
|
|
LWLockRelease(&graph->allocatorLock);
|
|
|
|
/* Copy datum */
|
|
memcpy(valuePtr, DatumGetPointer(value), valueSize);
|
|
HnswPtrStore(base, element->value, valuePtr);
|
|
|
|
/* Create element lock */
|
|
LWLockInitialize(&element->lock, hnsw_lock_tranche_id);
|
|
|
|
/* Insert tuple */
|
|
InsertTupleInMemory(buildstate, element);
|
|
|
|
/* Release flush lock */
|
|
LWLockRelease(flushLock);
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Callback for table_index_build_scan
|
|
*/
|
|
static void
|
|
BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
|
|
bool *isnull, bool tupleIsAlive, void *state)
|
|
{
|
|
HnswBuildState *buildstate = (HnswBuildState *) state;
|
|
HnswGraph *graph = buildstate->graph;
|
|
MemoryContext oldCtx;
|
|
|
|
#if PG_VERSION_NUM < 130000
|
|
ItemPointer tid = &hup->t_self;
|
|
#endif
|
|
|
|
/* Skip nulls */
|
|
if (isnull[0])
|
|
return;
|
|
|
|
/* Use memory context */
|
|
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
|
|
|
|
/* Insert tuple */
|
|
if (InsertTuple(index, values, isnull, tid, buildstate))
|
|
{
|
|
/* Update progress */
|
|
SpinLockAcquire(&graph->lock);
|
|
UpdateProgress(PROGRESS_CREATEIDX_TUPLES_DONE, ++graph->indtuples);
|
|
SpinLockRelease(&graph->lock);
|
|
}
|
|
|
|
/* Reset memory context */
|
|
MemoryContextSwitchTo(oldCtx);
|
|
MemoryContextReset(buildstate->tmpCtx);
|
|
}
|
|
|
|
/*
|
|
* Initialize the graph
|
|
*/
|
|
static void
|
|
InitGraph(HnswGraph * graph, char *base, long memoryTotal)
|
|
{
|
|
HnswPtrStore(base, graph->head, (HnswElement) NULL);
|
|
HnswPtrStore(base, graph->entryPoint, (HnswElement) NULL);
|
|
graph->memoryUsed = 0;
|
|
graph->memoryTotal = memoryTotal;
|
|
graph->flushed = false;
|
|
graph->indtuples = 0;
|
|
SpinLockInit(&graph->lock);
|
|
LWLockInitialize(&graph->entryLock, hnsw_lock_tranche_id);
|
|
LWLockInitialize(&graph->allocatorLock, hnsw_lock_tranche_id);
|
|
LWLockInitialize(&graph->flushLock, hnsw_lock_tranche_id);
|
|
}
|
|
|
|
/*
|
|
* Initialize an allocator
|
|
*/
|
|
static void
|
|
InitAllocator(HnswAllocator * allocator, void *(*alloc) (Size size, void *state), void *state)
|
|
{
|
|
allocator->alloc = alloc;
|
|
allocator->state = state;
|
|
}
|
|
|
|
/*
|
|
* Memory context allocator
|
|
*/
|
|
static void *
|
|
HnswMemoryContextAlloc(Size size, void *state)
|
|
{
|
|
HnswBuildState *buildstate = (HnswBuildState *) state;
|
|
void *chunk = MemoryContextAlloc(buildstate->graphCtx, size);
|
|
|
|
#if PG_VERSION_NUM >= 130000
|
|
buildstate->graphData.memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false);
|
|
#else
|
|
buildstate->graphData.memoryUsed += MAXALIGN(size);
|
|
#endif
|
|
|
|
return chunk;
|
|
}
|
|
|
|
/*
|
|
* Shared memory allocator
|
|
*/
|
|
static void *
|
|
HnswSharedMemoryAlloc(Size size, void *state)
|
|
{
|
|
HnswBuildState *buildstate = (HnswBuildState *) state;
|
|
void *chunk = buildstate->hnswarea + buildstate->graph->memoryUsed;
|
|
|
|
buildstate->graph->memoryUsed += MAXALIGN(size);
|
|
return chunk;
|
|
}
|
|
|
|
/*
|
|
* Initialize the build state
|
|
*/
|
|
static void
|
|
InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, IndexInfo *indexInfo, ForkNumber forkNum)
|
|
{
|
|
buildstate->heap = heap;
|
|
buildstate->index = index;
|
|
buildstate->indexInfo = indexInfo;
|
|
buildstate->forkNum = forkNum;
|
|
|
|
buildstate->m = HnswGetM(index);
|
|
buildstate->efConstruction = HnswGetEfConstruction(index);
|
|
buildstate->dimensions = TupleDescAttr(index->rd_att, 0)->atttypmod;
|
|
|
|
/* Require column to have dimensions to be indexed */
|
|
if (buildstate->dimensions < 0)
|
|
elog(ERROR, "column does not have dimensions");
|
|
|
|
if (buildstate->dimensions > HNSW_MAX_DIM)
|
|
elog(ERROR, "column cannot have more than %d dimensions for hnsw index", HNSW_MAX_DIM);
|
|
|
|
if (buildstate->efConstruction < 2 * buildstate->m)
|
|
elog(ERROR, "ef_construction must be greater than or equal to 2 * m");
|
|
|
|
buildstate->reltuples = 0;
|
|
buildstate->indtuples = 0;
|
|
|
|
/* Get support functions */
|
|
buildstate->procinfo = index_getprocinfo(index, 1, HNSW_DISTANCE_PROC);
|
|
buildstate->normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC);
|
|
buildstate->collation = index->rd_indcollation[0];
|
|
|
|
InitGraph(&buildstate->graphData, NULL, maintenance_work_mem * 1024L);
|
|
buildstate->graph = &buildstate->graphData;
|
|
buildstate->ml = HnswGetMl(buildstate->m);
|
|
buildstate->maxLevel = HnswGetMaxLevel(buildstate->m);
|
|
|
|
/* Reuse for each tuple */
|
|
buildstate->normvec = InitVector(buildstate->dimensions);
|
|
|
|
buildstate->graphCtx = GenerationContextCreate(CurrentMemoryContext,
|
|
"Hnsw build graph context",
|
|
#if PG_VERSION_NUM >= 150000
|
|
1024 * 1024, 1024 * 1024,
|
|
#endif
|
|
1024 * 1024);
|
|
buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
|
|
"Hnsw build temporary context",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
|
|
InitAllocator(&buildstate->allocator, &HnswMemoryContextAlloc, buildstate);
|
|
|
|
buildstate->hnswleader = NULL;
|
|
buildstate->hnswshared = NULL;
|
|
buildstate->hnswarea = NULL;
|
|
}
|
|
|
|
/*
|
|
* Free resources
|
|
*/
|
|
static void
|
|
FreeBuildState(HnswBuildState * buildstate)
|
|
{
|
|
pfree(buildstate->normvec);
|
|
MemoryContextDelete(buildstate->graphCtx);
|
|
MemoryContextDelete(buildstate->tmpCtx);
|
|
}
|
|
|
|
/*
|
|
* Within leader, wait for end of heap scan
|
|
*/
|
|
static double
|
|
ParallelHeapScan(HnswBuildState * buildstate)
|
|
{
|
|
HnswShared *hnswshared = buildstate->hnswleader->hnswshared;
|
|
int nparticipanttuplesorts;
|
|
double reltuples;
|
|
|
|
nparticipanttuplesorts = buildstate->hnswleader->nparticipanttuplesorts;
|
|
for (;;)
|
|
{
|
|
SpinLockAcquire(&hnswshared->mutex);
|
|
if (hnswshared->nparticipantsdone == nparticipanttuplesorts)
|
|
{
|
|
buildstate->graph = &hnswshared->graphData;
|
|
buildstate->hnswarea = buildstate->hnswleader->hnswarea;
|
|
reltuples = hnswshared->reltuples;
|
|
SpinLockRelease(&hnswshared->mutex);
|
|
break;
|
|
}
|
|
SpinLockRelease(&hnswshared->mutex);
|
|
|
|
ConditionVariableSleep(&hnswshared->workersdonecv,
|
|
WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
|
|
}
|
|
|
|
ConditionVariableCancelSleep();
|
|
|
|
return reltuples;
|
|
}
|
|
|
|
/*
|
|
* Perform a worker's portion of a parallel insert
|
|
*/
|
|
static void
|
|
HnswParallelScanAndInsert(Relation heapRel, Relation indexRel, HnswShared * hnswshared, char *hnswarea, bool progress)
|
|
{
|
|
HnswBuildState buildstate;
|
|
#if PG_VERSION_NUM >= 120000
|
|
TableScanDesc scan;
|
|
#else
|
|
HeapScanDesc scan;
|
|
#endif
|
|
double reltuples;
|
|
IndexInfo *indexInfo;
|
|
|
|
/* Join parallel scan */
|
|
indexInfo = BuildIndexInfo(indexRel);
|
|
indexInfo->ii_Concurrent = hnswshared->isconcurrent;
|
|
InitBuildState(&buildstate, heapRel, indexRel, indexInfo, MAIN_FORKNUM);
|
|
buildstate.graph = &hnswshared->graphData;
|
|
buildstate.hnswarea = hnswarea;
|
|
InitAllocator(&buildstate.allocator, &HnswSharedMemoryAlloc, &buildstate);
|
|
#if PG_VERSION_NUM >= 120000
|
|
scan = table_beginscan_parallel(heapRel,
|
|
ParallelTableScanFromHnswShared(hnswshared));
|
|
reltuples = table_index_build_scan(heapRel, indexRel, indexInfo,
|
|
true, progress, BuildCallback,
|
|
(void *) &buildstate, scan);
|
|
#else
|
|
scan = heap_beginscan_parallel(heapRel, &hnswshared->heapdesc);
|
|
reltuples = IndexBuildHeapScan(heapRel, indexRel, indexInfo,
|
|
true, BuildCallback,
|
|
(void *) &buildstate, scan);
|
|
#endif
|
|
|
|
/* Record statistics */
|
|
SpinLockAcquire(&hnswshared->mutex);
|
|
hnswshared->nparticipantsdone++;
|
|
hnswshared->reltuples += reltuples;
|
|
SpinLockRelease(&hnswshared->mutex);
|
|
|
|
/* Log statistics */
|
|
if (progress)
|
|
ereport(DEBUG1, (errmsg("leader processed " INT64_FORMAT " tuples", (int64) reltuples)));
|
|
else
|
|
ereport(DEBUG1, (errmsg("worker processed " INT64_FORMAT " tuples", (int64) reltuples)));
|
|
|
|
/* Notify leader */
|
|
ConditionVariableSignal(&hnswshared->workersdonecv);
|
|
|
|
FreeBuildState(&buildstate);
|
|
}
|
|
|
|
/*
|
|
* Perform work within a launched parallel process
|
|
*/
|
|
void
|
|
HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
|
|
{
|
|
char *sharedquery;
|
|
HnswShared *hnswshared;
|
|
char *hnswarea;
|
|
Relation heapRel;
|
|
Relation indexRel;
|
|
LOCKMODE heapLockmode;
|
|
LOCKMODE indexLockmode;
|
|
|
|
/* Set debug_query_string for individual workers first */
|
|
sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
|
|
debug_query_string = sharedquery;
|
|
|
|
/* Report the query string from leader */
|
|
pgstat_report_activity(STATE_RUNNING, debug_query_string);
|
|
|
|
/* Look up shared state */
|
|
hnswshared = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_SHARED, false);
|
|
|
|
/* Open relations using lock modes known to be obtained by index.c */
|
|
if (!hnswshared->isconcurrent)
|
|
{
|
|
heapLockmode = ShareLock;
|
|
indexLockmode = AccessExclusiveLock;
|
|
}
|
|
else
|
|
{
|
|
heapLockmode = ShareUpdateExclusiveLock;
|
|
indexLockmode = RowExclusiveLock;
|
|
}
|
|
|
|
/* Open relations within worker */
|
|
#if PG_VERSION_NUM >= 120000
|
|
heapRel = table_open(hnswshared->heaprelid, heapLockmode);
|
|
#else
|
|
heapRel = heap_open(hnswshared->heaprelid, heapLockmode);
|
|
#endif
|
|
indexRel = index_open(hnswshared->indexrelid, indexLockmode);
|
|
|
|
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
|
|
|
|
/* Perform inserts */
|
|
HnswParallelScanAndInsert(heapRel, indexRel, hnswshared, hnswarea, false);
|
|
|
|
/* Close relations within worker */
|
|
index_close(indexRel, indexLockmode);
|
|
#if PG_VERSION_NUM >= 120000
|
|
table_close(heapRel, heapLockmode);
|
|
#else
|
|
heap_close(heapRel, heapLockmode);
|
|
#endif
|
|
}
|
|
|
|
/*
|
|
* End parallel build
|
|
*/
|
|
static void
|
|
HnswEndParallel(HnswLeader * hnswleader)
|
|
{
|
|
/* Shutdown worker processes */
|
|
WaitForParallelWorkersToFinish(hnswleader->pcxt);
|
|
|
|
/* Free last reference to MVCC snapshot, if one was used */
|
|
if (IsMVCCSnapshot(hnswleader->snapshot))
|
|
UnregisterSnapshot(hnswleader->snapshot);
|
|
DestroyParallelContext(hnswleader->pcxt);
|
|
ExitParallelMode();
|
|
}
|
|
|
|
/*
|
|
* Return size of shared memory required for parallel index build
|
|
*/
|
|
static Size
|
|
ParallelEstimateShared(Relation heap, Snapshot snapshot)
|
|
{
|
|
#if PG_VERSION_NUM >= 120000
|
|
return add_size(BUFFERALIGN(sizeof(HnswShared)), table_parallelscan_estimate(heap, snapshot));
|
|
#else
|
|
if (!IsMVCCSnapshot(snapshot))
|
|
{
|
|
Assert(snapshot == SnapshotAny);
|
|
return sizeof(HnswShared);
|
|
}
|
|
|
|
return add_size(offsetof(HnswShared, heapdesc) +
|
|
offsetof(ParallelHeapScanDescData, phs_snapshot_data),
|
|
EstimateSnapshotSpace(snapshot));
|
|
#endif
|
|
}
|
|
|
|
/*
|
|
* Within leader, participate as a parallel worker
|
|
*/
|
|
static void
|
|
HnswLeaderParticipateAsWorker(HnswBuildState * buildstate)
|
|
{
|
|
HnswLeader *hnswleader = buildstate->hnswleader;
|
|
|
|
/* Perform work common to all participants */
|
|
HnswParallelScanAndInsert(buildstate->heap, buildstate->index, hnswleader->hnswshared, hnswleader->hnswarea, true);
|
|
}
|
|
|
|
/*
|
|
* Begin parallel build
|
|
*/
|
|
static void
|
|
HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
|
|
{
|
|
ParallelContext *pcxt;
|
|
Snapshot snapshot;
|
|
Size esthnswshared;
|
|
Size esthnswarea;
|
|
Size estother;
|
|
HnswShared *hnswshared;
|
|
char *hnswarea;
|
|
HnswLeader *hnswleader = (HnswLeader *) palloc0(sizeof(HnswLeader));
|
|
bool leaderparticipates = true;
|
|
int querylen;
|
|
|
|
#ifdef DISABLE_LEADER_PARTICIPATION
|
|
leaderparticipates = false;
|
|
#endif
|
|
|
|
/* Enter parallel mode and create context */
|
|
EnterParallelMode();
|
|
Assert(request > 0);
|
|
#if PG_VERSION_NUM >= 120000
|
|
pcxt = CreateParallelContext("vector", "HnswParallelBuildMain", request);
|
|
#else
|
|
pcxt = CreateParallelContext("vector", "HnswParallelBuildMain", request, true);
|
|
#endif
|
|
|
|
/* Get snapshot for table scan */
|
|
if (!isconcurrent)
|
|
snapshot = SnapshotAny;
|
|
else
|
|
snapshot = RegisterSnapshot(GetTransactionSnapshot());
|
|
|
|
/* Estimate size of workspaces */
|
|
esthnswshared = ParallelEstimateShared(buildstate->heap, snapshot);
|
|
shm_toc_estimate_chunk(&pcxt->estimator, esthnswshared);
|
|
|
|
/* Leave space for other objects in shared memory */
|
|
/* Docker has a default limit of 64 MB for shm_size */
|
|
/* which happens to be the default value of maintenance_work_mem */
|
|
esthnswarea = maintenance_work_mem * 1024L;
|
|
estother = 2 * 1024 * 1024;
|
|
if (esthnswarea > estother)
|
|
esthnswarea -= estother;
|
|
|
|
shm_toc_estimate_chunk(&pcxt->estimator, esthnswarea);
|
|
shm_toc_estimate_keys(&pcxt->estimator, 2);
|
|
|
|
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
|
|
if (debug_query_string)
|
|
{
|
|
querylen = strlen(debug_query_string);
|
|
shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
|
|
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
|
}
|
|
else
|
|
querylen = 0; /* keep compiler quiet */
|
|
|
|
/* Everyone's had a chance to ask for space, so now create the DSM */
|
|
InitializeParallelDSM(pcxt);
|
|
|
|
/* If no DSM segment was available, back out (do serial build) */
|
|
if (pcxt->seg == NULL)
|
|
{
|
|
if (IsMVCCSnapshot(snapshot))
|
|
UnregisterSnapshot(snapshot);
|
|
DestroyParallelContext(pcxt);
|
|
ExitParallelMode();
|
|
return;
|
|
}
|
|
|
|
/* Store shared build state, for which we reserved space */
|
|
hnswshared = (HnswShared *) shm_toc_allocate(pcxt->toc, esthnswshared);
|
|
/* Initialize immutable state */
|
|
hnswshared->heaprelid = RelationGetRelid(buildstate->heap);
|
|
hnswshared->indexrelid = RelationGetRelid(buildstate->index);
|
|
hnswshared->isconcurrent = isconcurrent;
|
|
ConditionVariableInit(&hnswshared->workersdonecv);
|
|
SpinLockInit(&hnswshared->mutex);
|
|
/* Initialize mutable state */
|
|
hnswshared->nparticipantsdone = 0;
|
|
hnswshared->reltuples = 0;
|
|
#if PG_VERSION_NUM >= 120000
|
|
table_parallelscan_initialize(buildstate->heap,
|
|
ParallelTableScanFromHnswShared(hnswshared),
|
|
snapshot);
|
|
#else
|
|
heap_parallelscan_initialize(&hnswshared->heapdesc, buildstate->heap, snapshot);
|
|
#endif
|
|
|
|
hnswarea = (char *) shm_toc_allocate(pcxt->toc, esthnswarea);
|
|
/* Report less than allocated so never fails */
|
|
InitGraph(&hnswshared->graphData, hnswarea, esthnswarea - 1024 * 1024);
|
|
|
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_SHARED, hnswshared);
|
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_AREA, hnswarea);
|
|
|
|
/* Store query string for workers */
|
|
if (debug_query_string)
|
|
{
|
|
char *sharedquery;
|
|
|
|
sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
|
|
memcpy(sharedquery, debug_query_string, querylen + 1);
|
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
|
|
}
|
|
|
|
/* Launch workers, saving status for leader/caller */
|
|
LaunchParallelWorkers(pcxt);
|
|
hnswleader->pcxt = pcxt;
|
|
hnswleader->nparticipanttuplesorts = pcxt->nworkers_launched;
|
|
if (leaderparticipates)
|
|
hnswleader->nparticipanttuplesorts++;
|
|
hnswleader->hnswshared = hnswshared;
|
|
hnswleader->snapshot = snapshot;
|
|
hnswleader->hnswarea = hnswarea;
|
|
|
|
/* If no workers were successfully launched, back out (do serial build) */
|
|
if (pcxt->nworkers_launched == 0)
|
|
{
|
|
HnswEndParallel(hnswleader);
|
|
return;
|
|
}
|
|
|
|
/* Log participants */
|
|
ereport(DEBUG1, (errmsg("using %d parallel workers", pcxt->nworkers_launched)));
|
|
|
|
/* Save leader state now that it's clear build will be parallel */
|
|
buildstate->hnswleader = hnswleader;
|
|
|
|
/* Join heap scan ourselves */
|
|
if (leaderparticipates)
|
|
HnswLeaderParticipateAsWorker(buildstate);
|
|
|
|
/* Wait for all launched workers */
|
|
WaitForParallelWorkersToAttach(pcxt);
|
|
}
|
|
|
|
/*
|
|
* Compute parallel workers
|
|
*/
|
|
static int
|
|
ComputeParallelWorkers(Relation heap, Relation index)
|
|
{
|
|
int parallel_workers;
|
|
|
|
/* Make sure it's safe to use parallel workers */
|
|
parallel_workers = plan_create_index_workers(RelationGetRelid(heap), RelationGetRelid(index));
|
|
if (parallel_workers == 0)
|
|
return 0;
|
|
|
|
/* Use parallel_workers storage parameter on table if set */
|
|
parallel_workers = RelationGetParallelWorkers(heap, -1);
|
|
if (parallel_workers != -1)
|
|
return Min(parallel_workers, max_parallel_maintenance_workers);
|
|
|
|
return max_parallel_maintenance_workers;
|
|
}
|
|
|
|
/*
|
|
* Build graph
|
|
*/
|
|
static void
|
|
BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum)
|
|
{
|
|
int parallel_workers = 0;
|
|
|
|
UpdateProgress(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_HNSW_PHASE_LOAD);
|
|
|
|
/* Calculate parallel workers */
|
|
if (buildstate->heap != NULL)
|
|
parallel_workers = ComputeParallelWorkers(buildstate->heap, buildstate->index);
|
|
|
|
/* Attempt to launch parallel worker scan when required */
|
|
if (parallel_workers > 0)
|
|
HnswBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers);
|
|
|
|
/* Add tuples to graph */
|
|
if (buildstate->heap != NULL)
|
|
{
|
|
if (buildstate->hnswleader)
|
|
buildstate->reltuples = ParallelHeapScan(buildstate);
|
|
else
|
|
{
|
|
#if PG_VERSION_NUM >= 120000
|
|
buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo,
|
|
true, true, BuildCallback, (void *) buildstate, NULL);
|
|
#else
|
|
buildstate->reltuples = IndexBuildHeapScan(buildstate->heap, buildstate->index, buildstate->indexInfo,
|
|
true, BuildCallback, (void *) buildstate, NULL);
|
|
#endif
|
|
}
|
|
|
|
buildstate->indtuples = buildstate->graph->indtuples;
|
|
}
|
|
|
|
/* Flush pages */
|
|
if (!buildstate->graph->flushed)
|
|
FlushPages(buildstate);
|
|
|
|
/* End parallel build */
|
|
if (buildstate->hnswleader)
|
|
HnswEndParallel(buildstate->hnswleader);
|
|
}
|
|
|
|
#if PG_VERSION_NUM < 110008
|
|
void
|
|
log_newpage_range(Relation rel, ForkNumber forkNum, BlockNumber startblk, BlockNumber endblk, bool page_std)
|
|
{
|
|
for (BlockNumber blkno = startblk; blkno < endblk; blkno++)
|
|
{
|
|
Buffer buf = ReadBufferExtended(rel, forkNum, blkno, RBM_NORMAL, NULL);
|
|
|
|
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
|
MarkBufferDirty(buf);
|
|
log_newpage_buffer(buf, page_std);
|
|
UnlockReleaseBuffer(buf);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
/*
|
|
* Build the index
|
|
*/
|
|
static void
|
|
BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
|
HnswBuildState * buildstate, ForkNumber forkNum)
|
|
{
|
|
#ifdef HNSW_MEMORY
|
|
SeedRandom(42);
|
|
#endif
|
|
|
|
InitBuildState(buildstate, heap, index, indexInfo, forkNum);
|
|
|
|
BuildGraph(buildstate, forkNum);
|
|
|
|
if (RelationNeedsWAL(index))
|
|
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
|
|
|
|
FreeBuildState(buildstate);
|
|
}
|
|
|
|
/*
|
|
* Build the index for a logged table
|
|
*/
|
|
IndexBuildResult *
|
|
hnswbuild(Relation heap, Relation index, IndexInfo *indexInfo)
|
|
{
|
|
IndexBuildResult *result;
|
|
HnswBuildState buildstate;
|
|
|
|
BuildIndex(heap, index, indexInfo, &buildstate, MAIN_FORKNUM);
|
|
|
|
result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
|
|
result->heap_tuples = buildstate.reltuples;
|
|
result->index_tuples = buildstate.indtuples;
|
|
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Build the index for an unlogged table
|
|
*/
|
|
void
|
|
hnswbuildempty(Relation index)
|
|
{
|
|
IndexInfo *indexInfo = BuildIndexInfo(index);
|
|
HnswBuildState buildstate;
|
|
|
|
BuildIndex(NULL, index, indexInfo, &buildstate, INIT_FORKNUM);
|
|
}
|