Use HnswLoadNeighborTids for inserts

This commit is contained in:
Andrew Kane
2024-09-29 18:52:12 -07:00
parent 5ce367e18b
commit ee43ee9b16
3 changed files with 18 additions and 48 deletions

View File

@@ -386,6 +386,7 @@ HnswSearchCandidate *HnswEntryCandidate(char *base, HnswElement em, Datum q, Rel
void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum, bool building);
void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m);
void HnswAddHeapTid(HnswElement element, ItemPointer heaptid);
HnswNeighborArray *HnswInitNeighborArray(int lm, HnswAllocator * allocator);
void HnswInitNeighbors(char *base, HnswElement element, int m, HnswAllocator * alloc);
bool HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, bool building);
void HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, bool checkExisting, bool building);
@@ -393,6 +394,7 @@ void HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool
void HnswLoadElement(HnswElement element, double *distance, Datum *q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec, double *maxDistance);
void HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element);
void HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc, HnswNeighborArray * currentNeighbors, int lm, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation);
bool HnswLoadNeighborTids(HnswElement element, ItemPointerData *indextids, Relation index, int m, int lm, int lc);
void HnswInitLockTranche(void);
const HnswTypeInfo *HnswGetTypeInfo(Relation index);
PGDLLEXPORT void HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc);

View File

@@ -335,66 +335,33 @@ AddElementOnDisk(Relation index, HnswElement e, int m, BlockNumber insertPage, B
}
/*
* Load neighbors from page
* Load neighbors
*/
static void
LoadNeighborsFromPage(HnswElement element, Relation index, Page page, int m)
static HnswNeighborArray *
HnswLoadNeighbors(HnswElement element, Relation index, int m, int lm, int lc)
{
char *base = NULL;
HnswNeighborArray *neighbors = HnswInitNeighborArray(lm, NULL);
ItemPointerData indextids[HNSW_MAX_M * 2];
HnswNeighborTuple ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, element->neighborOffno));
int neighborCount = (element->level + 2) * m;
if (!HnswLoadNeighborTids(element, indextids, index, m, lm, lc))
return neighbors;
Assert(HnswIsNeighborTuple(ntup));
HnswInitNeighbors(base, element, m, NULL);
/* Ensure expected neighbors */
if (ntup->count != neighborCount)
return;
for (int i = 0; i < neighborCount; i++)
for (int i = 0; i < lm; i++)
{
ItemPointer indextid = &indextids[i];
HnswElement e;
int level;
HnswCandidate *hc;
ItemPointer indextid;
HnswNeighborArray *neighbors;
indextid = &ntup->indextids[i];
if (!ItemPointerIsValid(indextid))
continue;
break;
e = HnswInitElementFromBlock(ItemPointerGetBlockNumber(indextid), ItemPointerGetOffsetNumber(indextid));
/* Calculate level based on offset */
level = element->level - i / m;
if (level < 0)
level = 0;
neighbors = HnswGetNeighbors(base, element, level);
hc = &neighbors->items[neighbors->length++];
HnswPtrStore(base, hc->element, e);
}
}
/*
* Load neighbors
*/
static void
HnswLoadNeighbors(HnswElement element, Relation index, int m)
{
Buffer buf;
Page page;
buf = ReadBuffer(index, element->neighborPage);
LockBuffer(buf, BUFFER_LOCK_SHARE);
page = BufferGetPage(buf);
LoadNeighborsFromPage(element, index, page, m);
UnlockReleaseBuffer(buf);
return neighbors;
}
/*
@@ -441,6 +408,7 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns
int startIdx;
HnswElement neighborElement = HnswPtrAccess(base, hc->element);
OffsetNumber offno = neighborElement->neighborOffno;
HnswNeighborArray *neighborNeighbors;
/*
* Get latest neighbors since they may have changed. Do not lock
@@ -448,7 +416,7 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns
* optimistic locking to retry if another update occurs before
* getting exclusive lock.
*/
HnswLoadNeighbors(neighborElement, index, m);
neighborNeighbors = HnswLoadNeighbors(neighborElement, index, m, lm, lc);
/*
* Could improve performance for vacuuming by checking neighbors
@@ -458,7 +426,7 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns
*/
/* Select neighbors */
HnswUpdateConnection(NULL, e, hc, HnswGetNeighbors(base, neighborElement, lc), lm, &idx, index, procinfo, collation);
HnswUpdateConnection(NULL, e, hc, neighborNeighbors, lm, &idx, index, procinfo, collation);
/* New element was not selected as a neighbor */
if (idx == -1)

View File

@@ -197,7 +197,7 @@ HnswInitPage(Buffer buf, Page page)
/*
* Allocate a neighbor array
*/
static HnswNeighborArray *
HnswNeighborArray *
HnswInitNeighborArray(int lm, HnswAllocator * allocator)
{
HnswNeighborArray *a = HnswAlloc(allocator, HNSW_NEIGHBOR_ARRAY_SIZE(lm));
@@ -682,7 +682,7 @@ HnswLoadUnvisitedFromMemory(char *base, HnswElement element, HnswUnvisited * unv
/*
* Load neighbor index TIDs
*/
static bool
bool
HnswLoadNeighborTids(HnswElement element, ItemPointerData *indextids, Relation index, int m, int lm, int lc)
{
Buffer buf;