diff --git a/src/hnswinsert.c b/src/hnswinsert.c index c198c93..a3949b2 100644 --- a/src/hnswinsert.c +++ b/src/hnswinsert.c @@ -449,6 +449,78 @@ ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm) return false; } +/* + * Update neighbor + */ +static void +UpdateNeighborOnDisk(HnswElement element, HnswElement newElement, int idx, int m, int lm, int lc, Relation index, bool checkExisting, bool building) +{ + Buffer buf; + Page page; + GenericXLogState *state; + HnswNeighborTuple ntup; + int startIdx; + OffsetNumber offno = element->neighborOffno; + + /* Register page */ + buf = ReadBuffer(index, element->neighborPage); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + if (building) + { + state = NULL; + page = BufferGetPage(buf); + } + else + { + state = GenericXLogStart(index); + page = GenericXLogRegisterBuffer(state, buf, 0); + } + + /* Get tuple */ + ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, offno)); + + /* Calculate index for update */ + startIdx = (element->level - lc) * m; + + /* Check for existing connection */ + if (checkExisting && ConnectionExists(newElement, ntup, startIdx, lm)) + idx = -1; + else if (idx == -2) + { + /* Find free offset if still exists */ + /* TODO Retry updating connections if not */ + for (int j = 0; j < lm; j++) + { + if (!ItemPointerIsValid(&ntup->indextids[startIdx + j])) + { + idx = startIdx + j; + break; + } + } + } + else + idx += startIdx; + + /* Make robust to issues */ + if (idx >= 0 && idx < ntup->count) + { + ItemPointer indextid = &ntup->indextids[idx]; + + /* Update neighbor on the buffer */ + ItemPointerSet(indextid, newElement->blkno, newElement->offno); + + /* Commit */ + if (building) + MarkBufferDirty(buf); + else + GenericXLogFinish(state); + } + else if (!building) + GenericXLogAbort(state); + + UnlockReleaseBuffer(buf); +} + /* * Update neighbors */ @@ -465,14 +537,8 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns for (int i = 0; i < neighbors->length; i++) { HnswCandidate *hc = &neighbors->items[i]; - Buffer buf; - Page page; - GenericXLogState *state; - HnswNeighborTuple ntup; - int idx; - int startIdx; HnswElement neighborElement = HnswPtrAccess(base, hc->element); - OffsetNumber offno = neighborElement->neighborOffno; + int idx; idx = GetUpdateIndex(neighborElement, e, hc->distance, m, lm, lc, index, procinfo, collation); @@ -480,63 +546,7 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns if (idx == -1) continue; - /* Register page */ - buf = ReadBuffer(index, neighborElement->neighborPage); - LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - if (building) - { - state = NULL; - page = BufferGetPage(buf); - } - else - { - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, 0); - } - - /* Get tuple */ - ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, offno)); - - /* Calculate index for update */ - startIdx = (neighborElement->level - lc) * m; - - /* Check for existing connection */ - if (checkExisting && ConnectionExists(e, ntup, startIdx, lm)) - idx = -1; - else if (idx == -2) - { - /* Find free offset if still exists */ - /* TODO Retry updating connections if not */ - for (int j = 0; j < lm; j++) - { - if (!ItemPointerIsValid(&ntup->indextids[startIdx + j])) - { - idx = startIdx + j; - break; - } - } - } - else - idx += startIdx; - - /* Make robust to issues */ - if (idx >= 0 && idx < ntup->count) - { - ItemPointer indextid = &ntup->indextids[idx]; - - /* Update neighbor on the buffer */ - ItemPointerSet(indextid, e->blkno, e->offno); - - /* Commit */ - if (building) - MarkBufferDirty(buf); - else - GenericXLogFinish(state); - } - else if (!building) - GenericXLogAbort(state); - - UnlockReleaseBuffer(buf); + UpdateNeighborOnDisk(neighborElement, e, idx, m, lm, lc, index, checkExisting, building); } } }