From 67c72ac8c3bf69004cd6ac33c172979a415e9798 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Tue, 18 Nov 2025 22:44:20 -0800 Subject: [PATCH] Added support for async I/O [skip ci] --- src/hnsw.h | 11 ++++- src/hnswbuild.c | 2 +- src/hnswinsert.c | 2 +- src/hnswscan.c | 6 +-- src/hnswutils.c | 110 +++++++++++++++++++++++++++++++++++++++++------ src/hnswvacuum.c | 2 +- 6 files changed, 111 insertions(+), 22 deletions(-) diff --git a/src/hnsw.h b/src/hnsw.h index 5102bfb..3f41afd 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -362,6 +362,13 @@ typedef union ItemPointerData indextid; } HnswUnvisited; +typedef struct HnswReadStreamData +{ + HnswUnvisited *unvisited; + int unvisitedLength; + int visited; +} HnswReadStreamData; + typedef struct HnswScanOpaqueData { const HnswTypeInfo *typeInfo; @@ -417,13 +424,13 @@ bool HnswCheckNorm(HnswSupport * support, Datum value); Buffer HnswNewBuffer(Relation index, ForkNumber forkNum); void HnswInitPage(Buffer buf, Page page); void HnswInit(void); -List *HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples); +List *HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples, bool maintenance); HnswElement HnswGetEntryPoint(Relation index); void HnswGetMetaPageInfo(Relation index, int *m, HnswElement * entryPoint); void *HnswAlloc(HnswAllocator * allocator, Size size); HnswElement HnswInitElement(char *base, ItemPointer tid, int m, double ml, int maxLevel, HnswAllocator * alloc); HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno); -void HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing); +void HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing, bool maintenance); HnswSearchCandidate *HnswEntryCandidate(char *base, HnswElement em, HnswQuery * q, Relation rel, HnswSupport * support, bool loadVec); void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum, bool building); void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m); diff --git a/src/hnswbuild.c b/src/hnswbuild.c index 03f0ef4..11f94e8 100644 --- a/src/hnswbuild.c +++ b/src/hnswbuild.c @@ -461,7 +461,7 @@ InsertTupleInMemory(HnswBuildState * buildstate, HnswElement element) } /* Find neighbors for element */ - HnswFindElementNeighbors(base, element, entryPoint, NULL, support, m, efConstruction, false); + HnswFindElementNeighbors(base, element, entryPoint, NULL, support, m, efConstruction, false, true); /* Update graph in memory */ UpdateGraphInMemory(support, element, m, entryPoint, buildstate); diff --git a/src/hnswinsert.c b/src/hnswinsert.c index a4d2885..a02df0e 100644 --- a/src/hnswinsert.c +++ b/src/hnswinsert.c @@ -729,7 +729,7 @@ HnswInsertTupleOnDisk(Relation index, HnswSupport * support, Datum value, ItemPo } /* Find neighbors for element */ - HnswFindElementNeighbors(base, element, entryPoint, index, support, m, efConstruction, false); + HnswFindElementNeighbors(base, element, entryPoint, index, support, m, efConstruction, false, building); /* Update graph on disk */ UpdateGraphOnDisk(index, support, element, m, entryPoint, building); diff --git a/src/hnswscan.c b/src/hnswscan.c index 5c526f4..d95b03e 100644 --- a/src/hnswscan.c +++ b/src/hnswscan.c @@ -37,11 +37,11 @@ GetScanItems(IndexScanDesc scan, Datum value) for (int lc = entryPoint->level; lc >= 1; lc--) { - w = HnswSearchLayer(base, q, ep, 1, lc, index, support, m, false, NULL, NULL, NULL, true, NULL); + w = HnswSearchLayer(base, q, ep, 1, lc, index, support, m, false, NULL, NULL, NULL, true, NULL, false); ep = w; } - return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, support, m, false, NULL, &so->v, hnsw_iterative_scan != HNSW_ITERATIVE_SCAN_OFF ? &so->discarded : NULL, true, &so->tuples); + return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, support, m, false, NULL, &so->v, hnsw_iterative_scan != HNSW_ITERATIVE_SCAN_OFF ? &so->discarded : NULL, true, &so->tuples, false); } /* @@ -72,7 +72,7 @@ ResumeScanItems(IndexScanDesc scan) ep = lappend(ep, sc); } - return HnswSearchLayer(base, &so->q, ep, batch_size, 0, index, &so->support, so->m, false, NULL, &so->v, &so->discarded, false, &so->tuples); + return HnswSearchLayer(base, &so->q, ep, batch_size, 0, index, &so->support, so->m, false, NULL, &so->v, &so->discarded, false, &so->tuples, false); } /* diff --git a/src/hnswutils.c b/src/hnswutils.c index 8e2a42c..2981b69 100644 --- a/src/hnswutils.c +++ b/src/hnswutils.c @@ -19,6 +19,10 @@ #include "varatt.h" #endif +#if PG_VERSION_NUM >= 170000 +#include "storage/read_stream.h" +#endif + #if PG_VERSION_NUM < 170000 static inline uint64 murmurhash64(uint64 data) @@ -529,14 +533,12 @@ HnswGetDistance(Datum a, Datum b, HnswSupport * support) * Load an element and optionally get its distance from q */ static void -HnswLoadElementImpl(BlockNumber blkno, OffsetNumber offno, double *distance, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec, double *maxDistance, HnswElement * element) +HnswLoadElementImpl(Buffer buf, OffsetNumber offno, double *distance, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec, double *maxDistance, HnswElement * element) { - Buffer buf; Page page; HnswElementTuple etup; /* Read vector */ - buf = ReadBuffer(index, blkno); LockBuffer(buf, BUFFER_LOCK_SHARE); page = BufferGetPage(buf); @@ -557,7 +559,7 @@ HnswLoadElementImpl(BlockNumber blkno, OffsetNumber offno, double *distance, Hns if (distance == NULL || maxDistance == NULL || *distance < *maxDistance) { if (*element == NULL) - *element = HnswInitElementFromBlock(blkno, offno); + *element = HnswInitElementFromBlock(BufferGetBlockNumber(buf), offno); HnswLoadElementFromTuple(*element, etup, true, loadVec); } @@ -571,7 +573,9 @@ HnswLoadElementImpl(BlockNumber blkno, OffsetNumber offno, double *distance, Hns void HnswLoadElement(HnswElement element, double *distance, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec, double *maxDistance) { - HnswLoadElementImpl(element->blkno, element->offno, distance, q, index, support, loadVec, maxDistance, &element); + Buffer buf = ReadBuffer(index, element->blkno); + + HnswLoadElementImpl(buf, element->offno, distance, q, index, support, loadVec, maxDistance, &element); } /* @@ -811,11 +815,31 @@ HnswLoadUnvisitedFromDisk(HnswElement element, HnswUnvisited * unvisited, int *u } } +#if PG_VERSION_NUM >= 170000 +/* + * Get next block number for read stream + */ +static BlockNumber +HnswReadStreamNextBlock(ReadStream *stream, void *callback_private_data, void *per_buffer_data) +{ + HnswReadStreamData *streamData = callback_private_data; + OffsetNumber *offno = per_buffer_data; + HnswUnvisited *uv; + + if (streamData->visited == streamData->unvisitedLength) + return InvalidBlockNumber; + + uv = &streamData->unvisited[streamData->visited++]; + *offno = ItemPointerGetOffsetNumber(&uv->indextid); + return ItemPointerGetBlockNumber(&uv->indextid); +} +#endif + /* * Algorithm 2 from paper */ List * -HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples) +HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples, bool maintenance) { List *w = NIL; pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL); @@ -830,6 +854,27 @@ HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation in int unvisitedLength; bool inMemory = index == NULL; +#if PG_VERSION_NUM >= 170000 + HnswReadStreamData streamData; + ReadStream *stream = NULL; + + if (!inMemory) + { + int flags = READ_STREAM_DEFAULT; + + if (maintenance) + { + flags |= READ_STREAM_MAINTENANCE; + } + +#if PG_VERSION_NUM >= 180000 + flags |= READ_STREAM_USE_BATCHING; +#endif + + stream = read_stream_begin_relation(flags, NULL, index, MAIN_FORKNUM, HnswReadStreamNextBlock, &streamData, sizeof(OffsetNumber)); + } +#endif + if (v == NULL) { v = &vh; @@ -892,13 +937,23 @@ HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation in if (inMemory) HnswLoadUnvisitedFromMemory(base, cElement, unvisited, &unvisitedLength, v, lc, localNeighborhood, neighborhoodSize); else + { HnswLoadUnvisitedFromDisk(cElement, unvisited, &unvisitedLength, v, index, m, lm, lc); +#if PG_VERSION_NUM >= 170000 + read_stream_resume(stream); + + streamData.unvisited = unvisited; + streamData.unvisitedLength = unvisitedLength; + streamData.visited = 0; +#endif + } + /* OK to count elements instead of tuples */ if (tuples != NULL) (*tuples) += unvisitedLength; - for (int i = 0; i < unvisitedLength; i++) + for (int i = 0;; i++) { HnswElement eElement; HnswSearchCandidate *e; @@ -909,18 +964,40 @@ HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation in if (inMemory) { + if (i == unvisitedLength) + break; + eElement = unvisited[i].element; eDistance = GetElementDistance(base, eElement, q, support); } else { - ItemPointer indextid = &unvisited[i].indextid; - BlockNumber blkno = ItemPointerGetBlockNumber(indextid); - OffsetNumber offno = ItemPointerGetOffsetNumber(indextid); + Buffer buf; + OffsetNumber offno; + +#if PG_VERSION_NUM >= 170000 + void *offnoPtr; + + buf = read_stream_next_buffer(stream, &offnoPtr); + + if (!BufferIsValid(buf)) + break; + + offno = *((OffsetNumber *) offnoPtr); +#else + ItemPointer indextid; + + if (i == unvisitedLength) + break; + + indextid = &unvisited[i].indextid; + buf = ReadBuffer(index, ItemPointerGetBlockNumber(indextid)); + offno = ItemPointerGetOffsetNumber(indextid); +#endif /* Avoid any allocations if not adding */ eElement = NULL; - HnswLoadElementImpl(blkno, offno, &eDistance, q, index, support, inserting, alwaysAdd || discarded != NULL ? NULL : &f->distance, &eElement); + HnswLoadElementImpl(buf, offno, &eDistance, q, index, support, inserting, alwaysAdd || discarded != NULL ? NULL : &f->distance, &eElement); if (eElement == NULL) continue; @@ -976,6 +1053,11 @@ HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation in w = lappend(w, sc); } +#if PG_VERSION_NUM >= 170000 + if (!inMemory) + read_stream_end(stream); +#endif + return w; } @@ -1271,7 +1353,7 @@ PrecomputeHash(char *base, HnswElement element) * Algorithm 1 from paper */ void -HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing) +HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing, bool maintenance) { List *ep; List *w; @@ -1298,7 +1380,7 @@ HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint /* 1st phase: greedy search to insert level */ for (int lc = entryLevel; lc >= level + 1; lc--) { - w = HnswSearchLayer(base, &q, ep, 1, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL); + w = HnswSearchLayer(base, &q, ep, 1, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL, maintenance); ep = w; } @@ -1317,7 +1399,7 @@ HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint List *lw = NIL; ListCell *lc2; - w = HnswSearchLayer(base, &q, ep, efConstruction, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL); + w = HnswSearchLayer(base, &q, ep, efConstruction, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL, maintenance); /* Convert search candidates to candidates */ foreach(lc2, w) diff --git a/src/hnswvacuum.c b/src/hnswvacuum.c index 3a8ee26..e00a920 100644 --- a/src/hnswvacuum.c +++ b/src/hnswvacuum.c @@ -212,7 +212,7 @@ RepairGraphElement(HnswVacuumState * vacuumstate, HnswElement element, HnswEleme element->heaptidsLength = 0; /* Find neighbors for element, skipping itself */ - HnswFindElementNeighbors(base, element, entryPoint, index, support, m, efConstruction, true); + HnswFindElementNeighbors(base, element, entryPoint, index, support, m, efConstruction, true, true); /* Zero memory for each element */ MemSet(ntup, 0, HNSW_TUPLE_ALLOC_SIZE);