mirror of
https://github.com/pgvector/pgvector.git
synced 2026-06-30 09:41:15 +08:00
Improved concurrent inserts with empty entry point, part 2
This commit is contained in:
@@ -258,9 +258,11 @@ List *HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, Fmgr
|
||||
HnswElement HnswGetEntryPoint(Relation index);
|
||||
HnswElement HnswInitElement(ItemPointer tid, int m, double ml, int maxLevel);
|
||||
void HnswFreeElement(HnswElement element);
|
||||
HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno);
|
||||
void HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing);
|
||||
HnswElement HnswFindDuplicate(HnswElement e);
|
||||
HnswCandidate *HnswEntryCandidate(HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec);
|
||||
void HnswUpdateMetaPageInfo(Page page, bool updateEntry, HnswElement entryPoint, BlockNumber insertPage);
|
||||
void HnswUpdateMetaPage(Relation index, bool updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum);
|
||||
void HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m);
|
||||
void HnswAddHeapTid(HnswElement element, ItemPointer heaptid);
|
||||
|
||||
@@ -110,7 +110,7 @@ HnswInsertAppendPage(Relation index, Buffer *nbuf, Page *npage, GenericXLogState
|
||||
* Add to element and neighbor pages
|
||||
*/
|
||||
static void
|
||||
WriteNewElementPages(Relation index, HnswElement e, int m)
|
||||
WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPage, BlockNumber *newInsertPage)
|
||||
{
|
||||
Buffer buf;
|
||||
Page page;
|
||||
@@ -119,7 +119,6 @@ WriteNewElementPages(Relation index, HnswElement e, int m)
|
||||
Size ntupSize;
|
||||
Size combinedSize;
|
||||
HnswElementTuple etup;
|
||||
BlockNumber insertPage = GetInsertPage(index);
|
||||
BlockNumber originalInsertPage = insertPage;
|
||||
int dimensions = e->vec->dim;
|
||||
HnswNeighborTuple ntup;
|
||||
@@ -266,7 +265,7 @@ WriteNewElementPages(Relation index, HnswElement e, int m)
|
||||
|
||||
/* Update the insert page */
|
||||
if (insertPage != originalInsertPage && (!OffsetNumberIsValid(freeOffno) || firstFreePage == insertPage))
|
||||
HnswUpdateMetaPage(index, false, NULL, insertPage, MAIN_FORKNUM);
|
||||
*newInsertPage = insertPage;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -350,6 +349,50 @@ UpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswEleme
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Add the entry point
|
||||
*/
|
||||
static bool
|
||||
HnswAddEntryPoint(Relation index, HnswElement element, int m, HnswElement * entryPoint)
|
||||
{
|
||||
Buffer buf;
|
||||
Page page;
|
||||
GenericXLogState *state;
|
||||
HnswMetaPage metap;
|
||||
BlockNumber newInsertPage = InvalidBlockNumber;
|
||||
|
||||
/* Lock the metapage to prevent concurrent inserts */
|
||||
buf = ReadBuffer(index, HNSW_METAPAGE_BLKNO);
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
page = BufferGetPage(buf);
|
||||
metap = HnswPageGetMeta(page);
|
||||
|
||||
/* Check for new entry point after lock is acquired */
|
||||
if (BlockNumberIsValid(metap->entryBlkno))
|
||||
{
|
||||
*entryPoint = HnswInitElementFromBlock(metap->entryBlkno, metap->entryOffno);
|
||||
|
||||
UnlockReleaseBuffer(buf);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Write element and neighbor tuples */
|
||||
WriteNewElementPages(index, element, m, metap->insertPage, &newInsertPage);
|
||||
|
||||
/* Start WAL */
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
|
||||
/* Update the metapage info */
|
||||
HnswUpdateMetaPageInfo(page, true, element, newInsertPage);
|
||||
|
||||
/* Commit and unlock */
|
||||
HnswCommitBuffer(buf, state);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Add a heap TID to an existing element
|
||||
*/
|
||||
@@ -406,6 +449,8 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup)
|
||||
static void
|
||||
WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement dup, HnswElement entryPoint)
|
||||
{
|
||||
BlockNumber newInsertPage = InvalidBlockNumber;
|
||||
|
||||
/* Try to add to existing page */
|
||||
if (dup != NULL)
|
||||
{
|
||||
@@ -413,25 +458,19 @@ WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement elem
|
||||
return;
|
||||
}
|
||||
|
||||
/* If fails, take this path */
|
||||
WriteNewElementPages(index, element, m);
|
||||
/* Write element and neighbor tuples */
|
||||
WriteNewElementPages(index, element, m, GetInsertPage(index), &newInsertPage);
|
||||
|
||||
/* Update insert page if needed */
|
||||
if (BlockNumberIsValid(newInsertPage))
|
||||
HnswUpdateMetaPage(index, false, NULL, newInsertPage, MAIN_FORKNUM);
|
||||
|
||||
/* Update neighbors */
|
||||
UpdateNeighborPages(index, procinfo, collation, element, m);
|
||||
|
||||
/* Update metapage if needed */
|
||||
if (entryPoint == NULL || element->level > entryPoint->level)
|
||||
{
|
||||
/* TODO Lock metapage for entire block */
|
||||
HnswElement newEntryPoint = HnswGetEntryPoint(index);
|
||||
|
||||
if (entryPoint == NULL && newEntryPoint != NULL)
|
||||
{
|
||||
/* Try again with new entry point */
|
||||
HnswInsertElement(element, newEntryPoint, index, procinfo, collation, m, efConstruction, true);
|
||||
UpdateNeighborPages(index, procinfo, collation, element, m);
|
||||
}
|
||||
else
|
||||
HnswUpdateMetaPage(index, true, element, InvalidBlockNumber, MAIN_FORKNUM);
|
||||
}
|
||||
if (element->level > entryPoint->level)
|
||||
HnswUpdateMetaPage(index, true, element, InvalidBlockNumber, MAIN_FORKNUM);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -469,6 +508,13 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti
|
||||
/* Get entry point */
|
||||
entryPoint = HnswGetEntryPoint(index);
|
||||
|
||||
/* Special case for no entry point */
|
||||
if (entryPoint == NULL)
|
||||
{
|
||||
if (HnswAddEntryPoint(index, element, m, &entryPoint))
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Insert element in graph */
|
||||
HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, false);
|
||||
|
||||
|
||||
@@ -197,8 +197,8 @@ HnswAddHeapTid(HnswElement element, ItemPointer heaptid)
|
||||
/*
|
||||
* Allocate an element from block and offset numbers
|
||||
*/
|
||||
static HnswElement
|
||||
InitElementFromBlock(BlockNumber blkno, OffsetNumber offno)
|
||||
HnswElement
|
||||
HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno)
|
||||
{
|
||||
HnswElement element = palloc(sizeof(HnswElementData));
|
||||
|
||||
@@ -226,7 +226,7 @@ HnswGetEntryPoint(Relation index)
|
||||
metap = HnswPageGetMeta(page);
|
||||
|
||||
if (BlockNumberIsValid(metap->entryBlkno))
|
||||
entryPoint = InitElementFromBlock(metap->entryBlkno, metap->entryOffno);
|
||||
entryPoint = HnswInitElementFromBlock(metap->entryBlkno, metap->entryOffno);
|
||||
|
||||
UnlockReleaseBuffer(buf);
|
||||
|
||||
@@ -234,22 +234,12 @@ HnswGetEntryPoint(Relation index)
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the metapage
|
||||
* Update the metapage info
|
||||
*/
|
||||
void
|
||||
HnswUpdateMetaPage(Relation index, bool updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum)
|
||||
HnswUpdateMetaPageInfo(Page page, bool updateEntry, HnswElement entryPoint, BlockNumber insertPage)
|
||||
{
|
||||
Buffer buf;
|
||||
Page page;
|
||||
GenericXLogState *state;
|
||||
HnswMetaPage metap;
|
||||
|
||||
buf = ReadBufferExtended(index, forkNum, HNSW_METAPAGE_BLKNO, RBM_NORMAL, NULL);
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
|
||||
metap = HnswPageGetMeta(page);
|
||||
HnswMetaPage metap = HnswPageGetMeta(page);
|
||||
|
||||
if (updateEntry)
|
||||
{
|
||||
@@ -269,6 +259,24 @@ HnswUpdateMetaPage(Relation index, bool updateEntry, HnswElement entryPoint, Blo
|
||||
|
||||
if (BlockNumberIsValid(insertPage))
|
||||
metap->insertPage = insertPage;
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the metapage
|
||||
*/
|
||||
void
|
||||
HnswUpdateMetaPage(Relation index, bool updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum)
|
||||
{
|
||||
Buffer buf;
|
||||
Page page;
|
||||
GenericXLogState *state;
|
||||
|
||||
buf = ReadBufferExtended(index, forkNum, HNSW_METAPAGE_BLKNO, RBM_NORMAL, NULL);
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
|
||||
HnswUpdateMetaPageInfo(page, updateEntry, entryPoint, insertPage);
|
||||
|
||||
HnswCommitBuffer(buf, state);
|
||||
}
|
||||
@@ -356,7 +364,7 @@ LoadNeighborsFromPage(HnswElement element, Relation index, Page page)
|
||||
if (!ItemPointerIsValid(indextid))
|
||||
continue;
|
||||
|
||||
e = InitElementFromBlock(ItemPointerGetBlockNumber(indextid), ItemPointerGetOffsetNumber(indextid));
|
||||
e = HnswInitElementFromBlock(ItemPointerGetBlockNumber(indextid), ItemPointerGetOffsetNumber(indextid));
|
||||
|
||||
/* Calculate level based on offset */
|
||||
level = element->level - i / m;
|
||||
@@ -896,19 +904,15 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F
|
||||
bool removeEntryPoint;
|
||||
HnswCandidate *entryCandidate;
|
||||
|
||||
/* No neighbors if no entry point */
|
||||
if (entryPoint == NULL)
|
||||
return;
|
||||
|
||||
/* Get entry point and level */
|
||||
if (entryPoint != NULL)
|
||||
{
|
||||
entryCandidate = HnswEntryCandidate(entryPoint, q, index, procinfo, collation, true);
|
||||
ep = lappend(ep, entryCandidate);
|
||||
entryLevel = entryPoint->level;
|
||||
removeEntryPoint = existing && list_length(entryPoint->heaptids) == 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
entryLevel = -1;
|
||||
removeEntryPoint = false;
|
||||
}
|
||||
entryCandidate = HnswEntryCandidate(entryPoint, q, index, procinfo, collation, true);
|
||||
ep = lappend(ep, entryCandidate);
|
||||
entryLevel = entryPoint->level;
|
||||
removeEntryPoint = existing && list_length(entryPoint->heaptids) == 0;
|
||||
|
||||
/* 1st phase: greedy search to insert level */
|
||||
for (int lc = entryLevel; lc >= level + 1; lc--)
|
||||
|
||||
Reference in New Issue
Block a user