Added support for in-memory parallel index builds for HNSW

Co-authored-by: Heikki Linnakangas <heikki.linnakangas@iki.fi>
This commit is contained in:
Andrew Kane
2024-01-22 23:19:10 -08:00
parent 4c6928bd3c
commit 2d0f162bd7
9 changed files with 844 additions and 324 deletions

View File

@@ -1,7 +1,7 @@
## 0.5.2 (unreleased)
- Improved performance of HNSW
- Added support for on-disk parallel index builds for HNSW
- Added support for parallel index builds for HNSW
- Reduced memory usage for HNSW index builds
- Reduced WAL generation for HNSW index builds
- Fixed error with logical replication

View File

@@ -14,15 +14,45 @@
#endif
int hnsw_ef_search;
bool hnsw_enable_parallel_build;
int hnsw_lock_tranche_id;
static relopt_kind hnsw_relopt_kind;
/*
* Assign a tranche ID for our LWLocks. This only needs to be done by one
* backend, as the tranche ID is remembered in shared memory.
*
* This shared memory area is very small, so we just allocate it from the
* "slop" that PostgreSQL reserves for small allocations like this. If
* this grows bigger, we should use a shmem_request_hook and
* RequestAddinShmemSpace() to pre-reserve space for this.
*/
static void
HnswInitLockTranche(void)
{
int *tranche_ids;
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
tranche_ids = ShmemInitStruct("hnsw LWLock ids",
sizeof(int) * 1,
&found);
if (!found)
tranche_ids[0] = LWLockNewTrancheId();
hnsw_lock_tranche_id = tranche_ids[0];
LWLockRelease(AddinShmemInitLock);
/* Per-backend registration of the tranche ID */
LWLockRegisterTranche(hnsw_lock_tranche_id, "HnswBuild");
}
/*
* Initialize index options and variables
*/
void
HnswInit(void)
{
HnswInitLockTranche();
hnsw_relopt_kind = add_reloption_kind();
add_int_reloption(hnsw_relopt_kind, "m", "Max number of connections",
HNSW_DEFAULT_M, HNSW_MIN_M, HNSW_MAX_M
@@ -40,11 +70,6 @@ HnswInit(void)
DefineCustomIntVariable("hnsw.ef_search", "Sets the size of the dynamic candidate list for search",
"Valid range is 1..1000.", &hnsw_ef_search,
HNSW_DEFAULT_EF_SEARCH, HNSW_MIN_EF_SEARCH, HNSW_MAX_EF_SEARCH, PGC_USERSET, 0, NULL, NULL, NULL);
/* Behind a variable for now since can be slower than building in memory */
DefineCustomBoolVariable("hnsw.enable_parallel_build", "Enables or disables building indexes in parallel",
NULL, &hnsw_enable_parallel_build,
false, PGC_USERSET, 0, NULL, NULL, NULL);
}
/*

View File

@@ -6,9 +6,9 @@
#include "access/generic_xlog.h"
#include "access/parallel.h"
#include "access/reloptions.h"
#include "lib/ilist.h"
#include "nodes/execnodes.h"
#include "port.h" /* for random() */
#include "utils/relptr.h"
#include "utils/sampling.h"
#include "vector.h"
@@ -96,33 +96,62 @@
/* Ensure fits on page and in uint8 */
#define HnswGetMaxLevel(m) Min(((BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(HnswPageOpaqueData)) - offsetof(HnswNeighborTupleData, indextids) - sizeof(ItemIdData)) / (sizeof(ItemPointerData)) / (m)) - 2, 255)
#define HnswGetNeighbors(element, lc) (AssertMacro((element)->level >= (lc)), &(element)->neighbors[lc])
#define HnswGetValue(base, element) PointerGetDatum(HnswPtrAccess(base, (element)->value))
#if PG_VERSION_NUM < 140005
#define relptr_offset(rp) ((rp).relptr_off - 1)
#endif
/* Pointer macros */
#define HnswPtrAccess(base, hp) ((base) == NULL ? (hp).ptr : relptr_access(base, (hp).relptr))
#define HnswPtrStore(base, hp, value) ((base) == NULL ? (void) ((hp).ptr = (value)) : (void) relptr_store(base, (hp).relptr, value))
#define HnswPtrIsNull(base, hp) ((base) == NULL ? (hp).ptr == NULL : relptr_is_null((hp).relptr))
#define HnswPtrEqual(base, hp1, hp2) ((base) == NULL ? (hp1).ptr == (hp2).ptr : relptr_offset((hp1).relptr) == relptr_offset((hp2).relptr))
/* For code paths dedicated to each type */
#define HnswPtrPointer(hp) (hp).ptr
#define HnswPtrOffset(hp) relptr_offset((hp).relptr)
/* Variables */
extern int hnsw_ef_search;
extern bool hnsw_enable_parallel_build;
extern int hnsw_lock_tranche_id;
typedef struct HnswElementData HnswElementData;
typedef struct HnswNeighborArray HnswNeighborArray;
#define HnswPtrDeclare(type, relptrtype, ptrtype) \
relptr_declare(type, relptrtype); \
typedef union { type *ptr; relptrtype relptr; } ptrtype;
/* Pointers that can be absolute or relative */
/* Use char for DatumPtr so works with Pointer */
HnswPtrDeclare(HnswElementData, HnswElementRelptr, HnswElementPtr);
HnswPtrDeclare(HnswNeighborArray, HnswNeighborArrayRelptr, HnswNeighborArrayPtr);
HnswPtrDeclare(HnswNeighborArrayPtr, HnswNeighborsRelptr, HnswNeighborsPtr);
HnswPtrDeclare(char, DatumRelptr, DatumPtr);
typedef struct HnswElementData
{
slist_node next;
HnswElementPtr next;
ItemPointerData heaptids[HNSW_HEAPTIDS];
uint8 heaptidsLength;
uint8 level;
uint8 deleted;
uint32 hash;
struct HnswNeighborArray *neighbors;
HnswNeighborsPtr neighbors;
BlockNumber blkno;
OffsetNumber offno;
OffsetNumber neighborOffno;
BlockNumber neighborPage;
Datum value;
DatumPtr value;
LWLock lock;
} HnswElementData;
typedef HnswElementData * HnswElement;
typedef struct HnswCandidate
{
HnswElement element;
HnswElementPtr element;
float distance;
bool closer;
} HnswCandidate;
@@ -131,7 +160,7 @@ typedef struct HnswNeighborArray
{
int length;
bool closerSet;
HnswCandidate *items;
HnswCandidate items[FLEXIBLE_ARRAY_MEMBER];
} HnswNeighborArray;
typedef struct HnswPairingHeapNode
@@ -150,12 +179,23 @@ typedef struct HnswOptions
typedef struct HnswGraph
{
slist_head elements;
HnswElement entryPoint;
/* Graph state */
slock_t lock;
HnswElementPtr head;
double indtuples;
/* Entry state */
LWLock entryLock;
HnswElementPtr entryPoint;
/* Allocations state */
LWLock allocatorLock;
long memoryUsed;
long memoryTotal;
/* Flushed state */
LWLock flushLock;
bool flushed;
double indtuples;
} HnswGraph;
typedef struct HnswShared
@@ -192,8 +232,15 @@ typedef struct HnswLeader
int nparticipanttuplesorts;
HnswShared *hnswshared;
Snapshot snapshot;
char *hnswarea;
} HnswLeader;
typedef struct HnswAllocator
{
void *(*alloc) (Size size, void *state);
void *state;
} HnswAllocator;
typedef struct HnswBuildState
{
/* Info */
@@ -226,10 +273,12 @@ typedef struct HnswBuildState
/* Memory */
MemoryContext graphCtx;
MemoryContext tmpCtx;
HnswAllocator allocator;
/* Parallel builds */
HnswLeader *hnswleader;
HnswShared *hnswshared;
char *hnswarea;
} HnswBuildState;
typedef struct HnswMetaPageData
@@ -328,23 +377,24 @@ bool HnswNormValue(FmgrInfo *procinfo, Oid collation, Datum *value, Vector * re
Buffer HnswNewBuffer(Relation index, ForkNumber forkNum);
void HnswInitPage(Buffer buf, Page page);
void HnswInit(void);
List *HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement);
List *HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement);
HnswElement HnswGetEntryPoint(Relation index);
void HnswGetMetaPageInfo(Relation index, int *m, HnswElement * entryPoint);
HnswElement HnswInitElement(ItemPointer tid, int m, double ml, int maxLevel);
void *HnswAlloc(HnswAllocator * allocator, Size size);
HnswElement HnswInitElement(char *base, ItemPointer tid, int m, double ml, int maxLevel, HnswAllocator * alloc);
HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno);
void HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing);
HnswCandidate *HnswEntryCandidate(HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec);
void HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing);
HnswCandidate *HnswEntryCandidate(char *base, HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec);
void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum, bool building);
void HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m);
void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m);
void HnswAddHeapTid(HnswElement element, ItemPointer heaptid);
void HnswInitNeighbors(HnswElement element, int m);
bool HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building);
void HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building);
void HnswInitNeighbors(char *base, HnswElement element, int m, HnswAllocator * alloc);
bool HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building);
void HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building);
void HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHeaptids, bool loadVec);
void HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec);
void HnswSetElementTuple(HnswElementTuple etup, HnswElement element);
void HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation);
void HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element);
void HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc, int lm, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation);
void HnswLoadNeighbors(HnswElement element, Relation index, int m);
PGDLLEXPORT void HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc);
@@ -364,6 +414,16 @@ void hnswrescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys,
bool hnswgettuple(IndexScanDesc scan, ScanDirection dir);
void hnswendscan(IndexScanDesc scan);
static inline HnswNeighborArray *
HnswGetNeighbors(char *base, HnswElement element, int lc)
{
HnswNeighborArrayPtr *neighborList = HnswPtrAccess(base, element->neighbors);
Assert(element->level >= lc);
return HnswPtrAccess(base, neighborList[lc]);
}
/* Hash tables */
typedef struct TidHashEntry
{
@@ -391,4 +451,17 @@ typedef struct PointerHashEntry
#define SH_DECLARE
#include "lib/simplehash.h"
typedef struct OffsetHashEntry
{
Size offset;
char status;
} OffsetHashEntry;
#define SH_PREFIX offsethash
#define SH_ELEMENT_TYPE OffsetHashEntry
#define SH_KEY_TYPE Size
#define SH_SCOPE extern
#define SH_DECLARE
#include "lib/simplehash.h"
#endif

View File

@@ -1,3 +1,39 @@
/*
* The HNSW build happens in two phases:
*
* 1. In-memory phase
*
* In this first phase, the graph is held completely in memory. When the graph
* is fully built, or we run out of memory reserved for the build (determined
* by maintenance_work_mem), we materialize the graph to disk (see
* FlushPages()), and switch to the on-disk phase.
*
* In a parallel build, a large contiguous chunk of shared memory is allocated
* to hold the graph. Each worker process has its own HnswBuildState struct in
* private memory, which contains information that doesn't change throughout
* the build, and pointers to the shared structs in shared memory. The shared
* memory area is mapped to a different address in each worker process, and
* 'HnswBuildState.hnswarea' points to the beginning of the shared area in the
* worker process's address space. All pointers used in the graph are
* "relative pointers", stored as an offset from 'hnswarea'.
*
* Each element is protected by an LWLock. It must be held when reading or
* modifying the element's neighbors or 'heaptids'.
*
* In a non-parallel build, the graph is held in backend-private memory. All
* the elements are allocated in a dedicated memory context, 'graphCtx', and
* the pointers used in the graph are regular pointers.
*
* 2. On-disk phase
*
* In the on-disk phase, the index is built by inserting each vector to the
* index one by one, just like on INSERT. The only difference is that we don't
* WAL-log the individual inserts. If the graph fit completely in memory and
* was fully built in the in-memory phase, the on-disk phase is skipped.
*
* After we have finished building the graph, we perform one more scan through
* the index and write all the pages to the WAL.
*/
#include "postgres.h"
#include <math.h>
@@ -54,7 +90,8 @@
#endif
#define PARALLEL_KEY_HNSW_SHARED UINT64CONST(0xA000000000000001)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000002)
#define PARALLEL_KEY_HNSW_AREA UINT64CONST(0xA000000000000002)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000003)
#if PG_VERSION_NUM < 130000
#define GENERATIONCHUNK_RAWSIZE (SIZEOF_SIZE_T + SIZEOF_VOID_P * 2)
@@ -135,9 +172,11 @@ CreateElementPages(HnswBuildState * buildstate)
HnswElementTuple etup;
HnswNeighborTuple ntup;
BlockNumber insertPage;
HnswElement entryPoint;
Buffer buf;
Page page;
slist_iter iter;
HnswElementPtr iter = buildstate->graph->head;
char *base = buildstate->hnswarea;
/* Calculate sizes */
etupAllocSize = BLCKSZ;
@@ -152,18 +191,22 @@ CreateElementPages(HnswBuildState * buildstate)
page = BufferGetPage(buf);
HnswInitPage(buf, page);
slist_foreach(iter, &buildstate->graph->elements)
while (!HnswPtrIsNull(base, iter))
{
HnswElement element = slist_container(HnswElementData, next, iter.cur);
HnswElement element = HnswPtrAccess(base, iter);
Size etupSize;
Size ntupSize;
Size combinedSize;
void *valuePtr = HnswPtrAccess(base, element->value);
/* Update iterator */
iter = element->next;
/* Zero memory for each element */
MemSet(etup, 0, etupAllocSize);
/* Calculate sizes */
etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(DatumGetPointer(element->value)));
etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(valuePtr));
ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(element->level, buildstate->m);
combinedSize = etupSize + ntupSize + sizeof(ItemIdData);
@@ -171,7 +214,7 @@ CreateElementPages(HnswBuildState * buildstate)
if (etupSize > etupAllocSize)
elog(ERROR, "index tuple too large");
HnswSetElementTuple(etup, element);
HnswSetElementTuple(base, etup, element);
/* Keep element and neighbors on the same page if possible */
if (PageGetFreeSpace(page) < etupSize || (combinedSize <= maxSize && PageGetFreeSpace(page) < combinedSize))
@@ -212,7 +255,8 @@ CreateElementPages(HnswBuildState * buildstate)
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, buildstate->graph->entryPoint, insertPage, forkNum, true);
entryPoint = HnswPtrAccess(base, buildstate->graph->entryPoint);
HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, entryPoint, insertPage, forkNum, true);
pfree(etup);
pfree(ntup);
@@ -227,19 +271,23 @@ CreateNeighborPages(HnswBuildState * buildstate)
Relation index = buildstate->index;
ForkNumber forkNum = buildstate->forkNum;
int m = buildstate->m;
slist_iter iter;
HnswElementPtr iter = buildstate->graph->head;
char *base = buildstate->hnswarea;
HnswNeighborTuple ntup;
/* Allocate once */
ntup = palloc0(BLCKSZ);
slist_foreach(iter, &buildstate->graph->elements)
while (!HnswPtrIsNull(base, iter))
{
HnswElement e = slist_container(HnswElementData, next, iter.cur);
HnswElement e = HnswPtrAccess(base, iter);
Buffer buf;
Page page;
Size ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m);
/* Update iterator */
iter = e->next;
/* Can take a while, so ensure we can interrupt */
/* Needs to be called when no buffer locks are held */
CHECK_FOR_INTERRUPTS();
@@ -248,7 +296,7 @@ CreateNeighborPages(HnswBuildState * buildstate)
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
HnswSetNeighborTuple(ntup, e, m);
HnswSetNeighborTuple(base, ntup, e, m);
if (!PageIndexTupleOverwrite(page, e->neighborOffno, (Item) ntup, ntupSize))
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
@@ -261,24 +309,6 @@ CreateNeighborPages(HnswBuildState * buildstate)
pfree(ntup);
}
#ifdef HNSW_MEMORY
/*
* Show memory usage
*/
static void
ShowMemoryUsage(HnswBuildState * buildstate)
{
#if PG_VERSION_NUM >= 130000
elog(INFO, "graph memory: %zu MB, total memory: %zu MB",
MemoryContextMemAllocated(buildstate->graphCtx, false) / (1024 * 1024),
MemoryContextMemAllocated(CurrentMemoryContext, true) / (1024 * 1024));
#else
MemoryContextStats(CurrentMemoryContext);
elog(INFO, "estimated memory: %zu MB", buildstate->memoryUsed / (1024 * 1024));
#endif
}
#endif
/*
* Flush pages
*/
@@ -286,7 +316,7 @@ static void
FlushPages(HnswBuildState * buildstate)
{
#ifdef HNSW_MEMORY
ShowMemoryUsage(buildstate);
elog(INFO, "memory: %zu MB", buildstate->graph->memoryUsed / (1024 * 1024));
#endif
CreateMetaPage(buildstate);
@@ -297,66 +327,172 @@ FlushPages(HnswBuildState * buildstate)
MemoryContextReset(buildstate->graphCtx);
}
#if PG_VERSION_NUM < 130000
/*
* Get the memory used by an element
* Add a heap TID to an existing element
*/
static long
HnswElementMemory(HnswElement e, int m)
static bool
AddDuplicateInMemory(HnswElement element, HnswElement dup)
{
long elementSize = sizeof(HnswElementData);
LWLockAcquire(&dup->lock, LW_EXCLUSIVE);
elementSize += sizeof(HnswNeighborArray) * (e->level + 1);
elementSize += sizeof(HnswCandidate) * (m * (e->level + 2));
elementSize += VARSIZE_ANY(DatumGetPointer(e->value));
/* Each allocation has a chunk header */
elementSize += (e->level + 4) * GENERATIONCHUNK_RAWSIZE;
/* Add an extra 5% for alignment and other overhead */
return elementSize * 1.05;
if (dup->heaptidsLength == HNSW_HEAPTIDS)
{
LWLockRelease(&dup->lock);
return false;
}
HnswAddHeapTid(dup, &element->heaptids[0]);
LWLockRelease(&dup->lock);
return true;
}
#endif
/*
* Find duplicate element
*/
static bool
HnswFindDuplicateInMemory(HnswElement element)
FindDuplicateInMemory(char *base, HnswElement element)
{
HnswNeighborArray *neighbors = HnswGetNeighbors(element, 0);
HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0);
Datum value = HnswGetValue(base, element);
for (int i = 0; i < neighbors->length; i++)
{
HnswCandidate *neighbor = &neighbors->items[i];
HnswElement neighborElement = HnswPtrAccess(base, neighbor->element);
Datum neighborValue = HnswGetValue(base, neighborElement);
/* Exit early since ordered by distance */
if (!datumIsEqual(element->value, neighbor->element->value, false, -1))
if (!datumIsEqual(value, neighborValue, false, -1))
return false;
/* Check for space */
if (neighbor->element->heaptidsLength < HNSW_HEAPTIDS)
{
HnswAddHeapTid(neighbor->element, &element->heaptids[0]);
if (AddDuplicateInMemory(element, neighborElement))
return true;
}
}
return false;
}
/*
* Insert tuple into in-memory graph
* Add to element list
*/
static bool
InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuildState * buildstate)
static void
AddElementInMemory(char *base, HnswGraph * graph, HnswElement element)
{
SpinLockAcquire(&graph->lock);
element->next = graph->head;
HnswPtrStore(base, graph->head, element);
SpinLockRelease(&graph->lock);
}
/*
* Update neighbors
*/
static void
UpdateNeighborsInMemory(char *base, FmgrInfo *procinfo, Oid collation, HnswElement e, int m)
{
for (int lc = e->level; lc >= 0; lc--)
{
int lm = HnswGetLayerM(m, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc);
for (int i = 0; i < neighbors->length; i++)
{
HnswCandidate *hc = &neighbors->items[i];
HnswElement neighborElement = HnswPtrAccess(base, hc->element);
/* Keep scan-build happy on Mac x86-64 */
Assert(neighborElement);
/* Use element for lock instead of hc since hc can be replaced */
LWLockAcquire(&neighborElement->lock, LW_EXCLUSIVE);
HnswUpdateConnection(base, e, hc, lm, lc, NULL, NULL, procinfo, collation);
LWLockRelease(&neighborElement->lock);
}
}
}
/*
* Update graph in memory
*/
static void
UpdateGraphInMemory(FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, HnswBuildState * buildstate)
{
HnswGraph *graph = buildstate->graph;
char *base = buildstate->hnswarea;
/* Look for duplicate */
if (FindDuplicateInMemory(base, element))
return;
/* Add element */
AddElementInMemory(base, graph, element);
/* Update neighbors */
UpdateNeighborsInMemory(base, procinfo, collation, element, m);
/* Update entry point if needed (already have lock) */
if (entryPoint == NULL || element->level > entryPoint->level)
HnswPtrStore(base, graph->entryPoint, element);
}
/*
* Insert tuple in memory
*/
static void
InsertTupleInMemory(HnswBuildState * buildstate, HnswElement element)
{
FmgrInfo *procinfo = buildstate->procinfo;
Oid collation = buildstate->collation;
HnswGraph *graph = buildstate->graph;
HnswElement entryPoint = graph->entryPoint;
HnswElement entryPoint;
LWLock *entryLock = &graph->entryLock;
int efConstruction = buildstate->efConstruction;
int m = buildstate->m;
MemoryContext oldCtx;
char *base = buildstate->hnswarea;
/* Get entry point */
LWLockAcquire(entryLock, LW_SHARED);
entryPoint = HnswPtrAccess(base, graph->entryPoint);
/* Prevent concurrent inserts when likely updating entry point */
if (entryPoint == NULL || element->level > entryPoint->level)
{
/* Release shared lock */
LWLockRelease(entryLock);
/* Get exclusive lock */
LWLockAcquire(entryLock, LW_EXCLUSIVE);
/* Get latest entry point after lock is acquired */
entryPoint = HnswPtrAccess(base, graph->entryPoint);
}
/* Find neighbors for element */
HnswFindElementNeighbors(base, element, entryPoint, NULL, procinfo, collation, m, efConstruction, false);
/* Update graph in memory */
UpdateGraphInMemory(procinfo, collation, element, m, efConstruction, entryPoint, buildstate);
/* Release entry lock */
LWLockRelease(entryLock);
}
/*
* Insert tuple
*/
static bool
InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, HnswBuildState * buildstate)
{
HnswGraph *graph = buildstate->graph;
HnswElement element;
HnswAllocator *allocator = &buildstate->allocator;
Size valueSize;
Pointer valuePtr;
LWLock *flushLock = &graph->flushLock;
char *base = buildstate->hnswarea;
/* Detoast once for all calls */
Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
@@ -364,73 +500,83 @@ InsertTupleInMemory(Relation index, Datum *values, ItemPointer heaptid, HnswBuil
/* Normalize if needed */
if (buildstate->normprocinfo != NULL)
{
if (!HnswNormValue(buildstate->normprocinfo, collation, &value, buildstate->normvec))
if (!HnswNormValue(buildstate->normprocinfo, buildstate->collation, &value, buildstate->normvec))
return false;
}
/* Allocate element in graph memory context */
oldCtx = MemoryContextSwitchTo(buildstate->graphCtx);
element = HnswInitElement(heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel);
element->value = datumCopy(value, false, -1);
MemoryContextSwitchTo(oldCtx);
/* Get datum size */
valueSize = VARSIZE_ANY(DatumGetPointer(value));
/* Update memory usage */
#if PG_VERSION_NUM >= 130000
graph->memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false);
#else
graph->memoryUsed += HnswElementMemory(element, buildstate->m);
#endif
/* Ensure graph not flushed when inserting */
LWLockAcquire(flushLock, LW_SHARED);
/* Insert element in graph */
HnswInsertElement(element, entryPoint, NULL, procinfo, collation, m, efConstruction, false);
/* Look for duplicate */
if (HnswFindDuplicateInMemory(element))
/* Are we in the on-disk phase? */
if (graph->flushed)
{
/* No need to free element since memory unlikely to be reallocated */
return true;
LWLockRelease(flushLock);
return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true);
}
/* Add element */
slist_push_head(&graph->elements, &element->next);
/*
* In a parallel build, the HnswElement is allocated from the shared
* memory area, so we need to coordinate with other processes.
*/
LWLockAcquire(&graph->allocatorLock, LW_EXCLUSIVE);
/* Update neighbors */
for (int lc = element->level; lc >= 0; lc--)
/*
* Check that we have enough memory available for the new element now that
* we have the allocator lock, and flush pages if needed.
*/
if (graph->memoryUsed >= graph->memoryTotal)
{
int lm = HnswGetLayerM(m, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(element, lc);
LWLockRelease(&graph->allocatorLock);
for (int i = 0; i < neighbors->length; i++)
HnswUpdateConnection(element, &neighbors->items[i], lm, lc, NULL, NULL, procinfo, collation);
LWLockRelease(flushLock);
LWLockAcquire(flushLock, LW_EXCLUSIVE);
if (!graph->flushed)
{
ereport(NOTICE,
(errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples),
errdetail("Building will take significantly more time."),
errhint("Increase maintenance_work_mem to speed up builds.")));
FlushPages(buildstate);
}
LWLockRelease(flushLock);
return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, buildstate->heap, true);
}
/* Update entry point if needed */
if (entryPoint == NULL || element->level > entryPoint->level)
graph->entryPoint = element;
/* Ok, we can proceed to allocate the element */
element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel, allocator);
valuePtr = HnswAlloc(allocator, valueSize);
/*
* We have now allocated the space needed for the element, so we don't
* need the allocator lock anymore. Release it and initialize the rest of
* the element.
*/
LWLockRelease(&graph->allocatorLock);
/* Copy the datum */
memcpy(valuePtr, DatumGetPointer(value), valueSize);
HnswPtrStore(base, element->value, valuePtr);
/* Create a lock for the element */
LWLockInitialize(&element->lock, hnsw_lock_tranche_id);
/* Insert tuple */
InsertTupleInMemory(buildstate, element);
/* Release flush lock */
LWLockRelease(flushLock);
return true;
}
/*
* Acquire a lock if needed
*/
static inline void
HnswLockAcquire(HnswShared * hnswshared)
{
if (hnswshared)
SpinLockAcquire(&hnswshared->mutex);
}
/*
* Release a lock if needed
*/
static inline void
HnswLockRelease(HnswShared * hnswshared)
{
if (hnswshared)
SpinLockRelease(&hnswshared->mutex);
}
/*
* Callback for table_index_build_scan
*/
@@ -440,9 +586,7 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
{
HnswBuildState *buildstate = (HnswBuildState *) state;
HnswGraph *graph = buildstate->graph;
HnswShared *hnswshared = buildstate->hnswshared;
MemoryContext oldCtx;
bool inserted;
#if PG_VERSION_NUM < 130000
ItemPointer tid = &hup->t_self;
@@ -452,31 +596,16 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
if (isnull[0])
return;
/* Flush pages if needed */
if (!graph->flushed && graph->memoryUsed >= graph->memoryTotal)
{
ereport(NOTICE,
(errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples),
errdetail("Building will take significantly more time."),
errhint("Increase maintenance_work_mem to speed up builds.")));
FlushPages(buildstate);
}
/* Use memory context */
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
/* Insert tuple */
if (graph->flushed)
inserted = HnswInsertTuple(index, values, isnull, tid, buildstate->heap, true);
else
inserted = InsertTupleInMemory(index, values, tid, buildstate);
/* Update progress */
if (inserted)
if (InsertTuple(index, values, isnull, tid, buildstate))
{
HnswLockAcquire(hnswshared);
/* Update progress */
SpinLockAcquire(&graph->lock);
UpdateProgress(PROGRESS_CREATEIDX_TUPLES_DONE, ++graph->indtuples);
HnswLockRelease(hnswshared);
SpinLockRelease(&graph->lock);
}
/* Reset memory context */
@@ -488,14 +617,59 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
* Initialize the graph
*/
static void
InitGraph(HnswGraph * graph)
InitGraph(HnswGraph * graph, char *base, long memoryTotal)
{
slist_init(&graph->elements);
graph->entryPoint = NULL;
HnswPtrStore(base, graph->head, (HnswElement) NULL);
HnswPtrStore(base, graph->entryPoint, (HnswElement) NULL);
graph->memoryUsed = 0;
graph->memoryTotal = maintenance_work_mem * 1024L;
graph->memoryTotal = memoryTotal;
graph->flushed = false;
graph->indtuples = 0;
SpinLockInit(&graph->lock);
LWLockInitialize(&graph->entryLock, hnsw_lock_tranche_id);
LWLockInitialize(&graph->allocatorLock, hnsw_lock_tranche_id);
LWLockInitialize(&graph->flushLock, hnsw_lock_tranche_id);
}
/*
* Initialize an allocator
*/
static void
InitAllocator(HnswAllocator * allocator, void *(*alloc) (Size size, void *state), void *state)
{
allocator->alloc = alloc;
allocator->state = state;
}
/*
* Memory context allocator
*/
static void *
HnswMemoryContextAlloc(Size size, void *state)
{
HnswBuildState *buildstate = (HnswBuildState *) state;
void *chunk = MemoryContextAlloc(buildstate->graphCtx, size);
#if PG_VERSION_NUM >= 130000
buildstate->graphData.memoryUsed = MemoryContextMemAllocated(buildstate->graphCtx, false);
#else
buildstate->graphData.memoryUsed += MAXALIGN(size);
#endif
return chunk;
}
/*
* Shared memory allocator
*/
static void *
HnswSharedMemoryAlloc(Size size, void *state)
{
HnswBuildState *buildstate = (HnswBuildState *) state;
void *chunk = buildstate->hnswarea + buildstate->graph->memoryUsed;
buildstate->graph->memoryUsed += MAXALIGN(size);
return chunk;
}
/*
@@ -531,7 +705,7 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index
buildstate->normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC);
buildstate->collation = index->rd_indcollation[0];
InitGraph(&buildstate->graphData);
InitGraph(&buildstate->graphData, NULL, maintenance_work_mem * 1024L);
buildstate->graph = &buildstate->graphData;
buildstate->ml = HnswGetMl(buildstate->m);
buildstate->maxLevel = HnswGetMaxLevel(buildstate->m);
@@ -549,8 +723,11 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index
"Hnsw build temporary context",
ALLOCSET_DEFAULT_SIZES);
InitAllocator(&buildstate->allocator, &HnswMemoryContextAlloc, buildstate);
buildstate->hnswleader = NULL;
buildstate->hnswshared = NULL;
buildstate->hnswarea = NULL;
}
/*
@@ -581,6 +758,7 @@ ParallelHeapScan(HnswBuildState * buildstate)
if (hnswshared->nparticipantsdone == nparticipanttuplesorts)
{
buildstate->graph = &hnswshared->graphData;
buildstate->hnswarea = buildstate->hnswleader->hnswarea;
reltuples = hnswshared->reltuples;
SpinLockRelease(&hnswshared->mutex);
break;
@@ -600,7 +778,7 @@ ParallelHeapScan(HnswBuildState * buildstate)
* Perform a worker's portion of a parallel insert
*/
static void
HnswParallelScanAndInsert(Relation heapRel, Relation indexRel, HnswShared * hnswshared, bool progress)
HnswParallelScanAndInsert(Relation heapRel, Relation indexRel, HnswShared * hnswshared, char *hnswarea, bool progress)
{
HnswBuildState buildstate;
#if PG_VERSION_NUM >= 120000
@@ -616,7 +794,8 @@ HnswParallelScanAndInsert(Relation heapRel, Relation indexRel, HnswShared * hnsw
indexInfo->ii_Concurrent = hnswshared->isconcurrent;
InitBuildState(&buildstate, heapRel, indexRel, indexInfo, MAIN_FORKNUM);
buildstate.graph = &hnswshared->graphData;
buildstate.hnswshared = hnswshared;
buildstate.hnswarea = hnswarea;
InitAllocator(&buildstate.allocator, &HnswSharedMemoryAlloc, &buildstate);
#if PG_VERSION_NUM >= 120000
scan = table_beginscan_parallel(heapRel,
ParallelTableScanFromHnswShared(hnswshared));
@@ -656,6 +835,7 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
{
char *sharedquery;
HnswShared *hnswshared;
char *hnswarea;
Relation heapRel;
Relation indexRel;
LOCKMODE heapLockmode;
@@ -691,8 +871,10 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
#endif
indexRel = index_open(hnswshared->indexrelid, indexLockmode);
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
/* Perform inserts */
HnswParallelScanAndInsert(heapRel, indexRel, hnswshared, false);
HnswParallelScanAndInsert(heapRel, indexRel, hnswshared, hnswarea, false);
/* Close relations within worker */
index_close(indexRel, indexLockmode);
@@ -749,7 +931,7 @@ HnswLeaderParticipateAsWorker(HnswBuildState * buildstate)
HnswLeader *hnswleader = buildstate->hnswleader;
/* Perform work common to all participants */
HnswParallelScanAndInsert(buildstate->heap, buildstate->index, hnswleader->hnswshared, true);
HnswParallelScanAndInsert(buildstate->heap, buildstate->index, hnswleader->hnswshared, hnswleader->hnswarea, true);
}
/*
@@ -761,7 +943,10 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
ParallelContext *pcxt;
Snapshot snapshot;
Size esthnswshared;
Size esthnswarea;
Size estother;
HnswShared *hnswshared;
char *hnswarea;
HnswLeader *hnswleader = (HnswLeader *) palloc0(sizeof(HnswLeader));
bool leaderparticipates = true;
int querylen;
@@ -788,7 +973,17 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
/* Estimate size of workspaces */
esthnswshared = ParallelEstimateShared(buildstate->heap, snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, esthnswshared);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Leave space for other objects in shared memory */
/* Docker has a default limit of 64 MB for shm_size */
/* which happens to be the default value of maintenance_work_mem */
esthnswarea = maintenance_work_mem * 1024L;
estother = 2 * 1024 * 1024;
if (esthnswarea > estother)
esthnswarea -= estother;
shm_toc_estimate_chunk(&pcxt->estimator, esthnswarea);
shm_toc_estimate_keys(&pcxt->estimator, 2);
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
if (debug_query_string)
@@ -824,10 +1019,6 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
/* Initialize mutable state */
hnswshared->nparticipantsdone = 0;
hnswshared->reltuples = 0;
InitGraph(&hnswshared->graphData);
/* TODO Support in-memory builds */
hnswshared->graphData.memoryTotal = 0;
hnswshared->graphData.flushed = true;
#if PG_VERSION_NUM >= 120000
table_parallelscan_initialize(buildstate->heap,
ParallelTableScanFromHnswShared(hnswshared),
@@ -836,7 +1027,12 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
heap_parallelscan_initialize(&hnswshared->heapdesc, buildstate->heap, snapshot);
#endif
hnswarea = (char *) shm_toc_allocate(pcxt->toc, esthnswarea);
/* Report less than allocated so never fails */
InitGraph(&hnswshared->graphData, hnswarea, esthnswarea - 1024 * 1024);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_SHARED, hnswshared);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_AREA, hnswarea);
/* Store query string for workers */
if (debug_query_string)
@@ -856,6 +1052,7 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
hnswleader->nparticipanttuplesorts++;
hnswleader->hnswshared = hnswshared;
hnswleader->snapshot = snapshot;
hnswleader->hnswarea = hnswarea;
/* If no workers were successfully launched, back out (do serial build) */
if (pcxt->nworkers_launched == 0)
@@ -910,16 +1107,12 @@ BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum)
UpdateProgress(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_HNSW_PHASE_LOAD);
/* Calculate parallel workers */
if (buildstate->heap != NULL && hnsw_enable_parallel_build)
if (buildstate->heap != NULL)
parallel_workers = ComputeParallelWorkers(buildstate->heap, buildstate->index);
/* Attempt to launch parallel worker scan when required */
if (parallel_workers > 0)
{
/* TODO Support in-memory builds */
FlushPages(buildstate);
HnswBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers);
}
/* Add tuples to graph */
if (buildstate->heap != NULL)

View File

@@ -116,7 +116,7 @@ HnswInsertAppendPage(Relation index, Buffer *nbuf, Page *npage, GenericXLogState
* Add to element and neighbor pages
*/
static void
WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPage, BlockNumber *updatedInsertPage, bool building)
AddElementOnDisk(Relation index, HnswElement e, int m, BlockNumber insertPage, BlockNumber *updatedInsertPage, bool building)
{
Buffer buf;
Page page;
@@ -134,9 +134,10 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag
OffsetNumber freeOffno = InvalidOffsetNumber;
OffsetNumber freeNeighborOffno = InvalidOffsetNumber;
BlockNumber newInsertPage = InvalidBlockNumber;
char *base = NULL;
/* Calculate sizes */
etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(DatumGetPointer(e->value)));
etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(HnswPtrAccess(base, e->value)));
ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m);
combinedSize = etupSize + ntupSize + sizeof(ItemIdData);
maxSize = HNSW_MAX_SIZE;
@@ -144,11 +145,11 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag
/* Prepare element tuple */
etup = palloc0(etupSize);
HnswSetElementTuple(etup, e);
HnswSetElementTuple(base, etup, e);
/* Prepare neighbor tuple */
ntup = palloc0(ntupSize);
HnswSetNeighborTuple(ntup, e, m);
HnswSetNeighborTuple(base, ntup, e, m);
/* Find a page (or two if needed) to insert the tuples */
for (;;)
@@ -338,12 +339,14 @@ ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm)
* Update neighbors
*/
void
HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building)
HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building)
{
char *base = NULL;
for (int lc = e->level; lc >= 0; lc--)
{
int lm = HnswGetLayerM(m, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(e, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc);
for (int i = 0; i < neighbors->length; i++)
{
@@ -356,11 +359,12 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE
Size ntupSize;
int idx = -1;
int startIdx;
OffsetNumber offno = hc->element->neighborOffno;
HnswElement neighborElement = HnswPtrAccess(base, hc->element);
OffsetNumber offno = neighborElement->neighborOffno;
/* Get latest neighbors since they may have changed */
/* Do not lock yet since selecting neighbors can take time */
HnswLoadNeighbors(hc->element, index, m);
HnswLoadNeighbors(neighborElement, index, m);
/*
* Could improve performance for vacuuming by checking neighbors
@@ -370,14 +374,14 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE
*/
/* Select neighbors */
HnswUpdateConnection(e, hc, lm, lc, &idx, index, procinfo, collation);
HnswUpdateConnection(NULL, e, hc, lm, lc, &idx, index, procinfo, collation);
/* New element was not selected as a neighbor */
if (idx == -1)
continue;
/* Register page */
buf = ReadBuffer(index, hc->element->neighborPage);
buf = ReadBuffer(index, neighborElement->neighborPage);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
if (building)
{
@@ -396,7 +400,7 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE
ntupSize = ItemIdGetLength(itemid);
/* Calculate index for update */
startIdx = (hc->element->level - lc) * m;
startIdx = (neighborElement->level - lc) * m;
/* Check for existing connection */
if (checkExisting && ConnectionExists(e, ntup, startIdx, lm))
@@ -447,7 +451,7 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE
* Add a heap TID to an existing element
*/
static bool
HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup, bool building)
AddDuplicateOnDisk(Relation index, HnswElement element, HnswElement dup, bool building)
{
Buffer buf;
Page page;
@@ -511,19 +515,23 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup, bool buil
* Find duplicate element
*/
static bool
HnswFindDuplicate(Relation index, HnswElement element, bool building)
FindDuplicateOnDisk(Relation index, HnswElement element, bool building)
{
HnswNeighborArray *neighbors = HnswGetNeighbors(element, 0);
char *base = NULL;
HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0);
Datum value = HnswGetValue(base, element);
for (int i = 0; i < neighbors->length; i++)
{
HnswCandidate *neighbor = &neighbors->items[i];
HnswElement neighborElement = HnswPtrAccess(base, neighbor->element);
Datum neighborValue = HnswGetValue(base, neighborElement);
/* Exit early since ordered by distance */
if (!datumIsEqual(element->value, neighbor->element->value, false, -1))
if (!datumIsEqual(value, neighborValue, false, -1))
return false;
if (HnswAddDuplicate(index, element, neighbor->element, building))
if (AddDuplicateOnDisk(index, element, neighborElement, building))
return true;
}
@@ -531,26 +539,26 @@ HnswFindDuplicate(Relation index, HnswElement element, bool building)
}
/*
* Write changes to disk
* Update graph on disk
*/
static void
WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, bool building)
UpdateGraphOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, bool building)
{
BlockNumber newInsertPage = InvalidBlockNumber;
/* Look for duplicate */
if (HnswFindDuplicate(index, element, building))
if (FindDuplicateOnDisk(index, element, building))
return;
/* Write element and neighbor tuples */
WriteNewElementPages(index, element, m, GetInsertPage(index), &newInsertPage, building);
/* Add element */
AddElementOnDisk(index, element, m, GetInsertPage(index), &newInsertPage, building);
/* Update insert page if needed */
if (BlockNumberIsValid(newInsertPage))
HnswUpdateMetaPage(index, 0, NULL, newInsertPage, MAIN_FORKNUM, building);
/* Update neighbors */
HnswUpdateNeighborPages(index, procinfo, collation, element, m, false, building);
HnswUpdateNeighborsOnDisk(index, procinfo, collation, element, m, false, building);
/* Update entry point if needed */
if (entryPoint == NULL || element->level > entryPoint->level)
@@ -561,10 +569,8 @@ WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement elem
* Insert a tuple into the index
*/
bool
HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building)
HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building)
{
Datum value;
FmgrInfo *normprocinfo;
HnswElement entryPoint;
HnswElement element;
int m;
@@ -572,17 +578,7 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti
FmgrInfo *procinfo = index_getprocinfo(index, 1, HNSW_DISTANCE_PROC);
Oid collation = index->rd_indcollation[0];
LOCKMODE lockmode = ShareLock;
/* Detoast once for all calls */
value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
/* Normalize if needed */
normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC);
if (normprocinfo != NULL)
{
if (!HnswNormValue(normprocinfo, collation, &value, NULL))
return false;
}
char *base = NULL;
/*
* Get a shared lock. This allows vacuum to ensure no in-flight inserts
@@ -595,8 +591,8 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti
HnswGetMetaPageInfo(index, &m, &entryPoint);
/* Create an element */
element = HnswInitElement(heap_tid, m, HnswGetMl(m), HnswGetMaxLevel(m));
element->value = value;
element = HnswInitElement(base, heap_tid, m, HnswGetMl(m), HnswGetMaxLevel(m), NULL);
HnswPtrStore(base, element->value, DatumGetPointer(value));
/* Prevent concurrent inserts when likely updating entry point */
if (entryPoint == NULL || element->level > entryPoint->level)
@@ -612,11 +608,11 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti
entryPoint = HnswGetEntryPoint(index);
}
/* Insert element in graph */
HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, false);
/* Find neighbors for element */
HnswFindElementNeighbors(base, element, entryPoint, index, procinfo, collation, m, efConstruction, false);
/* Write to disk */
WriteElement(index, procinfo, collation, element, m, efConstruction, entryPoint, building);
/* Update graph on disk */
UpdateGraphOnDisk(index, procinfo, collation, element, m, efConstruction, entryPoint, building);
/* Release lock */
UnlockPage(index, HNSW_UPDATE_LOCK, lockmode);
@@ -624,6 +620,30 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti
return true;
}
/*
* Insert a tuple into the index
*/
static void
HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel)
{
Datum value;
FmgrInfo *normprocinfo;
Oid collation = index->rd_indcollation[0];
/* Detoast once for all calls */
value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
/* Normalize if needed */
normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC);
if (normprocinfo != NULL)
{
if (!HnswNormValue(normprocinfo, collation, &value, NULL))
return;
}
HnswInsertTupleOnDisk(index, value, values, isnull, heap_tid, heapRel, false);
}
/*
* Insert a tuple into the index
*/
@@ -650,7 +670,7 @@ hnswinsert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid,
oldCtx = MemoryContextSwitchTo(insertCtx);
/* Insert tuple */
HnswInsertTuple(index, values, isnull, heap_tid, heap, false);
HnswInsertTuple(index, values, isnull, heap_tid, heap);
/* Delete memory context */
MemoryContextSwitchTo(oldCtx);

View File

@@ -21,6 +21,7 @@ GetScanItems(IndexScanDesc scan, Datum q)
List *w;
int m;
HnswElement entryPoint;
char *base = NULL;
/* Get m and entry point */
HnswGetMetaPageInfo(index, &m, &entryPoint);
@@ -28,15 +29,15 @@ GetScanItems(IndexScanDesc scan, Datum q)
if (entryPoint == NULL)
return NIL;
ep = list_make1(HnswEntryCandidate(entryPoint, q, index, procinfo, collation, false));
ep = list_make1(HnswEntryCandidate(base, entryPoint, q, index, procinfo, collation, false));
for (int lc = entryPoint->level; lc >= 1; lc--)
{
w = HnswSearchLayer(q, ep, 1, lc, index, procinfo, collation, m, false, NULL);
w = HnswSearchLayer(base, q, ep, 1, lc, index, procinfo, collation, m, false, NULL);
ep = w;
}
return HnswSearchLayer(q, ep, hnsw_ef_search, 0, index, procinfo, collation, m, false, NULL);
return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, procinfo, collation, m, false, NULL);
}
/*
@@ -184,17 +185,19 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir)
while (list_length(so->w) > 0)
{
char *base = NULL;
HnswCandidate *hc = llast(so->w);
HnswElement element = HnswPtrAccess(base, hc->element);
ItemPointer heaptid;
/* Move to next element if no valid heap TIDs */
if (hc->element->heaptidsLength == 0)
if (element->heaptidsLength == 0)
{
so->w = list_delete_last(so->w);
continue;
}
heaptid = &hc->element->heaptids[--hc->element->heaptidsLength];
heaptid = &element->heaptids[--element->heaptidsLength];
MemoryContextSwitchTo(oldCtx);

View File

@@ -84,9 +84,40 @@ hash_pointer(uintptr_t ptr)
#define SH_DEFINE
#include "lib/simplehash.h"
/* Needed to include simplehash.h again */
#if PG_VERSION_NUM < 120000
#undef SH_EQUAL
#undef sh_log2
#undef sh_pow2
#define sh_log2 offsethash_sh_log2
#define sh_pow2 offsethash_sh_pow2
#endif
/* Offset hash table */
static uint32
hash_offset(Size offset)
{
#if SIZEOF_SIZE_T == 8
return murmurhash64((uint64) offset);
#else
return murmurhash32((uint32) offset);
#endif
}
#define SH_PREFIX offsethash
#define SH_ELEMENT_TYPE OffsetHashEntry
#define SH_KEY_TYPE Size
#define SH_KEY offset
#define SH_HASH_KEY(tb, key) hash_offset(key)
#define SH_EQUAL(tb, a, b) (a == b)
#define SH_SCOPE extern
#define SH_DEFINE
#include "lib/simplehash.h"
typedef union
{
pointerhash_hash *pointers;
offsethash_hash *offsets;
tidhash_hash *tids;
} visited_hash;
@@ -188,31 +219,46 @@ HnswInitPage(Buffer buf, Page page)
* Allocate neighbors
*/
void
HnswInitNeighbors(HnswElement element, int m)
HnswInitNeighbors(char *base, HnswElement element, int m, HnswAllocator * allocator)
{
int level = element->level;
element->neighbors = palloc(sizeof(HnswNeighborArray) * (level + 1));
HnswNeighborArrayPtr *neighborList = (HnswNeighborArrayPtr *) HnswAlloc(allocator, sizeof(HnswNeighborArrayPtr) * (level + 1));
HnswPtrStore(base, element->neighbors, neighborList);
for (int lc = 0; lc <= level; lc++)
{
HnswNeighborArray *a;
int lm = HnswGetLayerM(m, lc);
a = &element->neighbors[lc];
HnswPtrStore(base, neighborList[lc], (HnswNeighborArray *) HnswAlloc(allocator, offsetof(HnswNeighborArray, items) + sizeof(HnswCandidate) * lm));
a = HnswGetNeighbors(base, element, lc);
a->length = 0;
a->items = palloc(sizeof(HnswCandidate) * lm);
a->closerSet = false;
}
}
/*
* Allocate memory from the allocator
*/
void *
HnswAlloc(HnswAllocator * allocator, Size size)
{
if (allocator)
return (*(allocator)->alloc) (size, (allocator)->state);
return palloc(size);
}
/*
* Allocate an element
*/
HnswElement
HnswInitElement(ItemPointer heaptid, int m, double ml, int maxLevel)
HnswInitElement(char *base, ItemPointer heaptid, int m, double ml, int maxLevel, HnswAllocator * allocator)
{
HnswElement element = palloc(sizeof(HnswElementData));
HnswElement element = HnswAlloc(allocator, sizeof(HnswElementData));
int level = (int) (-log(RandomDouble()) * ml);
@@ -226,9 +272,9 @@ HnswInitElement(ItemPointer heaptid, int m, double ml, int maxLevel)
element->level = level;
element->deleted = 0;
HnswInitNeighbors(element, m);
HnswInitNeighbors(base, element, m, allocator);
element->value = PointerGetDatum(NULL);
HnswPtrStore(base, element->value, (Pointer) NULL);
return element;
}
@@ -249,11 +295,12 @@ HnswElement
HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno)
{
HnswElement element = palloc(sizeof(HnswElementData));
char *base = NULL;
element->blkno = blkno;
element->offno = offno;
element->neighbors = NULL;
element->value = PointerGetDatum(NULL);
HnswPtrStore(base, element->neighbors, (HnswNeighborArrayPtr *) NULL);
HnswPtrStore(base, element->value, (Pointer) NULL);
return element;
}
@@ -363,8 +410,10 @@ HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, Bloc
* Set element tuple, except for neighbor info
*/
void
HnswSetElementTuple(HnswElementTuple etup, HnswElement element)
HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element)
{
Pointer valuePtr = HnswPtrAccess(base, element->value);
etup->type = HNSW_ELEMENT_TUPLE_TYPE;
etup->level = element->level;
etup->deleted = 0;
@@ -375,14 +424,14 @@ HnswSetElementTuple(HnswElementTuple etup, HnswElement element)
else
ItemPointerSetInvalid(&etup->heaptids[i]);
}
memcpy(&etup->data, DatumGetPointer(element->value), VARSIZE_ANY(DatumGetPointer(element->value)));
memcpy(&etup->data, valuePtr, VARSIZE_ANY(valuePtr));
}
/*
* Set neighbor tuple
*/
void
HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m)
HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m)
{
int idx = 0;
@@ -390,7 +439,7 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m)
for (int lc = e->level; lc >= 0; lc--)
{
HnswNeighborArray *neighbors = HnswGetNeighbors(e, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc);
int lm = HnswGetLayerM(m, lc);
for (int i = 0; i < lm; i++)
@@ -400,8 +449,9 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m)
if (i < neighbors->length)
{
HnswCandidate *hc = &neighbors->items[i];
HnswElement hce = HnswPtrAccess(base, hc->element);
ItemPointerSet(indextid, hc->element->blkno, hc->element->offno);
ItemPointerSet(indextid, hce->blkno, hce->offno);
}
else
ItemPointerSetInvalid(indextid);
@@ -417,12 +467,14 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m)
static void
LoadNeighborsFromPage(HnswElement element, Relation index, Page page, int m)
{
char *base = NULL;
HnswNeighborTuple ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, element->neighborOffno));
int neighborCount = (element->level + 2) * m;
Assert(HnswIsNeighborTuple(ntup));
HnswInitNeighbors(element, m);
HnswInitNeighbors(base, element, m, NULL);
/* Ensure expected neighbors */
if (ntup->count != neighborCount)
@@ -448,9 +500,9 @@ LoadNeighborsFromPage(HnswElement element, Relation index, Page page, int m)
if (level < 0)
level = 0;
neighbors = HnswGetNeighbors(element, level);
neighbors = HnswGetNeighbors(base, element, level);
hc = &neighbors->items[neighbors->length++];
hc->element = e;
HnswPtrStore(base, hc->element, e);
}
}
@@ -497,7 +549,12 @@ HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHe
}
if (loadVec)
element->value = datumCopy(PointerGetDatum(&etup->data), false, -1);
{
char *base = NULL;
Datum value = datumCopy(PointerGetDatum(&etup->data), false, -1);
HnswPtrStore(base, element->value, DatumGetPointer(value));
}
}
/*
@@ -533,24 +590,27 @@ HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index,
* Get the distance for a candidate
*/
static float
GetCandidateDistance(HnswCandidate * hc, Datum q, FmgrInfo *procinfo, Oid collation)
GetCandidateDistance(char *base, HnswCandidate * hc, Datum q, FmgrInfo *procinfo, Oid collation)
{
return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, q, hc->element->value));
HnswElement hce = HnswPtrAccess(base, hc->element);
Datum value = HnswGetValue(base, hce);
return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, q, value));
}
/*
* Create a candidate for the entry point
*/
HnswCandidate *
HnswEntryCandidate(HnswElement entryPoint, Datum q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec)
HnswEntryCandidate(char *base, HnswElement entryPoint, Datum q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec)
{
HnswCandidate *hc = palloc(sizeof(HnswCandidate));
hc->element = entryPoint;
HnswPtrStore(base, hc->element, entryPoint);
if (index == NULL)
hc->distance = GetCandidateDistance(hc, q, procinfo, collation);
hc->distance = GetCandidateDistance(base, hc, q, procinfo, collation);
else
HnswLoadElement(hc->element, &hc->distance, &q, index, procinfo, collation, loadVec);
HnswLoadElement(entryPoint, &hc->distance, &q, index, procinfo, collation, loadVec);
return hc;
}
@@ -596,34 +656,79 @@ CreatePairingHeapNode(HnswCandidate * c)
return node;
}
/*
* Init visited
*/
static inline void
InitVisited(char *base, visited_hash * v, Relation index, int ef, int m)
{
if (index != NULL)
v->tids = tidhash_create(CurrentMemoryContext, ef * m * 2, NULL);
else if (base != NULL)
v->offsets = offsethash_create(CurrentMemoryContext, ef * m * 2, NULL);
else
v->pointers = pointerhash_create(CurrentMemoryContext, ef * m * 2, NULL);
}
/*
* Add to visited
*/
static inline void
AddToVisited(visited_hash * v, HnswCandidate * hc, Relation index, bool *found)
AddToVisited(char *base, visited_hash * v, HnswCandidate * hc, Relation index, bool *found)
{
if (index == NULL)
if (index != NULL)
{
HnswElement element = HnswPtrAccess(base, hc->element);
ItemPointerData indextid;
ItemPointerSet(&indextid, element->blkno, element->offno);
tidhash_insert(v->tids, indextid, found);
}
else if (base != NULL)
{
#if PG_VERSION_NUM >= 130000
pointerhash_insert_hash(v->pointers, (uintptr_t) hc->element, hc->element->hash, found);
HnswElement element = HnswPtrAccess(base, hc->element);
offsethash_insert_hash(v->offsets, HnswPtrOffset(hc->element), element->hash, found);
#else
pointerhash_insert(v->pointers, (uintptr_t) hc->element, found);
offsethash_insert(v->offsets, HnswPtrOffset(hc->element), found);
#endif
}
else
{
ItemPointerData indextid;
#if PG_VERSION_NUM >= 130000
HnswElement element = HnswPtrAccess(base, hc->element);
ItemPointerSet(&indextid, hc->element->blkno, hc->element->offno);
tidhash_insert(v->tids, indextid, found);
pointerhash_insert_hash(v->pointers, (uintptr_t) HnswPtrPointer(hc->element), element->hash, found);
#else
pointerhash_insert(v->pointers, (uintptr_t) HnswPtrPointer(hc->element), found);
#endif
}
}
/*
* Count element towards ef
*/
static inline bool
CountElement(char *base, HnswElement skipElement, HnswCandidate * hc)
{
HnswElement e;
if (skipElement == NULL)
return true;
/* Ensure does not access heaptidsLength during in-memory build */
pg_memory_barrier();
e = HnswPtrAccess(base, hc->element);
return e->heaptidsLength != 0;
}
/*
* Algorithm 2 from paper
*/
List *
HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement)
HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement)
{
List *w = NIL;
pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL);
@@ -631,12 +736,17 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
int wlen = 0;
visited_hash v;
ListCell *lc2;
HnswNeighborArray *neighborhoodData = NULL;
Size neighborhoodSize;
/* Create hash table */
InitVisited(base, &v, index, ef, m);
/* Create local memory for neighborhood if needed */
if (index == NULL)
v.pointers = pointerhash_create(CurrentMemoryContext, ef * m * 2, NULL);
else
v.tids = tidhash_create(CurrentMemoryContext, ef * m * 2, NULL);
{
neighborhoodSize = offsetof(HnswNeighborArray, items) + sizeof(HnswCandidate) * HnswGetLayerM(m, lc);
neighborhoodData = palloc(neighborhoodSize);
}
/* Add entry points to v, C, and W */
foreach(lc2, ep)
@@ -644,7 +754,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
HnswCandidate *hc = (HnswCandidate *) lfirst(lc2);
bool found;
AddToVisited(&v, hc, index, &found);
AddToVisited(base, &v, hc, index, &found);
pairingheap_add(C, &(CreatePairingHeapNode(hc)->ph_node));
pairingheap_add(W, &(CreatePairingHeapNode(hc)->ph_node));
@@ -654,7 +764,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
* would be ideal to do this for inserts as well, but this could
* affect insert performance.
*/
if (skipElement == NULL || hc->element->heaptidsLength != 0)
if (CountElement(base, skipElement, hc))
wlen++;
}
@@ -663,38 +773,51 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
HnswNeighborArray *neighborhood;
HnswCandidate *c = ((HnswPairingHeapNode *) pairingheap_remove_first(C))->inner;
HnswCandidate *f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner;
HnswElement cElement;
if (c->distance > f->distance)
break;
if (c->element->neighbors == NULL)
HnswLoadNeighbors(c->element, index, m);
cElement = HnswPtrAccess(base, c->element);
if (HnswPtrIsNull(base, cElement->neighbors))
HnswLoadNeighbors(cElement, index, m);
/* Get the neighborhood at layer lc */
neighborhood = HnswGetNeighbors(c->element, lc);
neighborhood = HnswGetNeighbors(base, cElement, lc);
/* Copy neighborhood to local memory if needed */
if (index == NULL)
{
LWLockAcquire(&cElement->lock, LW_SHARED);
memcpy(neighborhoodData, neighborhood, neighborhoodSize);
LWLockRelease(&cElement->lock);
neighborhood = neighborhoodData;
}
for (int i = 0; i < neighborhood->length; i++)
{
HnswCandidate *e = &neighborhood->items[i];
bool visited;
AddToVisited(&v, e, index, &visited);
AddToVisited(base, &v, e, index, &visited);
if (!visited)
{
float eDistance;
HnswElement eElement = HnswPtrAccess(base, e->element);
f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner;
if (index == NULL)
eDistance = GetCandidateDistance(e, q, procinfo, collation);
eDistance = GetCandidateDistance(base, e, q, procinfo, collation);
else
HnswLoadElement(e->element, &eDistance, &q, index, procinfo, collation, inserting);
HnswLoadElement(eElement, &eDistance, &q, index, procinfo, collation, inserting);
Assert(!e->element->deleted);
Assert(!eElement->deleted);
/* Make robust to issues */
if (e->element->level < lc)
if (eElement->level < lc)
continue;
if (eDistance < f->distance || wlen < ef)
@@ -702,7 +825,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
/* Copy e */
HnswCandidate *ec = palloc(sizeof(HnswCandidate));
ec->element = e->element;
HnswPtrStore(base, ec->element, eElement);
ec->distance = eDistance;
pairingheap_add(C, &(CreatePairingHeapNode(ec)->ph_node));
@@ -713,7 +836,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
* vacuuming. It would be ideal to do this for inserts as
* well, but this could affect insert performance.
*/
if (skipElement == NULL || e->element->heaptidsLength != 0)
if (CountElement(base, skipElement, e))
{
wlen++;
@@ -738,7 +861,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
}
/*
* Compare candidate distances
* Compare candidate distances with pointer tie-breaker
*/
static int
#if PG_VERSION_NUM >= 130000
@@ -756,10 +879,38 @@ CompareCandidateDistances(const void *a, const void *b)
if (hca->distance > hcb->distance)
return -1;
if (hca->element < hcb->element)
if (HnswPtrPointer(hca->element) < HnswPtrPointer(hcb->element))
return 1;
if (hca->element > hcb->element)
if (HnswPtrPointer(hca->element) > HnswPtrPointer(hcb->element))
return -1;
return 0;
}
/*
* Compare candidate distances with offset tie-breaker
*/
static int
#if PG_VERSION_NUM >= 130000
CompareCandidateDistancesOffset(const ListCell *a, const ListCell *b)
#else
CompareCandidateDistancesOffset(const void *a, const void *b)
#endif
{
HnswCandidate *hca = lfirst((ListCell *) a);
HnswCandidate *hcb = lfirst((ListCell *) b);
if (hca->distance < hcb->distance)
return 1;
if (hca->distance > hcb->distance)
return -1;
if (HnswPtrOffset(hca->element) < HnswPtrOffset(hcb->element))
return 1;
if (HnswPtrOffset(hca->element) > HnswPtrOffset(hcb->element))
return -1;
return 0;
@@ -769,46 +920,58 @@ CompareCandidateDistances(const void *a, const void *b)
* Calculate the distance between elements
*/
static float
HnswGetDistance(HnswElement a, HnswElement b, int lc, FmgrInfo *procinfo, Oid collation)
HnswGetDistance(char *base, HnswElement a, HnswElement b, int lc, FmgrInfo *procinfo, Oid collation)
{
Datum aValue;
Datum bValue;
/* Look for cached distance */
if (a->neighbors != NULL)
if (!HnswPtrIsNull(base, a->neighbors))
{
HnswNeighborArray *neighbors = HnswGetNeighbors(a, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(base, a, lc);
for (int i = 0; i < neighbors->length; i++)
{
if (neighbors->items[i].element == b)
HnswElement element = HnswPtrAccess(base, neighbors->items[i].element);
if (element == b)
return neighbors->items[i].distance;
}
}
if (b->neighbors != NULL)
if (!HnswPtrIsNull(base, b->neighbors))
{
HnswNeighborArray *neighbors = HnswGetNeighbors(b, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(base, b, lc);
for (int i = 0; i < neighbors->length; i++)
{
if (neighbors->items[i].element == a)
HnswElement element = HnswPtrAccess(base, neighbors->items[i].element);
if (element == a)
return neighbors->items[i].distance;
}
}
return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, a->value, b->value));
aValue = HnswGetValue(base, a);
bValue = HnswGetValue(base, b);
return DatumGetFloat8(FunctionCall2Coll(procinfo, collation, aValue, bValue));
}
/*
* Check if an element is closer to q than any element from R
*/
static bool
CheckElementCloser(HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid collation)
CheckElementCloser(char *base, HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid collation)
{
HnswElement eElement = HnswPtrAccess(base, e->element);
ListCell *lc2;
foreach(lc2, r)
{
HnswCandidate *ri = lfirst(lc2);
float distance = HnswGetDistance(e->element, ri->element, lc, procinfo, collation);
HnswElement riElement = HnswPtrAccess(base, ri->element);
float distance = HnswGetDistance(base, eElement, riElement, lc, procinfo, collation);
if (distance <= e->distance)
return false;
@@ -821,12 +984,12 @@ CheckElementCloser(HnswCandidate * e, List *r, int lc, FmgrInfo *procinfo, Oid c
* Algorithm 4 from paper
*/
static List *
SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, HnswElement e2, HnswCandidate * newCandidate, HnswCandidate * *pruned, bool sortCandidates)
SelectNeighbors(char *base, List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, HnswElement e2, HnswCandidate * newCandidate, HnswCandidate * *pruned, bool sortCandidates)
{
List *r = NIL;
List *w = list_copy(c);
pairingheap *wd;
HnswNeighborArray *neighbors = HnswGetNeighbors(e2, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(base, e2, lc);
bool mustCalculate = !neighbors->closerSet;
List *added = NIL;
bool removedAny = false;
@@ -838,7 +1001,12 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw
/* Ensure order of candidates is deterministic for closer caching */
if (sortCandidates)
list_sort(w, CompareCandidateDistances);
{
if (base == NULL)
list_sort(w, CompareCandidateDistances);
else
list_sort(w, CompareCandidateDistancesOffset);
}
while (list_length(w) > 0 && list_length(r) < lm)
{
@@ -849,7 +1017,7 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw
/* Use previous state of r and wd to skip work when possible */
if (mustCalculate)
e->closer = CheckElementCloser(e, r, lc, procinfo, collation);
e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation);
else if (list_length(added) > 0)
{
/*
@@ -858,7 +1026,7 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw
*/
if (e->closer)
{
e->closer = CheckElementCloser(e, added, lc, procinfo, collation);
e->closer = CheckElementCloser(base, e, added, lc, procinfo, collation);
if (!e->closer)
removedAny = true;
@@ -871,7 +1039,7 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw
*/
if (removedAny)
{
e->closer = CheckElementCloser(e, r, lc, procinfo, collation);
e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation);
if (e->closer)
added = lappend(added, e);
}
@@ -879,7 +1047,7 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw
}
else if (e == newCandidate)
{
e->closer = CheckElementCloser(e, r, lc, procinfo, collation);
e->closer = CheckElementCloser(base, e, r, lc, procinfo, collation);
if (e->closer)
added = lappend(added, e);
}
@@ -913,10 +1081,10 @@ SelectNeighbors(List *c, int lm, int lc, FmgrInfo *procinfo, Oid collation, Hnsw
* Add connections
*/
static void
AddConnections(HnswElement element, List *neighbors, int lc)
AddConnections(char *base, HnswElement element, List *neighbors, int lc)
{
ListCell *lc2;
HnswNeighborArray *a = HnswGetNeighbors(element, lc);
HnswNeighborArray *a = HnswGetNeighbors(base, element, lc);
foreach(lc2, neighbors)
a->items[a->length++] = *((HnswCandidate *) lfirst(lc2));
@@ -926,13 +1094,13 @@ AddConnections(HnswElement element, List *neighbors, int lc)
* Update connections
*/
void
HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation)
HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc, int lm, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation)
{
HnswNeighborArray *currentNeighbors = HnswGetNeighbors(hc->element, lc);
HnswElement hce = HnswPtrAccess(base, hc->element);
HnswNeighborArray *currentNeighbors = HnswGetNeighbors(base, hce, lc);
HnswCandidate hc2;
hc2.element = element;
HnswPtrStore(base, hc2.element, element);
hc2.distance = hc->distance;
if (currentNeighbors->length < lm)
@@ -951,19 +1119,20 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, in
/* Load elements on insert */
if (index != NULL)
{
Datum q = hc->element->value;
Datum q = HnswGetValue(base, hce);
for (int i = 0; i < currentNeighbors->length; i++)
{
HnswCandidate *hc3 = &currentNeighbors->items[i];
HnswElement hc3Element = HnswPtrAccess(base, hc3->element);
if (DatumGetPointer(hc3->element->value) == NULL)
HnswLoadElement(hc3->element, &hc3->distance, &q, index, procinfo, collation, true);
if (HnswPtrIsNull(base, hc3Element->value))
HnswLoadElement(hc3Element, &hc3->distance, &q, index, procinfo, collation, true);
else
hc3->distance = GetCandidateDistance(hc3, q, procinfo, collation);
hc3->distance = GetCandidateDistance(base, hc3, q, procinfo, collation);
/* Prune element if being deleted */
if (hc3->element->heaptidsLength == 0)
if (hc3Element->heaptidsLength == 0)
{
pruned = &currentNeighbors->items[i];
break;
@@ -980,7 +1149,7 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, in
c = lappend(c, &currentNeighbors->items[i]);
c = lappend(c, &hc2);
SelectNeighbors(c, lm, lc, procinfo, collation, hc->element, &hc2, &pruned, true);
SelectNeighbors(base, c, lm, lc, procinfo, collation, hce, &hc2, &pruned, true);
/* Should not happen */
if (pruned == NULL)
@@ -990,7 +1159,7 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, in
/* Find and replace the pruned element */
for (int i = 0; i < currentNeighbors->length; i++)
{
if (currentNeighbors->items[i].element == pruned->element)
if (HnswPtrEqual(base, currentNeighbors->items[i].element, pruned->element))
{
currentNeighbors->items[i] = hc2;
@@ -1008,43 +1177,65 @@ HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int lm, int lc, in
* Remove elements being deleted or skipped
*/
static List *
RemoveElements(List *w, HnswElement skipElement)
RemoveElements(char *base, List *w, HnswElement skipElement)
{
ListCell *lc2;
List *w2 = NIL;
/* Ensure does not access heaptidsLength during in-memory build */
pg_memory_barrier();
foreach(lc2, w)
{
HnswCandidate *hc = (HnswCandidate *) lfirst(lc2);
HnswElement hce = HnswPtrAccess(base, hc->element);
/* Skip self for vacuuming update */
if (skipElement != NULL && hc->element->blkno == skipElement->blkno && hc->element->offno == skipElement->offno)
if (skipElement != NULL && hce->blkno == skipElement->blkno && hce->offno == skipElement->offno)
continue;
if (hc->element->heaptidsLength != 0)
if (hce->heaptidsLength != 0)
w2 = lappend(w2, hc);
}
return w2;
}
#if PG_VERSION_NUM >= 130000
/*
* Precompute hash
*/
static void
PrecomputeHash(char *base, HnswElement element)
{
HnswElementPtr ptr;
HnswPtrStore(base, ptr, element);
if (base == NULL)
element->hash = hash_pointer((uintptr_t) HnswPtrPointer(ptr));
else
element->hash = hash_offset(HnswPtrOffset(ptr));
}
#endif
/*
* Algorithm 1 from paper
*/
void
HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing)
HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing)
{
List *ep;
List *w;
int level = element->level;
int entryLevel;
Datum q = element->value;
Datum q = HnswGetValue(base, element);
HnswElement skipElement = existing ? element : NULL;
#if PG_VERSION_NUM >= 130000
/* Precompute hash */
if (index == NULL)
element->hash = hash_pointer((uintptr_t) element);
PrecomputeHash(base, element);
#endif
/* No neighbors if no entry point */
@@ -1052,13 +1243,13 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F
return;
/* Get entry point and level */
ep = list_make1(HnswEntryCandidate(entryPoint, q, index, procinfo, collation, true));
ep = list_make1(HnswEntryCandidate(base, entryPoint, q, index, procinfo, collation, true));
entryLevel = entryPoint->level;
/* 1st phase: greedy search to insert level */
for (int lc = entryLevel; lc >= level + 1; lc--)
{
w = HnswSearchLayer(q, ep, 1, lc, index, procinfo, collation, m, true, skipElement);
w = HnswSearchLayer(base, q, ep, 1, lc, index, procinfo, collation, m, true, skipElement);
ep = w;
}
@@ -1076,12 +1267,12 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F
List *neighbors;
List *lw;
w = HnswSearchLayer(q, ep, efConstruction, lc, index, procinfo, collation, m, true, skipElement);
w = HnswSearchLayer(base, q, ep, efConstruction, lc, index, procinfo, collation, m, true, skipElement);
/* Elements being deleted or skipped can help with search */
/* but should be removed before selecting neighbors */
if (index != NULL)
lw = RemoveElements(w, skipElement);
lw = RemoveElements(base, w, skipElement);
else
lw = w;
@@ -1090,9 +1281,9 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F
* sortCandidates to true for in-memory builds to enable closer
* caching, but there does not seem to be a difference in performance.
*/
neighbors = SelectNeighbors(lw, lm, lc, procinfo, collation, element, NULL, NULL, false);
neighbors = SelectNeighbors(base, lw, lm, lc, procinfo, collation, element, NULL, NULL, false);
AddConnections(element, neighbors, lc);
AddConnections(base, element, neighbors, lc);
ep = w;
}

View File

@@ -199,21 +199,22 @@ RepairGraphElement(HnswVacuumState * vacuumstate, HnswElement element, HnswEleme
BufferAccessStrategy bas = vacuumstate->bas;
HnswNeighborTuple ntup = vacuumstate->ntup;
Size ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(element->level, m);
char *base = NULL;
/* Skip if element is entry point */
if (entryPoint != NULL && element->blkno == entryPoint->blkno && element->offno == entryPoint->offno)
return;
/* Init fields */
HnswInitNeighbors(element, m);
HnswInitNeighbors(base, element, m, NULL);
element->heaptidsLength = 0;
/* Add element to graph, skipping itself */
HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, true);
/* Find neighbors for element, skipping itself */
HnswFindElementNeighbors(base, element, entryPoint, index, procinfo, collation, m, efConstruction, true);
/* Update neighbor tuple */
/* Do this before getting page to minimize locking */
HnswSetNeighborTuple(ntup, element, m);
HnswSetNeighborTuple(base, ntup, element, m);
/* Get neighbor page */
buf = ReadBufferExtended(index, MAIN_FORKNUM, element->neighborPage, RBM_NORMAL, bas);
@@ -230,7 +231,7 @@ RepairGraphElement(HnswVacuumState * vacuumstate, HnswElement element, HnswEleme
UnlockReleaseBuffer(buf);
/* Update neighbors */
HnswUpdateNeighborPages(index, procinfo, collation, element, m, true, false);
HnswUpdateNeighborsOnDisk(index, procinfo, collation, element, m, true, false);
}
/*
@@ -301,7 +302,7 @@ RepairGraphEntryPoint(HnswVacuumState * vacuumstate)
{
/* Reset neighbors from previous update */
if (highestPoint != NULL)
highestPoint->neighbors = NULL;
HnswPtrStore((char *) NULL, highestPoint->neighbors, (HnswNeighborArrayPtr *) NULL);
RepairGraphElement(vacuumstate, entryPoint, highestPoint);
}

View File

@@ -95,11 +95,10 @@ for my $i (0 .. $#operators)
$node->safe_psql("postgres", "DROP INDEX idx;");
# Build index in parallel
# Build index in parallel in memory
my ($ret, $stdout, $stderr) = $node->psql("postgres", qq(
SET client_min_messages = DEBUG;
SET min_parallel_table_scan_size = 1;
SET hnsw.enable_parallel_build = on;
CREATE INDEX idx ON tst USING hnsw (v $opclass);
));
is($ret, 0, $stderr);
@@ -109,6 +108,21 @@ for my $i (0 .. $#operators)
test_recall($min, $operator);
$node->safe_psql("postgres", "DROP INDEX idx;");
# Build index in parallel on disk
# Set parallel_workers on table to use workers with low maintenance_work_mem
($ret, $stdout, $stderr) = $node->psql("postgres", qq(
ALTER TABLE tst SET (parallel_workers = 2);
SET client_min_messages = DEBUG;
SET maintenance_work_mem = '4MB';
CREATE INDEX idx ON tst USING hnsw (v $opclass);
ALTER TABLE tst RESET (parallel_workers);
));
is($ret, 0, $stderr);
like($stderr, qr/using \d+ parallel workers/);
like($stderr, qr/hnsw graph no longer fits into maintenance_work_mem/);
$node->safe_psql("postgres", "DROP INDEX idx;");
}
done_testing();