mirror of
https://github.com/pgvector/pgvector.git
synced 2026-07-01 10:11:20 +08:00
Reduced WAL generation for HNSW index builds
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
- Improved performance of HNSW
|
||||
- Added support for on-disk parallel index builds for HNSW
|
||||
- Reduced WAL generation for HNSW index builds
|
||||
- Fixed `invalid memory alloc request size` error with HNSW index build
|
||||
|
||||
## 0.5.1 (2023-10-10)
|
||||
|
||||
@@ -336,8 +336,8 @@ void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint
|
||||
void HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m);
|
||||
void HnswAddHeapTid(HnswElement element, ItemPointer heaptid);
|
||||
void HnswInitNeighbors(HnswElement element, int m);
|
||||
bool HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel);
|
||||
void HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting);
|
||||
bool HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building);
|
||||
void HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building);
|
||||
void HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHeaptids, bool loadVec);
|
||||
void HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec);
|
||||
void HnswSetElementTuple(HnswElementTuple etup, HnswElement element);
|
||||
|
||||
@@ -95,7 +95,7 @@ CreateMetaPage(HnswBuildState * buildstate)
|
||||
* Add a new page
|
||||
*/
|
||||
static void
|
||||
HnswBuildAppendPage(Relation index, Buffer *buf, Page *page, GenericXLogState **state, ForkNumber forkNum)
|
||||
HnswBuildAppendPage(Relation index, Buffer *buf, Page *page, ForkNumber forkNum)
|
||||
{
|
||||
/* Add a new page */
|
||||
Buffer newbuf = HnswNewBuffer(index, forkNum);
|
||||
@@ -104,7 +104,6 @@ HnswBuildAppendPage(Relation index, Buffer *buf, Page *page, GenericXLogState **
|
||||
HnswPageGetOpaque(*page)->nextblkno = BufferGetBlockNumber(newbuf);
|
||||
|
||||
/* Commit */
|
||||
GenericXLogFinish(*state);
|
||||
UnlockReleaseBuffer(*buf);
|
||||
|
||||
/* Can take a while, so ensure we can interrupt */
|
||||
@@ -115,8 +114,7 @@ HnswBuildAppendPage(Relation index, Buffer *buf, Page *page, GenericXLogState **
|
||||
|
||||
/* Prepare new page */
|
||||
*buf = newbuf;
|
||||
*state = GenericXLogStart(index);
|
||||
*page = GenericXLogRegisterBuffer(*state, *buf, GENERIC_XLOG_FULL_IMAGE);
|
||||
*page = BufferGetPage(*buf);
|
||||
HnswInitPage(*buf, *page);
|
||||
}
|
||||
|
||||
@@ -135,7 +133,6 @@ CreateElementPages(HnswBuildState * buildstate)
|
||||
BlockNumber insertPage;
|
||||
Buffer buf;
|
||||
Page page;
|
||||
GenericXLogState *state;
|
||||
ListCell *lc;
|
||||
|
||||
/* Calculate sizes */
|
||||
@@ -148,8 +145,7 @@ CreateElementPages(HnswBuildState * buildstate)
|
||||
|
||||
/* Prepare first page */
|
||||
buf = HnswNewBuffer(index, forkNum);
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, GENERIC_XLOG_FULL_IMAGE);
|
||||
page = BufferGetPage(buf);
|
||||
HnswInitPage(buf, page);
|
||||
|
||||
foreach(lc, buildstate->elements)
|
||||
@@ -175,7 +171,7 @@ CreateElementPages(HnswBuildState * buildstate)
|
||||
|
||||
/* Keep element and neighbors on the same page if possible */
|
||||
if (PageGetFreeSpace(page) < etupSize || (combinedSize <= maxSize && PageGetFreeSpace(page) < combinedSize))
|
||||
HnswBuildAppendPage(index, &buf, &page, &state, forkNum);
|
||||
HnswBuildAppendPage(index, &buf, &page, forkNum);
|
||||
|
||||
/* Calculate offsets */
|
||||
element->blkno = BufferGetBlockNumber(buf);
|
||||
@@ -199,7 +195,7 @@ CreateElementPages(HnswBuildState * buildstate)
|
||||
|
||||
/* Add new page if needed */
|
||||
if (PageGetFreeSpace(page) < ntupSize)
|
||||
HnswBuildAppendPage(index, &buf, &page, &state, forkNum);
|
||||
HnswBuildAppendPage(index, &buf, &page, forkNum);
|
||||
|
||||
/* Add placeholder for neighbors */
|
||||
if (PageAddItem(page, (Item) ntup, ntupSize, InvalidOffsetNumber, false, false) != element->neighborOffno)
|
||||
@@ -209,7 +205,6 @@ CreateElementPages(HnswBuildState * buildstate)
|
||||
insertPage = BufferGetBlockNumber(buf);
|
||||
|
||||
/* Commit */
|
||||
GenericXLogFinish(state);
|
||||
UnlockReleaseBuffer(buf);
|
||||
|
||||
HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, buildstate->entryPoint, insertPage, forkNum);
|
||||
@@ -238,7 +233,6 @@ CreateNeighborPages(HnswBuildState * buildstate)
|
||||
HnswElement e = lfirst(lc);
|
||||
Buffer buf;
|
||||
Page page;
|
||||
GenericXLogState *state;
|
||||
Size ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(e->level, m);
|
||||
|
||||
/* Can take a while, so ensure we can interrupt */
|
||||
@@ -247,8 +241,7 @@ CreateNeighborPages(HnswBuildState * buildstate)
|
||||
|
||||
buf = ReadBufferExtended(index, forkNum, e->neighborPage, RBM_NORMAL, NULL);
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
page = BufferGetPage(buf);
|
||||
|
||||
HnswSetNeighborTuple(ntup, e, m);
|
||||
|
||||
@@ -256,7 +249,6 @@ CreateNeighborPages(HnswBuildState * buildstate)
|
||||
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
||||
|
||||
/* Commit */
|
||||
GenericXLogFinish(state);
|
||||
UnlockReleaseBuffer(buf);
|
||||
}
|
||||
|
||||
@@ -398,7 +390,7 @@ BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
|
||||
|
||||
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
|
||||
|
||||
if (HnswInsertTuple(buildstate->index, values, isnull, tid, buildstate->heap))
|
||||
if (HnswInsertTuple(buildstate->index, values, isnull, tid, buildstate->heap, true))
|
||||
{
|
||||
if (buildstate->hnswshared)
|
||||
{
|
||||
@@ -895,6 +887,9 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
||||
if (!buildstate->flushed)
|
||||
FlushPages(buildstate);
|
||||
|
||||
if (RelationNeedsWAL(index))
|
||||
log_newpage_range(index, MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index), true);
|
||||
|
||||
FreeBuildState(buildstate);
|
||||
}
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ HnswFreeOffset(Relation index, Buffer buf, Page page, HnswElement element, Size
|
||||
* Add a new page
|
||||
*/
|
||||
static void
|
||||
HnswInsertAppendPage(Relation index, Buffer *nbuf, Page *npage, GenericXLogState *state, Page page)
|
||||
HnswInsertAppendPage(Relation index, Buffer *nbuf, Page *npage, GenericXLogState *state, Page page, bool building)
|
||||
{
|
||||
/* Add a new page */
|
||||
LockRelationForExtension(index, ExclusiveLock);
|
||||
@@ -100,7 +100,11 @@ HnswInsertAppendPage(Relation index, Buffer *nbuf, Page *npage, GenericXLogState
|
||||
UnlockRelationForExtension(index, ExclusiveLock);
|
||||
|
||||
/* Init new page */
|
||||
*npage = GenericXLogRegisterBuffer(state, *nbuf, GENERIC_XLOG_FULL_IMAGE);
|
||||
if (building)
|
||||
*npage = BufferGetPage(*nbuf);
|
||||
else
|
||||
*npage = GenericXLogRegisterBuffer(state, *nbuf, GENERIC_XLOG_FULL_IMAGE);
|
||||
|
||||
HnswInitPage(*nbuf, *npage);
|
||||
|
||||
/* Update previous buffer */
|
||||
@@ -111,7 +115,7 @@ HnswInsertAppendPage(Relation index, Buffer *nbuf, Page *npage, GenericXLogState
|
||||
* Add to element and neighbor pages
|
||||
*/
|
||||
static void
|
||||
WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPage, BlockNumber *updatedInsertPage)
|
||||
WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPage, BlockNumber *updatedInsertPage, bool building)
|
||||
{
|
||||
Buffer buf;
|
||||
Page page;
|
||||
@@ -151,8 +155,13 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag
|
||||
buf = ReadBuffer(index, currentPage);
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
if (building)
|
||||
page = BufferGetPage(buf);
|
||||
else
|
||||
{
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
}
|
||||
|
||||
/* Keep track of first page where element at level 0 can fit */
|
||||
if (!BlockNumberIsValid(newInsertPage) && PageGetFreeSpace(page) >= minCombinedSize)
|
||||
@@ -172,7 +181,12 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag
|
||||
if (HnswFreeOffset(index, buf, page, e, ntupSize, &nbuf, &npage, &freeOffno, &freeNeighborOffno, &newInsertPage))
|
||||
{
|
||||
if (nbuf != buf)
|
||||
npage = GenericXLogRegisterBuffer(state, nbuf, 0);
|
||||
{
|
||||
if (building)
|
||||
npage = BufferGetPage(nbuf);
|
||||
else
|
||||
npage = GenericXLogRegisterBuffer(state, nbuf, 0);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
@@ -181,7 +195,7 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag
|
||||
/* Skip if both tuples can fit on the same page */
|
||||
if (combinedSize > maxSize && PageGetFreeSpace(page) >= etupSize && !BlockNumberIsValid(HnswPageGetOpaque(page)->nextblkno))
|
||||
{
|
||||
HnswInsertAppendPage(index, &nbuf, &npage, state, page);
|
||||
HnswInsertAppendPage(index, &nbuf, &npage, state, page, building);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -190,7 +204,8 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag
|
||||
if (BlockNumberIsValid(currentPage))
|
||||
{
|
||||
/* Move to next page */
|
||||
GenericXLogAbort(state);
|
||||
if (!building)
|
||||
GenericXLogAbort(state);
|
||||
UnlockReleaseBuffer(buf);
|
||||
}
|
||||
else
|
||||
@@ -198,22 +213,28 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag
|
||||
Buffer newbuf;
|
||||
Page newpage;
|
||||
|
||||
HnswInsertAppendPage(index, &newbuf, &newpage, state, page);
|
||||
HnswInsertAppendPage(index, &newbuf, &newpage, state, page, building);
|
||||
|
||||
/* Commit */
|
||||
GenericXLogFinish(state);
|
||||
if (!building)
|
||||
GenericXLogFinish(state);
|
||||
|
||||
/* Unlock previous buffer */
|
||||
UnlockReleaseBuffer(buf);
|
||||
|
||||
/* Prepare new buffer */
|
||||
state = GenericXLogStart(index);
|
||||
buf = newbuf;
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
if (building)
|
||||
page = BufferGetPage(buf);
|
||||
else
|
||||
{
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
}
|
||||
|
||||
/* Create new page for neighbors if needed */
|
||||
if (PageGetFreeSpace(page) < combinedSize)
|
||||
HnswInsertAppendPage(index, &nbuf, &npage, state, page);
|
||||
HnswInsertAppendPage(index, &nbuf, &npage, state, page, building);
|
||||
else
|
||||
{
|
||||
nbuf = buf;
|
||||
@@ -267,7 +288,8 @@ WriteNewElementPages(Relation index, HnswElement e, int m, BlockNumber insertPag
|
||||
}
|
||||
|
||||
/* Commit */
|
||||
GenericXLogFinish(state);
|
||||
if (!building)
|
||||
GenericXLogFinish(state);
|
||||
UnlockReleaseBuffer(buf);
|
||||
if (nbuf != buf)
|
||||
UnlockReleaseBuffer(nbuf);
|
||||
@@ -301,7 +323,7 @@ ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm)
|
||||
* Update neighbors
|
||||
*/
|
||||
void
|
||||
HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting)
|
||||
HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building)
|
||||
{
|
||||
for (int lc = e->level; lc >= 0; lc--)
|
||||
{
|
||||
@@ -342,8 +364,13 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE
|
||||
/* Register page */
|
||||
buf = ReadBuffer(index, hc->element->neighborPage);
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
if (building)
|
||||
page = BufferGetPage(buf);
|
||||
else
|
||||
{
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
}
|
||||
|
||||
/* Get tuple */
|
||||
itemid = PageGetItemId(page, offno);
|
||||
@@ -385,9 +412,10 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE
|
||||
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
||||
|
||||
/* Commit */
|
||||
GenericXLogFinish(state);
|
||||
if (!building)
|
||||
GenericXLogFinish(state);
|
||||
}
|
||||
else
|
||||
else if (!building)
|
||||
GenericXLogAbort(state);
|
||||
|
||||
UnlockReleaseBuffer(buf);
|
||||
@@ -399,7 +427,7 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE
|
||||
* Add a heap TID to an existing element
|
||||
*/
|
||||
static bool
|
||||
HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup)
|
||||
HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup, bool building)
|
||||
{
|
||||
Buffer buf;
|
||||
Page page;
|
||||
@@ -412,8 +440,13 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup)
|
||||
/* Read page */
|
||||
buf = ReadBuffer(index, dup->blkno);
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
if (building)
|
||||
page = BufferGetPage(buf);
|
||||
else
|
||||
{
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
}
|
||||
|
||||
/* Find space */
|
||||
itemid = PageGetItemId(page, dup->offno);
|
||||
@@ -428,7 +461,8 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup)
|
||||
/* Either being deleted or we lost our chance to another backend */
|
||||
if (i == 0 || i == HNSW_HEAPTIDS)
|
||||
{
|
||||
GenericXLogAbort(state);
|
||||
if (!building)
|
||||
GenericXLogAbort(state);
|
||||
UnlockReleaseBuffer(buf);
|
||||
return false;
|
||||
}
|
||||
@@ -441,7 +475,8 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup)
|
||||
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
||||
|
||||
/* Commit */
|
||||
GenericXLogFinish(state);
|
||||
if (!building)
|
||||
GenericXLogFinish(state);
|
||||
UnlockReleaseBuffer(buf);
|
||||
|
||||
return true;
|
||||
@@ -451,26 +486,26 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup)
|
||||
* Write changes to disk
|
||||
*/
|
||||
static void
|
||||
WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement dup, HnswElement entryPoint)
|
||||
WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, int efConstruction, HnswElement dup, HnswElement entryPoint, bool building)
|
||||
{
|
||||
BlockNumber newInsertPage = InvalidBlockNumber;
|
||||
|
||||
/* Try to add to existing page */
|
||||
if (dup != NULL)
|
||||
{
|
||||
if (HnswAddDuplicate(index, element, dup))
|
||||
if (HnswAddDuplicate(index, element, dup, building))
|
||||
return;
|
||||
}
|
||||
|
||||
/* Write element and neighbor tuples */
|
||||
WriteNewElementPages(index, element, m, GetInsertPage(index), &newInsertPage);
|
||||
WriteNewElementPages(index, element, m, GetInsertPage(index), &newInsertPage, building);
|
||||
|
||||
/* Update insert page if needed */
|
||||
if (BlockNumberIsValid(newInsertPage))
|
||||
HnswUpdateMetaPage(index, 0, NULL, newInsertPage, MAIN_FORKNUM);
|
||||
|
||||
/* Update neighbors */
|
||||
HnswUpdateNeighborPages(index, procinfo, collation, element, m, false);
|
||||
HnswUpdateNeighborPages(index, procinfo, collation, element, m, false, building);
|
||||
|
||||
/* Update metapage if needed */
|
||||
if (entryPoint == NULL || element->level > entryPoint->level)
|
||||
@@ -481,7 +516,7 @@ WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement elem
|
||||
* Insert a tuple into the index
|
||||
*/
|
||||
bool
|
||||
HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel)
|
||||
HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel, bool building)
|
||||
{
|
||||
Datum value;
|
||||
FmgrInfo *normprocinfo;
|
||||
@@ -540,7 +575,7 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti
|
||||
dup = HnswFindDuplicate(element);
|
||||
|
||||
/* Write to disk */
|
||||
WriteElement(index, procinfo, collation, element, m, efConstruction, dup, entryPoint);
|
||||
WriteElement(index, procinfo, collation, element, m, efConstruction, dup, entryPoint, building);
|
||||
|
||||
/* Release lock */
|
||||
UnlockPage(index, HNSW_UPDATE_LOCK, lockmode);
|
||||
@@ -574,7 +609,7 @@ hnswinsert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid,
|
||||
oldCtx = MemoryContextSwitchTo(insertCtx);
|
||||
|
||||
/* Insert tuple */
|
||||
HnswInsertTuple(index, values, isnull, heap_tid, heap);
|
||||
HnswInsertTuple(index, values, isnull, heap_tid, heap, false);
|
||||
|
||||
/* Delete memory context */
|
||||
MemoryContextSwitchTo(oldCtx);
|
||||
|
||||
@@ -230,7 +230,7 @@ RepairGraphElement(HnswVacuumState * vacuumstate, HnswElement element, HnswEleme
|
||||
UnlockReleaseBuffer(buf);
|
||||
|
||||
/* Update neighbors */
|
||||
HnswUpdateNeighborPages(index, procinfo, collation, element, m, true);
|
||||
HnswUpdateNeighborPages(index, procinfo, collation, element, m, true, false);
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user