mirror of
https://github.com/pgvector/pgvector.git
synced 2026-06-06 05:51:21 +08:00
Added support for async I/O [skip ci]
This commit is contained in:
11
src/hnsw.h
11
src/hnsw.h
@@ -372,6 +372,13 @@ typedef union
|
|||||||
ItemPointerData indextid;
|
ItemPointerData indextid;
|
||||||
} HnswUnvisited;
|
} HnswUnvisited;
|
||||||
|
|
||||||
|
typedef struct HnswReadStreamData
|
||||||
|
{
|
||||||
|
HnswUnvisited *unvisited;
|
||||||
|
int unvisitedLength;
|
||||||
|
int visited;
|
||||||
|
} HnswReadStreamData;
|
||||||
|
|
||||||
typedef struct HnswScanOpaqueData
|
typedef struct HnswScanOpaqueData
|
||||||
{
|
{
|
||||||
const HnswTypeInfo *typeInfo;
|
const HnswTypeInfo *typeInfo;
|
||||||
@@ -427,13 +434,13 @@ bool HnswCheckNorm(HnswSupport * support, Datum value);
|
|||||||
Buffer HnswNewBuffer(Relation index, ForkNumber forkNum);
|
Buffer HnswNewBuffer(Relation index, ForkNumber forkNum);
|
||||||
void HnswInitPage(Buffer buf, Page page);
|
void HnswInitPage(Buffer buf, Page page);
|
||||||
void HnswInit(void);
|
void HnswInit(void);
|
||||||
List *HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples);
|
List *HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples, bool maintenance);
|
||||||
HnswElement HnswGetEntryPoint(Relation index);
|
HnswElement HnswGetEntryPoint(Relation index);
|
||||||
void HnswGetMetaPageInfo(Relation index, int *m, HnswElement * entryPoint);
|
void HnswGetMetaPageInfo(Relation index, int *m, HnswElement * entryPoint);
|
||||||
void *HnswAlloc(HnswAllocator * allocator, Size size);
|
void *HnswAlloc(HnswAllocator * allocator, Size size);
|
||||||
HnswElement HnswInitElement(char *base, ItemPointer tid, int m, double ml, int maxLevel, HnswAllocator * alloc);
|
HnswElement HnswInitElement(char *base, ItemPointer tid, int m, double ml, int maxLevel, HnswAllocator * alloc);
|
||||||
HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno);
|
HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno);
|
||||||
void HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing);
|
void HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing, bool maintenance);
|
||||||
HnswSearchCandidate *HnswEntryCandidate(char *base, HnswElement entryPoint, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec);
|
HnswSearchCandidate *HnswEntryCandidate(char *base, HnswElement entryPoint, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec);
|
||||||
void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum, bool building);
|
void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum, bool building);
|
||||||
void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m);
|
void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m);
|
||||||
|
|||||||
@@ -470,7 +470,7 @@ InsertTupleInMemory(HnswBuildState * buildstate, HnswElement element)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Find neighbors for element */
|
/* Find neighbors for element */
|
||||||
HnswFindElementNeighbors(base, element, entryPoint, NULL, support, m, efConstruction, false);
|
HnswFindElementNeighbors(base, element, entryPoint, NULL, support, m, efConstruction, false, true);
|
||||||
|
|
||||||
/* Update graph in memory */
|
/* Update graph in memory */
|
||||||
UpdateGraphInMemory(support, element, m, entryPoint, buildstate);
|
UpdateGraphInMemory(support, element, m, entryPoint, buildstate);
|
||||||
|
|||||||
@@ -731,7 +731,7 @@ HnswInsertTupleOnDisk(Relation index, HnswSupport * support, Datum value, ItemPo
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Find neighbors for element */
|
/* Find neighbors for element */
|
||||||
HnswFindElementNeighbors(base, element, entryPoint, index, support, m, efConstruction, false);
|
HnswFindElementNeighbors(base, element, entryPoint, index, support, m, efConstruction, false, building);
|
||||||
|
|
||||||
/* Update graph on disk */
|
/* Update graph on disk */
|
||||||
UpdateGraphOnDisk(index, support, element, m, entryPoint, building);
|
UpdateGraphOnDisk(index, support, element, m, entryPoint, building);
|
||||||
|
|||||||
@@ -48,11 +48,11 @@ GetScanItems(IndexScanDesc scan, Datum value)
|
|||||||
|
|
||||||
for (int lc = entryPoint->level; lc >= 1; lc--)
|
for (int lc = entryPoint->level; lc >= 1; lc--)
|
||||||
{
|
{
|
||||||
w = HnswSearchLayer(base, q, ep, 1, lc, index, support, m, false, NULL, NULL, NULL, true, NULL);
|
w = HnswSearchLayer(base, q, ep, 1, lc, index, support, m, false, NULL, NULL, NULL, true, NULL, false);
|
||||||
ep = w;
|
ep = w;
|
||||||
}
|
}
|
||||||
|
|
||||||
return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, support, m, false, NULL, &so->v, hnsw_iterative_scan != HNSW_ITERATIVE_SCAN_OFF ? &so->discarded : NULL, true, &so->tuples);
|
return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, support, m, false, NULL, &so->v, hnsw_iterative_scan != HNSW_ITERATIVE_SCAN_OFF ? &so->discarded : NULL, true, &so->tuples, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -83,7 +83,7 @@ ResumeScanItems(IndexScanDesc scan)
|
|||||||
ep = lappend(ep, sc);
|
ep = lappend(ep, sc);
|
||||||
}
|
}
|
||||||
|
|
||||||
return HnswSearchLayer(base, &so->q, ep, batch_size, 0, index, &so->support, so->m, false, NULL, &so->v, &so->discarded, false, &so->tuples);
|
return HnswSearchLayer(base, &so->q, ep, batch_size, 0, index, &so->support, so->m, false, NULL, &so->v, &so->discarded, false, &so->tuples, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|||||||
104
src/hnswutils.c
104
src/hnswutils.c
@@ -21,6 +21,10 @@
|
|||||||
#include "varatt.h"
|
#include "varatt.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 190000
|
||||||
|
#include "storage/read_stream.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
#if PG_VERSION_NUM < 170000
|
#if PG_VERSION_NUM < 170000
|
||||||
static inline uint64
|
static inline uint64
|
||||||
murmurhash64(uint64 data)
|
murmurhash64(uint64 data)
|
||||||
@@ -531,14 +535,12 @@ HnswGetDistance(Datum a, Datum b, HnswSupport * support)
|
|||||||
* Load an element and optionally get its distance from q
|
* Load an element and optionally get its distance from q
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
HnswLoadElementImpl(BlockNumber blkno, OffsetNumber offno, double *distance, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec, double *maxDistance, HnswElement * element)
|
HnswLoadElementImpl(Buffer buf, OffsetNumber offno, double *distance, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec, double *maxDistance, HnswElement * element)
|
||||||
{
|
{
|
||||||
Buffer buf;
|
|
||||||
Page page;
|
Page page;
|
||||||
HnswElementTuple etup;
|
HnswElementTuple etup;
|
||||||
|
|
||||||
/* Read vector */
|
/* Read vector */
|
||||||
buf = ReadBuffer(index, blkno);
|
|
||||||
LockBuffer(buf, BUFFER_LOCK_SHARE);
|
LockBuffer(buf, BUFFER_LOCK_SHARE);
|
||||||
page = BufferGetPage(buf);
|
page = BufferGetPage(buf);
|
||||||
|
|
||||||
@@ -559,7 +561,7 @@ HnswLoadElementImpl(BlockNumber blkno, OffsetNumber offno, double *distance, Hns
|
|||||||
if (distance == NULL || maxDistance == NULL || *distance < *maxDistance)
|
if (distance == NULL || maxDistance == NULL || *distance < *maxDistance)
|
||||||
{
|
{
|
||||||
if (*element == NULL)
|
if (*element == NULL)
|
||||||
*element = HnswInitElementFromBlock(blkno, offno);
|
*element = HnswInitElementFromBlock(BufferGetBlockNumber(buf), offno);
|
||||||
|
|
||||||
HnswLoadElementFromTuple(*element, etup, true, loadVec);
|
HnswLoadElementFromTuple(*element, etup, true, loadVec);
|
||||||
}
|
}
|
||||||
@@ -573,7 +575,9 @@ HnswLoadElementImpl(BlockNumber blkno, OffsetNumber offno, double *distance, Hns
|
|||||||
void
|
void
|
||||||
HnswLoadElement(HnswElement element, double *distance, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec, double *maxDistance)
|
HnswLoadElement(HnswElement element, double *distance, HnswQuery * q, Relation index, HnswSupport * support, bool loadVec, double *maxDistance)
|
||||||
{
|
{
|
||||||
HnswLoadElementImpl(element->blkno, element->offno, distance, q, index, support, loadVec, maxDistance, &element);
|
Buffer buf = ReadBuffer(index, element->blkno);
|
||||||
|
|
||||||
|
HnswLoadElementImpl(buf, element->offno, distance, q, index, support, loadVec, maxDistance, &element);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -813,11 +817,31 @@ HnswLoadUnvisitedFromDisk(HnswElement element, HnswUnvisited * unvisited, int *u
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 190000
|
||||||
|
/*
|
||||||
|
* Get next block number for read stream
|
||||||
|
*/
|
||||||
|
static BlockNumber
|
||||||
|
HnswReadStreamNextBlock(ReadStream *stream, void *callback_private_data, void *per_buffer_data)
|
||||||
|
{
|
||||||
|
HnswReadStreamData *streamData = callback_private_data;
|
||||||
|
OffsetNumber *offno = per_buffer_data;
|
||||||
|
HnswUnvisited *uv;
|
||||||
|
|
||||||
|
if (streamData->visited == streamData->unvisitedLength)
|
||||||
|
return InvalidBlockNumber;
|
||||||
|
|
||||||
|
uv = &streamData->unvisited[streamData->visited++];
|
||||||
|
*offno = ItemPointerGetOffsetNumber(&uv->indextid);
|
||||||
|
return ItemPointerGetBlockNumber(&uv->indextid);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Algorithm 2 from paper
|
* Algorithm 2 from paper
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples)
|
HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples, bool maintenance)
|
||||||
{
|
{
|
||||||
List *w = NIL;
|
List *w = NIL;
|
||||||
pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL);
|
pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL);
|
||||||
@@ -832,6 +856,21 @@ HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation in
|
|||||||
int unvisitedLength;
|
int unvisitedLength;
|
||||||
bool inMemory = index == NULL;
|
bool inMemory = index == NULL;
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 190000
|
||||||
|
HnswReadStreamData streamData;
|
||||||
|
ReadStream *stream = NULL;
|
||||||
|
|
||||||
|
if (!inMemory)
|
||||||
|
{
|
||||||
|
int flags = READ_STREAM_USE_BATCHING;
|
||||||
|
|
||||||
|
if (maintenance)
|
||||||
|
flags |= READ_STREAM_MAINTENANCE;
|
||||||
|
|
||||||
|
stream = read_stream_begin_relation(flags, NULL, index, MAIN_FORKNUM, HnswReadStreamNextBlock, &streamData, sizeof(OffsetNumber));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (v == NULL)
|
if (v == NULL)
|
||||||
{
|
{
|
||||||
v = &vh;
|
v = &vh;
|
||||||
@@ -894,13 +933,23 @@ HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation in
|
|||||||
if (inMemory)
|
if (inMemory)
|
||||||
HnswLoadUnvisitedFromMemory(base, cElement, unvisited, &unvisitedLength, v, lc, localNeighborhood, neighborhoodSize);
|
HnswLoadUnvisitedFromMemory(base, cElement, unvisited, &unvisitedLength, v, lc, localNeighborhood, neighborhoodSize);
|
||||||
else
|
else
|
||||||
|
{
|
||||||
HnswLoadUnvisitedFromDisk(cElement, unvisited, &unvisitedLength, v, index, m, lm, lc);
|
HnswLoadUnvisitedFromDisk(cElement, unvisited, &unvisitedLength, v, index, m, lm, lc);
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 190000
|
||||||
|
read_stream_resume(stream);
|
||||||
|
|
||||||
|
streamData.unvisited = unvisited;
|
||||||
|
streamData.unvisitedLength = unvisitedLength;
|
||||||
|
streamData.visited = 0;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
/* OK to count elements instead of tuples */
|
/* OK to count elements instead of tuples */
|
||||||
if (tuples != NULL)
|
if (tuples != NULL)
|
||||||
(*tuples) += unvisitedLength;
|
(*tuples) += unvisitedLength;
|
||||||
|
|
||||||
for (int i = 0; i < unvisitedLength; i++)
|
for (int i = 0;; i++)
|
||||||
{
|
{
|
||||||
HnswElement eElement;
|
HnswElement eElement;
|
||||||
HnswSearchCandidate *e;
|
HnswSearchCandidate *e;
|
||||||
@@ -911,18 +960,40 @@ HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation in
|
|||||||
|
|
||||||
if (inMemory)
|
if (inMemory)
|
||||||
{
|
{
|
||||||
|
if (i == unvisitedLength)
|
||||||
|
break;
|
||||||
|
|
||||||
eElement = unvisited[i].element;
|
eElement = unvisited[i].element;
|
||||||
eDistance = GetElementDistance(base, eElement, q, support);
|
eDistance = GetElementDistance(base, eElement, q, support);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ItemPointer indextid = &unvisited[i].indextid;
|
Buffer buf;
|
||||||
BlockNumber blkno = ItemPointerGetBlockNumber(indextid);
|
OffsetNumber offno;
|
||||||
OffsetNumber offno = ItemPointerGetOffsetNumber(indextid);
|
|
||||||
|
#if PG_VERSION_NUM >= 190000
|
||||||
|
void *offnoPtr;
|
||||||
|
|
||||||
|
buf = read_stream_next_buffer(stream, &offnoPtr);
|
||||||
|
|
||||||
|
if (!BufferIsValid(buf))
|
||||||
|
break;
|
||||||
|
|
||||||
|
offno = *((OffsetNumber *) offnoPtr);
|
||||||
|
#else
|
||||||
|
ItemPointer indextid;
|
||||||
|
|
||||||
|
if (i == unvisitedLength)
|
||||||
|
break;
|
||||||
|
|
||||||
|
indextid = &unvisited[i].indextid;
|
||||||
|
buf = ReadBuffer(index, ItemPointerGetBlockNumber(indextid));
|
||||||
|
offno = ItemPointerGetOffsetNumber(indextid);
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Avoid any allocations if not adding */
|
/* Avoid any allocations if not adding */
|
||||||
eElement = NULL;
|
eElement = NULL;
|
||||||
HnswLoadElementImpl(blkno, offno, &eDistance, q, index, support, inserting, alwaysAdd || discarded != NULL ? NULL : &f->distance, &eElement);
|
HnswLoadElementImpl(buf, offno, &eDistance, q, index, support, inserting, alwaysAdd || discarded != NULL ? NULL : &f->distance, &eElement);
|
||||||
|
|
||||||
if (eElement == NULL)
|
if (eElement == NULL)
|
||||||
continue;
|
continue;
|
||||||
@@ -978,6 +1049,11 @@ HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation in
|
|||||||
w = lappend(w, sc);
|
w = lappend(w, sc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 190000
|
||||||
|
if (!inMemory)
|
||||||
|
read_stream_end(stream);
|
||||||
|
#endif
|
||||||
|
|
||||||
return w;
|
return w;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1273,7 +1349,7 @@ PrecomputeHash(char *base, HnswElement element)
|
|||||||
* Algorithm 1 from paper
|
* Algorithm 1 from paper
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing)
|
HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing, bool maintenance)
|
||||||
{
|
{
|
||||||
List *ep;
|
List *ep;
|
||||||
List *w;
|
List *w;
|
||||||
@@ -1300,7 +1376,7 @@ HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint
|
|||||||
/* 1st phase: greedy search to insert level */
|
/* 1st phase: greedy search to insert level */
|
||||||
for (int lc = entryLevel; lc >= level + 1; lc--)
|
for (int lc = entryLevel; lc >= level + 1; lc--)
|
||||||
{
|
{
|
||||||
w = HnswSearchLayer(base, &q, ep, 1, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL);
|
w = HnswSearchLayer(base, &q, ep, 1, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL, maintenance);
|
||||||
ep = w;
|
ep = w;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1319,7 +1395,7 @@ HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint
|
|||||||
List *lw = NIL;
|
List *lw = NIL;
|
||||||
ListCell *lc2;
|
ListCell *lc2;
|
||||||
|
|
||||||
w = HnswSearchLayer(base, &q, ep, efConstruction, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL);
|
w = HnswSearchLayer(base, &q, ep, efConstruction, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL, maintenance);
|
||||||
|
|
||||||
/* Convert search candidates to candidates */
|
/* Convert search candidates to candidates */
|
||||||
foreach(lc2, w)
|
foreach(lc2, w)
|
||||||
|
|||||||
@@ -218,7 +218,7 @@ RepairGraphElement(HnswVacuumState * vacuumstate, HnswElement element, HnswEleme
|
|||||||
element->heaptidsLength = 0;
|
element->heaptidsLength = 0;
|
||||||
|
|
||||||
/* Find neighbors for element, skipping itself */
|
/* Find neighbors for element, skipping itself */
|
||||||
HnswFindElementNeighbors(base, element, entryPoint, index, support, m, efConstruction, true);
|
HnswFindElementNeighbors(base, element, entryPoint, index, support, m, efConstruction, true, true);
|
||||||
|
|
||||||
/* Zero memory for each element */
|
/* Zero memory for each element */
|
||||||
MemSet(ntup, 0, HNSW_TUPLE_ALLOC_SIZE);
|
MemSet(ntup, 0, HNSW_TUPLE_ALLOC_SIZE);
|
||||||
|
|||||||
Reference in New Issue
Block a user