Files
pgvector/src/hnswinsert.c
2024-04-25 13:46:45 -07:00

673 lines
17 KiB
C

#include "postgres.h"
#include <math.h>
#include "access/generic_xlog.h"
#include "hnsw.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/datum.h"
#include "utils/memutils.h"
/*
* Get the insert page
*/
static BlockNumber
GetInsertPage(Relation index)
{
Buffer buf;
Page page;
HnswMetaPage metap;
BlockNumber insertPage;
buf = ReadBuffer(index, HNSW_METAPAGE_BLKNO);
LockBuffer(buf, BUFFER_LOCK_SHARE);
page = BufferGetPage(buf);
metap = HnswPageGetMeta(page);
insertPage = metap->insertPage;
UnlockReleaseBuffer(buf);
return insertPage;
}
/*
* Check for a free offset
*/
static bool
HnswFreeOffset(Relation index, Buffer buf, Page page, HnswElement element, Size ntupSize, Buffer *nbuf, Page *npage, OffsetNumber *freeOffno, OffsetNumber *freeNeighborOffno, BlockNumber *newInsertPage)
{
OffsetNumber offno;
OffsetNumber maxoffno = PageGetMaxOffsetNumber(page);
for (offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno))
{
HnswElementTuple etup = (HnswElementTuple) PageGetItem(page, PageGetItemId(page, offno));
/* Skip neighbor tuples */
if (!HnswIsElementTuple(etup))
continue;
if (etup->deleted)
{
BlockNumber elementPage = BufferGetBlockNumber(buf);
BlockNumber neighborPage = ItemPointerGetBlockNumber(&etup->neighbortid);
OffsetNumber neighborOffno = ItemPointerGetOffsetNumber(&etup->neighbortid);
ItemId itemid;
if (!BlockNumberIsValid(*newInsertPage))
*newInsertPage = elementPage;
if (neighborPage == elementPage)
{
*nbuf = buf;
*npage = page;
}
else
{
*nbuf = ReadBuffer(index, neighborPage);
LockBuffer(*nbuf, BUFFER_LOCK_EXCLUSIVE);
/* Skip WAL for now */
*npage = BufferGetPage(*nbuf);
}
itemid = PageGetItemId(*npage, neighborOffno);
/* Check for space on neighbor tuple page */
if (PageGetFreeSpace(*npage) + ItemIdGetLength(itemid) - sizeof(ItemIdData) >= ntupSize)
{
*freeOffno = offno;
*freeNeighborOffno = neighborOffno;
return true;
}
else if (*nbuf != buf)
UnlockReleaseBuffer(*nbuf);
}
}
return false;
}
/*
* Add a new page
*/
static void
HnswInsertAppendPage(Relation index, Buffer *nbuf, Page *npage, GenericXLogState *state, Page page, bool building)
{
/* Add a new page */
LockRelationForExtension(index, ExclusiveLock);
*nbuf = HnswNewBuffer(index, MAIN_FORKNUM);
UnlockRelationForExtension(index, ExclusiveLock);
/* Init new page */
if (building)
*npage = BufferGetPage(*nbuf);
else
*npage = GenericXLogRegisterBuffer(state, *nbuf, GENERIC_XLOG_FULL_IMAGE);
HnswInitPage(*nbuf, *npage);
/* Update previous buffer */
HnswPageGetOpaque(page)->nextblkno = BufferGetBlockNumber(*nbuf);
}
/*
* Add to element and neighbor pages
*/
static void
AddElementOnDisk(Relation index, HnswElement e, int m, BlockNumber insertPage, BlockNumber *updatedInsertPage, bool building)
{
Buffer buf;
Page page;
GenericXLogState *state;
Size etupSize;
Size ntupSize;
Size combinedSize;
Size maxSize;
Size minCombinedSize;
HnswElementTuple etup;
BlockNumber currentPage = insertPage;
HnswNeighborTuple ntup;
Buffer nbuf;
Page npage;
OffsetNumber freeOffno = InvalidOffsetNumber;
OffsetNumber freeNeighborOffno = InvalidOffsetNumber;
BlockNumber newInsertPage = InvalidBlockNumber;
char *base = NULL;
/* Calculate sizes */
etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(HnswPtrAccess(base, e->value)));
ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m);
combinedSize = etupSize + ntupSize + sizeof(ItemIdData);
maxSize = HNSW_MAX_SIZE;
minCombinedSize = etupSize + HNSW_NEIGHBOR_TUPLE_SIZE(0, m) + sizeof(ItemIdData);
/* Prepare element tuple */
etup = palloc0(etupSize);
HnswSetElementTuple(base, etup, e);
/* Prepare neighbor tuple */
ntup = palloc0(ntupSize);
HnswSetNeighborTuple(base, ntup, e, m);
/* Find a page (or two if needed) to insert the tuples */
for (;;)
{
buf = ReadBuffer(index, currentPage);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
if (building)
{
state = NULL;
page = BufferGetPage(buf);
}
else
{
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
}
/* Keep track of first page where element at level 0 can fit */
if (!BlockNumberIsValid(newInsertPage) && PageGetFreeSpace(page) >= minCombinedSize)
newInsertPage = currentPage;
/* First, try the fastest path */
/* Space for both tuples on the current page */
/* This can split existing tuples in rare cases */
if (PageGetFreeSpace(page) >= combinedSize)
{
nbuf = buf;
npage = page;
break;
}
/* Next, try space from a deleted element */
if (HnswFreeOffset(index, buf, page, e, ntupSize, &nbuf, &npage, &freeOffno, &freeNeighborOffno, &newInsertPage))
{
if (nbuf != buf)
{
if (building)
npage = BufferGetPage(nbuf);
else
npage = GenericXLogRegisterBuffer(state, nbuf, 0);
}
break;
}
/* Finally, try space for element only if last page */
/* Skip if both tuples can fit on the same page */
if (combinedSize > maxSize && PageGetFreeSpace(page) >= etupSize && !BlockNumberIsValid(HnswPageGetOpaque(page)->nextblkno))
{
HnswInsertAppendPage(index, &nbuf, &npage, state, page, building);
break;
}
currentPage = HnswPageGetOpaque(page)->nextblkno;
if (BlockNumberIsValid(currentPage))
{
/* Move to next page */
if (!building)
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);
}
else
{
Buffer newbuf;
Page newpage;
HnswInsertAppendPage(index, &newbuf, &newpage, state, page, building);
/* Commit */
if (building)
MarkBufferDirty(buf);
else
GenericXLogFinish(state);
/* Unlock previous buffer */
UnlockReleaseBuffer(buf);
/* Prepare new buffer */
buf = newbuf;
if (building)
{
state = NULL;
page = BufferGetPage(buf);
}
else
{
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
}
/* Create new page for neighbors if needed */
if (PageGetFreeSpace(page) < combinedSize)
HnswInsertAppendPage(index, &nbuf, &npage, state, page, building);
else
{
nbuf = buf;
npage = page;
}
break;
}
}
e->blkno = BufferGetBlockNumber(buf);
e->neighborPage = BufferGetBlockNumber(nbuf);
/* Added tuple to new page if newInsertPage is not set */
/* So can set to neighbor page instead of element page */
if (!BlockNumberIsValid(newInsertPage))
newInsertPage = e->neighborPage;
if (OffsetNumberIsValid(freeOffno))
{
e->offno = freeOffno;
e->neighborOffno = freeNeighborOffno;
}
else
{
e->offno = OffsetNumberNext(PageGetMaxOffsetNumber(page));
if (nbuf == buf)
e->neighborOffno = OffsetNumberNext(e->offno);
else
e->neighborOffno = FirstOffsetNumber;
}
ItemPointerSet(&etup->neighbortid, e->neighborPage, e->neighborOffno);
/* Add element and neighbors */
if (OffsetNumberIsValid(freeOffno))
{
if (!PageIndexTupleOverwrite(page, e->offno, (Item) etup, etupSize))
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
if (!PageIndexTupleOverwrite(npage, e->neighborOffno, (Item) ntup, ntupSize))
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
}
else
{
if (PageAddItem(page, (Item) etup, etupSize, InvalidOffsetNumber, false, false) != e->offno)
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
if (PageAddItem(npage, (Item) ntup, ntupSize, InvalidOffsetNumber, false, false) != e->neighborOffno)
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
}
/* Commit */
if (building)
{
MarkBufferDirty(buf);
if (nbuf != buf)
MarkBufferDirty(nbuf);
}
else
GenericXLogFinish(state);
UnlockReleaseBuffer(buf);
if (nbuf != buf)
UnlockReleaseBuffer(nbuf);
/* Update the insert page */
if (BlockNumberIsValid(newInsertPage) && newInsertPage != insertPage)
*updatedInsertPage = newInsertPage;
}
/*
* Check if connection already exists
*/
static bool
ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm)
{
for (int i = 0; i < lm; i++)
{
ItemPointer indextid = &ntup->indextids[startIdx + i];
if (!ItemPointerIsValid(indextid))
break;
if (ItemPointerGetBlockNumber(indextid) == e->blkno && ItemPointerGetOffsetNumber(indextid) == e->offno)
return true;
}
return false;
}
/*
* Update neighbors
*/
void
HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building)
{
char *base = NULL;
for (int lc = e->level; lc >= 0; lc--)
{
int lm = HnswGetLayerM(m, lc);
HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc);
for (int i = 0; i < neighbors->length; i++)
{
HnswCandidate *hc = &neighbors->items[i];
Buffer buf;
Page page;
GenericXLogState *state;
HnswNeighborTuple ntup;
int idx = -1;
int startIdx;
HnswElement neighborElement = HnswPtrAccess(base, hc->element);
OffsetNumber offno = neighborElement->neighborOffno;
/* Get latest neighbors since they may have changed */
/* Do not lock yet since selecting neighbors can take time */
HnswLoadNeighbors(neighborElement, index, m);
/*
* Could improve performance for vacuuming by checking neighbors
* against list of elements being deleted to find index. It's
* important to exclude already deleted elements for this since
* they can be replaced at any time.
*/
/* Select neighbors */
HnswUpdateConnection(NULL, e, hc, lm, lc, &idx, index, procinfo, collation);
/* New element was not selected as a neighbor */
if (idx == -1)
continue;
/* Register page */
buf = ReadBuffer(index, neighborElement->neighborPage);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
if (building)
{
state = NULL;
page = BufferGetPage(buf);
}
else
{
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
}
/* Get tuple */
ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, offno));
/* Calculate index for update */
startIdx = (neighborElement->level - lc) * m;
/* Check for existing connection */
if (checkExisting && ConnectionExists(e, ntup, startIdx, lm))
idx = -1;
else if (idx == -2)
{
/* Find free offset if still exists */
/* TODO Retry updating connections if not */
for (int j = 0; j < lm; j++)
{
if (!ItemPointerIsValid(&ntup->indextids[startIdx + j]))
{
idx = startIdx + j;
break;
}
}
}
else
idx += startIdx;
/* Make robust to issues */
if (idx >= 0 && idx < ntup->count)
{
ItemPointer indextid = &ntup->indextids[idx];
/* Update neighbor on the buffer */
ItemPointerSet(indextid, e->blkno, e->offno);
/* Commit */
if (building)
MarkBufferDirty(buf);
else
GenericXLogFinish(state);
}
else if (!building)
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);
}
}
}
/*
* Add a heap TID to an existing element
*/
static bool
AddDuplicateOnDisk(Relation index, HnswElement element, HnswElement dup, bool building)
{
Buffer buf;
Page page;
GenericXLogState *state;
HnswElementTuple etup;
int i;
/* Read page */
buf = ReadBuffer(index, dup->blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
if (building)
{
state = NULL;
page = BufferGetPage(buf);
}
else
{
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
}
/* Find space */
etup = (HnswElementTuple) PageGetItem(page, PageGetItemId(page, dup->offno));
for (i = 0; i < HNSW_HEAPTIDS; i++)
{
if (!ItemPointerIsValid(&etup->heaptids[i]))
break;
}
/* Either being deleted or we lost our chance to another backend */
if (i == 0 || i == HNSW_HEAPTIDS)
{
if (!building)
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);
return false;
}
/* Add heap TID, modifying the tuple on the page directly */
etup->heaptids[i] = element->heaptids[0];
/* Commit */
if (building)
MarkBufferDirty(buf);
else
GenericXLogFinish(state);
UnlockReleaseBuffer(buf);
return true;
}
/*
* Find duplicate element
*/
static bool
FindDuplicateOnDisk(Relation index, HnswElement element, bool building)
{
char *base = NULL;
HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0);
Datum value = HnswGetValue(base, element);
for (int i = 0; i < neighbors->length; i++)
{
HnswCandidate *neighbor = &neighbors->items[i];
HnswElement neighborElement = HnswPtrAccess(base, neighbor->element);
Datum neighborValue = HnswGetValue(base, neighborElement);
/* Exit early since ordered by distance */
if (!datumIsEqual(value, neighborValue, false, -1))
return false;
if (AddDuplicateOnDisk(index, element, neighborElement, building))
return true;
}
return false;
}
/*
* Update graph on disk
*/
static void
UpdateGraphOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement entryPoint, bool building)
{
BlockNumber newInsertPage = InvalidBlockNumber;
/* Look for duplicate */
if (FindDuplicateOnDisk(index, element, building))
return;
/* Add element */
AddElementOnDisk(index, element, m, GetInsertPage(index), &newInsertPage, building);
/* Update insert page if needed */
if (BlockNumberIsValid(newInsertPage))
HnswUpdateMetaPage(index, 0, NULL, newInsertPage, MAIN_FORKNUM, building);
/* Update neighbors */
HnswUpdateNeighborsOnDisk(index, procinfo, collation, element, m, false, building);
/* Update entry point if needed */
if (entryPoint == NULL || element->level > entryPoint->level)
HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_GREATER, element, InvalidBlockNumber, MAIN_FORKNUM, building);
}
/*
* Insert a tuple into the index
*/
bool
HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, bool building)
{
HnswElement entryPoint;
HnswElement element;
int m;
int efConstruction = HnswGetEfConstruction(index);
FmgrInfo *procinfo = index_getprocinfo(index, 1, HNSW_DISTANCE_PROC);
Oid collation = index->rd_indcollation[0];
LOCKMODE lockmode = ShareLock;
char *base = NULL;
/*
* Get a shared lock. This allows vacuum to ensure no in-flight inserts
* before repairing graph. Use a page lock so it does not interfere with
* buffer lock (or reads when vacuuming).
*/
LockPage(index, HNSW_UPDATE_LOCK, lockmode);
/* Get m and entry point */
HnswGetMetaPageInfo(index, &m, &entryPoint);
/* Create an element */
element = HnswInitElement(base, heap_tid, m, HnswGetMl(m), HnswGetMaxLevel(m), NULL);
HnswPtrStore(base, element->value, DatumGetPointer(value));
/* Prevent concurrent inserts when likely updating entry point */
if (entryPoint == NULL || element->level > entryPoint->level)
{
/* Release shared lock */
UnlockPage(index, HNSW_UPDATE_LOCK, lockmode);
/* Get exclusive lock */
lockmode = ExclusiveLock;
LockPage(index, HNSW_UPDATE_LOCK, lockmode);
/* Get latest entry point after lock is acquired */
entryPoint = HnswGetEntryPoint(index);
}
/* Find neighbors for element */
HnswFindElementNeighbors(base, element, entryPoint, index, procinfo, collation, m, efConstruction, false);
/* Update graph on disk */
UpdateGraphOnDisk(index, procinfo, collation, element, m, efConstruction, entryPoint, building);
/* Release lock */
UnlockPage(index, HNSW_UPDATE_LOCK, lockmode);
return true;
}
/*
* Insert a tuple into the index
*/
static void
HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid)
{
Datum value;
const HnswTypeInfo *typeInfo = HnswGetTypeInfo(index);
FmgrInfo *normprocinfo;
Oid collation = index->rd_indcollation[0];
/* Detoast once for all calls */
value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
/* Check value */
if (typeInfo->checkValue != NULL)
typeInfo->checkValue(DatumGetPointer(value));
/* Normalize if needed */
normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC);
if (normprocinfo != NULL)
{
if (!HnswCheckNorm(normprocinfo, collation, value))
return;
value = HnswNormValue(typeInfo, collation, value);
}
HnswInsertTupleOnDisk(index, value, values, isnull, heap_tid, false);
}
/*
* Insert a tuple into the index
*/
bool
hnswinsert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid,
Relation heap, IndexUniqueCheck checkUnique
#if PG_VERSION_NUM >= 140000
,bool indexUnchanged
#endif
,IndexInfo *indexInfo
)
{
MemoryContext oldCtx;
MemoryContext insertCtx;
/* Skip nulls */
if (isnull[0])
return false;
/* Create memory context */
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
"Hnsw insert temporary context",
ALLOCSET_DEFAULT_SIZES);
oldCtx = MemoryContextSwitchTo(insertCtx);
/* Insert tuple */
HnswInsertTuple(index, values, isnull, heap_tid);
/* Delete memory context */
MemoryContextSwitchTo(oldCtx);
MemoryContextDelete(insertCtx);
return false;
}