Moved logic to get update neighbor on disk to separate function

This commit is contained in:
Andrew Kane
2024-09-30 10:30:01 -07:00
parent a8b4b6675a
commit ff6da4fcea

View File

@@ -449,6 +449,78 @@ ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm)
return false;
}
/*
* Update neighbor
*/
static void
UpdateNeighborOnDisk(HnswElement element, HnswElement newElement, int idx, int m, int lm, int lc, Relation index, bool checkExisting, bool building)
{
Buffer buf;
Page page;
GenericXLogState *state;
HnswNeighborTuple ntup;
int startIdx;
OffsetNumber offno = element->neighborOffno;
/* Register page */
buf = ReadBuffer(index, element->neighborPage);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
if (building)
{
state = NULL;
page = BufferGetPage(buf);
}
else
{
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
}
/* Get tuple */
ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, offno));
/* Calculate index for update */
startIdx = (element->level - lc) * m;
/* Check for existing connection */
if (checkExisting && ConnectionExists(newElement, ntup, startIdx, lm))
idx = -1;
else if (idx == -2)
{
/* Find free offset if still exists */
/* TODO Retry updating connections if not */
for (int j = 0; j < lm; j++)
{
if (!ItemPointerIsValid(&ntup->indextids[startIdx + j]))
{
idx = startIdx + j;
break;
}
}
}
else
idx += startIdx;
/* Make robust to issues */
if (idx >= 0 && idx < ntup->count)
{
ItemPointer indextid = &ntup->indextids[idx];
/* Update neighbor on the buffer */
ItemPointerSet(indextid, newElement->blkno, newElement->offno);
/* Commit */
if (building)
MarkBufferDirty(buf);
else
GenericXLogFinish(state);
}
else if (!building)
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);
}
/*
* Update neighbors
*/
@@ -465,14 +537,8 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns
for (int i = 0; i < neighbors->length; i++)
{
HnswCandidate *hc = &neighbors->items[i];
Buffer buf;
Page page;
GenericXLogState *state;
HnswNeighborTuple ntup;
int idx;
int startIdx;
HnswElement neighborElement = HnswPtrAccess(base, hc->element);
OffsetNumber offno = neighborElement->neighborOffno;
int idx;
idx = GetUpdateIndex(neighborElement, e, hc->distance, m, lm, lc, index, procinfo, collation);
@@ -480,63 +546,7 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns
if (idx == -1)
continue;
/* Register page */
buf = ReadBuffer(index, neighborElement->neighborPage);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
if (building)
{
state = NULL;
page = BufferGetPage(buf);
}
else
{
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
}
/* Get tuple */
ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, offno));
/* Calculate index for update */
startIdx = (neighborElement->level - lc) * m;
/* Check for existing connection */
if (checkExisting && ConnectionExists(e, ntup, startIdx, lm))
idx = -1;
else if (idx == -2)
{
/* Find free offset if still exists */
/* TODO Retry updating connections if not */
for (int j = 0; j < lm; j++)
{
if (!ItemPointerIsValid(&ntup->indextids[startIdx + j]))
{
idx = startIdx + j;
break;
}
}
}
else
idx += startIdx;
/* Make robust to issues */
if (idx >= 0 && idx < ntup->count)
{
ItemPointer indextid = &ntup->indextids[idx];
/* Update neighbor on the buffer */
ItemPointerSet(indextid, e->blkno, e->offno);
/* Commit */
if (building)
MarkBufferDirty(buf);
else
GenericXLogFinish(state);
}
else if (!building)
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);
UpdateNeighborOnDisk(neighborElement, e, idx, m, lm, lc, index, checkExisting, building);
}
}
}