Improved concurrent inserts

This commit is contained in:
Andrew Kane
2023-08-09 23:51:35 -07:00
parent d63d430af8
commit 7c0d94c99c
3 changed files with 151 additions and 162 deletions

View File

@@ -106,13 +106,6 @@ typedef struct HnswNeighborArray
HnswCandidate *items;
} HnswNeighborArray;
/*
 * Record of one neighbor-slot change to be written to disk: the neighbor
 * tuple of hc.element gets the new element stored at the slot identified by
 * (level, index). Built by CreateUpdate and consumed via HnswGetIndex.
 */
typedef struct HnswUpdate
{
/* By-value copy of the candidate whose element's neighbor tuple is updated */
HnswCandidate hc;
/* Layer the connection belongs to */
int level;
/* Slot within that layer's in-memory neighbor array */
int index;
} HnswUpdate;
typedef struct HnswPairingHeapNode
{
pairingheap_node ph_node;
@@ -265,7 +258,7 @@ List *HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, Fmgr
HnswElement HnswGetEntryPoint(Relation index);
HnswElement HnswInitElement(ItemPointer tid, int m, double ml, int maxLevel);
void HnswFreeElement(HnswElement element);
/*
 * HnswInsertElement is listed with two signatures here. NOTE(review): this
 * looks like the before/after pair of a diff; only one can exist in the
 * real header - confirm which.
 *
 * List **updates form: reverse-connection changes are appended to *updates
 * as HnswUpdate records when updates is non-NULL.
 *
 * List ***updateNeighbors form: when non-NULL, *updateNeighbors receives the
 * per-layer array of selected neighbor lists and the reverse connections are
 * left for the caller to apply; when NULL (and not vacuuming) they are
 * applied in memory through HnswUpdateConnection.
 *
 * Both forms return the duplicate element when one is found, else NULL.
 */
HnswElement HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, List **updates, bool vacuuming);
HnswElement HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, List ***updateNeighbors, bool vacuuming);
HnswCandidate *HnswEntryCandidate(HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadvec);
void HnswUpdateMetaPage(Relation index, bool updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum);
void HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m);
@@ -274,6 +267,8 @@ void HnswInitNeighbors(HnswElement element, int m);
bool HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel);
void HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index, FmgrInfo *procinfo, Oid collation, bool loadVec);
void HnswSetElementTuple(HnswElementTuple etup, HnswElement element);
/*
 * Adds element as a neighbor of hc->element at layer lc, appending or
 * replacing one existing neighbor. When updateIdx is non-NULL it receives the
 * slot that was written and is left untouched if no slot changed.
 */
void HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int m, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation);
/*
 * Loads the neighbors of element from the index.
 * NOTE(review): appears to be the formerly static LoadNeighbors, exported so
 * the insert path can refresh a neighbor list before updating it - confirm.
 */
void HnswLoadNeighbors(HnswElement element, Relation index);
/* Index access methods */
IndexBuildResult *hnswbuild(Relation heap, Relation index, IndexInfo *indexInfo);

View File

@@ -269,71 +269,73 @@ WriteNewElementPages(Relation index, HnswElement e, int m)
HnswUpdateMetaPage(index, false, NULL, insertPage, MAIN_FORKNUM);
}
/*
 * Calculate index for update
 *
 * Converts the (level, index) pair of an update into a position in the flat
 * indextids array of the neighbor tuple of update->hc.element. The result is
 * (element level - update level) * m + index, so an update at the element's
 * own top level starts at offset 0 and each layer below it skips a further m
 * slots.
 */
static int
HnswGetIndex(HnswUpdate * update, int m)
{
return (update->hc.element->level - update->level) * m + update->index;
}
/*
 * Update neighbors
 *
 * Writes the new element e into the on-disk neighbor tuples of the elements
 * it has been connected to.
 *
 * NOTE(review): this listing looks like a rendered diff whose removed and
 * added lines are interleaved without markers (two signatures, two loop
 * headers, and the page-update sequence appearing twice). Confirm against
 * the real file before relying on line-level detail.
 *
 * List *updates form: each HnswUpdate names a neighbor tuple and a slot
 * (via HnswGetIndex) that is overwritten with e's (blkno, offno).
 *
 * List **neighbors form: for each layer from e->level down to 0 and for each
 * candidate in neighbors[lc], the candidate's neighbor list is reloaded with
 * HnswLoadNeighbors, HnswUpdateConnection decides which slot e takes (idx
 * stays -1 when no slot changes), and that slot of the candidate's neighbor
 * tuple is overwritten with e's (blkno, offno) under an exclusive buffer
 * lock inside a generic WAL record.
 *
 * NOTE(review): neighbors[lc] is read for every layer up to e->level, so the
 * caller must pass a fully populated array. HnswInsertElement returns the
 * duplicate before assigning *updateNeighbors, and HnswInsertTuple declares
 * its neighbors variable without an initializer - verify that the
 * "duplicate add failed" fallback in WriteElement cannot reach this function
 * with an unset pointer.
 */
static void
UpdateNeighborPages(Relation index, HnswElement e, int m, List *updates)
UpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement e, int m, List **neighbors)
{
ListCell *lc;
/* Could update multiple at once for same element */
/* but should only happen a low percent of time, so keep simple for now */
foreach(lc, updates)
for (int lc = e->level; lc >= 0; lc--)
{
Buffer buf;
Page page;
GenericXLogState *state;
HnswUpdate *update = lfirst(lc);
ItemId itemid;
HnswNeighborTuple ntup;
Size ntupSize;
int idx;
OffsetNumber offno = update->hc.element->neighborOffno;
/* Per-layer connection limit derived from m for layer lc */
int lm = HnswGetLayerM(m, lc);
List *levelNeighbors = neighbors[lc];
ListCell *lc2;
/* Register page */
buf = ReadBuffer(index, update->hc.element->neighborPage);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
/* Get tuple */
itemid = PageGetItemId(page, offno);
ntup = (HnswNeighborTuple) PageGetItem(page, itemid);
ntupSize = ItemIdGetLength(itemid);
/* Calculate index */
idx = HnswGetIndex(update, m);
/* Make robust to issues */
if (idx < ntup->count)
foreach(lc2, levelNeighbors)
{
ItemPointer indextid = &ntup->indextids[idx];
HnswCandidate *hc = lfirst(lc2);
Buffer buf;
Page page;
GenericXLogState *state;
ItemId itemid;
HnswNeighborTuple ntup;
Size ntupSize;
/* -1 means "no slot chosen"; HnswUpdateConnection overwrites it on change */
int idx = -1;
OffsetNumber offno = hc->element->neighborOffno;
/* Update neighbor */
ItemPointerSet(indextid, e->blkno, e->offno);
/* Get latest neighbors */
HnswLoadNeighbors(hc->element, index);
/* Overwrite tuple */
if (!PageIndexTupleOverwrite(page, offno, (Item) ntup, ntupSize))
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
/* Decide in memory which slot of hc->element's layer-lc neighbors e takes */
HnswUpdateConnection(e, hc, lm, lc, &idx, index, procinfo, collation);
/* Commit */
MarkBufferDirty(buf);
GenericXLogFinish(state);
/* Nothing changed for this neighbor, so no page write is needed */
if (idx == -1)
continue;
/* Register page */
buf = ReadBuffer(index, hc->element->neighborPage);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
/* Get tuple */
itemid = PageGetItemId(page, offno);
ntup = (HnswNeighborTuple) PageGetItem(page, itemid);
ntupSize = ItemIdGetLength(itemid);
/* Calculate index for update */
/* idx is relative to layer lc; skip m slots for each layer above it */
idx += (hc->element->level - lc) * m;
/* Make robust to issues */
if (idx < ntup->count)
{
ItemPointer indextid = &ntup->indextids[idx];
/* Update neighbor */
ItemPointerSet(indextid, e->blkno, e->offno);
/* Overwrite tuple */
if (!PageIndexTupleOverwrite(page, offno, (Item) ntup, ntupSize))
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
/* Commit */
MarkBufferDirty(buf);
GenericXLogFinish(state);
}
else
/* Slot is outside the stored tuple: discard the WAL record */
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);
}
else
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);
}
}
@@ -391,7 +393,7 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup)
* Write changes to disk
*/
static void
WriteElement(Relation index, HnswElement element, int m, List *updates, HnswElement dup, HnswElement entryPoint)
WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement element, int m, List **neighbors, HnswElement dup, HnswElement entryPoint)
{
/* Try to add to existing page */
if (dup != NULL)
@@ -402,7 +404,7 @@ WriteElement(Relation index, HnswElement element, int m, List *updates, HnswElem
/* If fails, take this path */
WriteNewElementPages(index, element, m);
UpdateNeighborPages(index, element, m, updates);
UpdateNeighborPages(index, procinfo, collation, element, m, neighbors);
/* Update metapage if needed */
if (entryPoint == NULL || element->level > entryPoint->level)
@@ -424,7 +426,7 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti
double ml = HnswGetMl(m);
FmgrInfo *procinfo = index_getprocinfo(index, 1, HNSW_DISTANCE_PROC);
Oid collation = index->rd_indcollation[0];
List *updates = NIL;
List **neighbors;
HnswElement dup;
/* Detoast once for all calls */
@@ -446,10 +448,10 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti
entryPoint = HnswGetEntryPoint(index);
/* Insert element in graph */
dup = HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, &updates, false);
dup = HnswInsertElement(element, entryPoint, index, procinfo, collation, m, efConstruction, &neighbors, false);
/* Write to disk */
WriteElement(index, element, m, updates, dup, entryPoint);
WriteElement(index, procinfo, collation, element, m, neighbors, dup, entryPoint);
return true;
}

View File

@@ -372,8 +372,8 @@ LoadNeighborsFromPage(HnswElement element, Relation index, Page page)
/*
* Load neighbors
*/
static void
LoadNeighbors(HnswElement element, Relation index)
void
HnswLoadNeighbors(HnswElement element, Relation index)
{
Buffer buf;
Page page;
@@ -571,7 +571,7 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
break;
if (c->element->neighbors == NULL)
LoadNeighbors(c->element, index);
HnswLoadNeighbors(c->element, index);
/* Get the neighborhood at layer lc */
neighborhood = &c->element->neighbors[lc];
@@ -799,105 +799,85 @@ CompareCandidateDistances(const void *a, const void *b)
return 0;
}
/*
 * Create update
 *
 * Allocates an HnswUpdate with palloc, stores a by-value copy of *hc together
 * with the given level and index, and returns the new record.
 */
static HnswUpdate *
CreateUpdate(HnswCandidate * hc, int level, int index)
{
HnswUpdate *update = palloc(sizeof(HnswUpdate));
/* Struct copy: the record does not alias the caller's candidate */
update->hc = *hc;
update->level = level;
update->index = index;
return update;
}
/*
 * Update connections
 *
 * Adds element as a neighbor of hc->element at layer lc, reusing
 * hc->distance as the distance of the reverse connection.
 *
 * NOTE(review): this listing looks like a rendered diff whose removed
 * (UpdateConnections, looping over a neighbor list) and added
 * (HnswUpdateConnection, handling one candidate) lines are interleaved
 * without markers. Confirm against the real file before relying on
 * line-level detail.
 *
 * Behavior visible in both forms:
 * - If the layer holds fewer than m neighbors, the new candidate is appended.
 * - Otherwise one existing neighbor is replaced. When index is non-NULL,
 *   distances of the current neighbors to hc->element's vector are refreshed
 *   (loading the element when its vec is NULL), and the first neighbor whose
 *   heaptids list is empty is chosen for pruning. If none is chosen, the
 *   current neighbors plus the new candidate are sorted with
 *   CompareCandidateDistances and SelectNeighbors reports the pruned one.
 * - The slot holding the pruned element is overwritten. If the pruned
 *   element is not found among the current neighbors, nothing is written.
 *
 * The changed slot is reported through *updateIdx (HnswUpdateConnection) or
 * as an HnswUpdate appended to *updates (UpdateConnections), in both cases
 * only when the pointer is non-NULL.
 */
static void
UpdateConnections(HnswElement element, List *neighbors, int m, int lc, List **updates, Relation index, FmgrInfo *procinfo, Oid collation)
void
HnswUpdateConnection(HnswElement element, HnswCandidate * hc, int m, int lc, int *updateIdx, Relation index, FmgrInfo *procinfo, Oid collation)
{
ListCell *lc2;
HnswNeighborArray *currentNeighbors = &hc->element->neighbors[lc];
foreach(lc2, neighbors)
HnswCandidate hc2;
/* hc2 is the reverse connection: element as seen from hc->element */
hc2.element = element;
hc2.distance = hc->distance;
if (currentNeighbors->length < m)
{
HnswCandidate *hc = (HnswCandidate *) lfirst(lc2);
HnswNeighborArray *currentNeighbors = &hc->element->neighbors[lc];
currentNeighbors->items[currentNeighbors->length++] = hc2;
HnswCandidate hc2;
/* Track update */
if (updateIdx != NULL)
*updateIdx = currentNeighbors->length - 1;
}
else
{
/* Shrink connections */
HnswCandidate *pruned = NULL;
List *c = NIL;
hc2.element = element;
hc2.distance = hc->distance;
if (currentNeighbors->length < m)
/* Load elements on insert */
if (index != NULL)
{
currentNeighbors->items[currentNeighbors->length++] = hc2;
Datum q = PointerGetDatum(hc->element->vec);
/* Track updates */
if (updates != NULL)
*updates = lappend(*updates, CreateUpdate(hc, lc, currentNeighbors->length - 1));
}
else
{
/* Shrink connections */
HnswCandidate *pruned = NULL;
List *c = NIL;
/* Load elements on insert */
if (index != NULL)
{
Datum q = PointerGetDatum(hc->element->vec);
for (int i = 0; i < currentNeighbors->length; i++)
{
HnswCandidate *hc3 = &currentNeighbors->items[i];
if (hc3->element->vec == NULL)
HnswLoadElement(hc3->element, &hc3->distance, &q, index, procinfo, collation, true);
else
hc3->distance = GetCandidateDistance(hc3, q, procinfo, collation);
/* Prune element if being deleted */
if (list_length(hc3->element->heaptids) == 0)
{
pruned = &currentNeighbors->items[i];
break;
}
}
}
if (pruned == NULL)
{
/* Add and sort candidates */
for (int i = 0; i < currentNeighbors->length; i++)
c = lappend(c, &currentNeighbors->items[i]);
c = lappend(c, &hc2);
list_sort(c, CompareCandidateDistances);
SelectNeighbors(c, m, lc, procinfo, collation, &pruned);
/* Should not happen */
if (pruned == NULL)
continue;
}
/* Find and replace the pruned element */
for (int i = 0; i < currentNeighbors->length; i++)
{
if (currentNeighbors->items[i].element == pruned->element)
HnswCandidate *hc3 = &currentNeighbors->items[i];
/* Refresh the distance to hc->element's vector, loading the element if needed */
if (hc3->element->vec == NULL)
HnswLoadElement(hc3->element, &hc3->distance, &q, index, procinfo, collation, true);
else
hc3->distance = GetCandidateDistance(hc3, q, procinfo, collation);
/* Prune element if being deleted */
if (list_length(hc3->element->heaptids) == 0)
{
currentNeighbors->items[i] = hc2;
/* Track updates */
if (updates != NULL)
*updates = lappend(*updates, CreateUpdate(hc, lc, i));
pruned = &currentNeighbors->items[i];
break;
}
}
}
if (pruned == NULL)
{
/* Add and sort candidates */
for (int i = 0; i < currentNeighbors->length; i++)
c = lappend(c, &currentNeighbors->items[i]);
c = lappend(c, &hc2);
list_sort(c, CompareCandidateDistances);
SelectNeighbors(c, m, lc, procinfo, collation, &pruned);
/* Should not happen */
if (pruned == NULL)
return;
}
/* Find and replace the pruned element */
/* If the new candidate itself was pruned, no slot matches and nothing changes */
for (int i = 0; i < currentNeighbors->length; i++)
{
if (currentNeighbors->items[i].element == pruned->element)
{
currentNeighbors->items[i] = hc2;
/* Track update */
if (updateIdx != NULL)
*updateIdx = i;
break;
}
}
}
}
@@ -905,13 +885,13 @@ UpdateConnections(HnswElement element, List *neighbors, int m, int lc, List **up
* Algorithm 1 from paper
*/
HnswElement
HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, List **updates, bool vacuuming)
HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, List ***updateNeighbors, bool vacuuming)
{
List *ep = NIL;
List *w;
int level = element->level;
int entryLevel;
List **ws = palloc(sizeof(List *) * (level + 1));
List **neighbors = palloc(sizeof(List *) * (level + 1));
Datum q = PointerGetDatum(element->vec);
HnswElement dup;
BlockNumber *skipPage = vacuuming ? &element->neighborPage : NULL;
@@ -940,20 +920,25 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F
ep = w;
}
if (level > entryLevel)
level = entryLevel;
while (level > entryLevel)
{
neighbors[level] = NIL;
level--;
}
/* 2nd phase */
for (int lc = level; lc >= 0; lc--)
{
int lm = HnswGetLayerM(m, lc);
w = HnswSearchLayer(q, ep, efConstruction, lc, index, procinfo, collation, true, skipPage, skipOffno);
/* Remove entry point if it's being deleted */
if (removeEntryPoint)
w = list_delete_ptr(w, entryCandidate);
/* Save w for SelectNeighbors */
ws[lc] = w;
/* Always call on inserts since duplicate update can fail */
neighbors[lc] = SelectNeighbors(w, lm, lc, procinfo, collation, NULL);
ep = w;
}
@@ -961,7 +946,7 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F
/* Look for duplicate */
if (level >= 0 && !vacuuming)
{
dup = HnswFindDuplicate(element, ws[0]);
dup = HnswFindDuplicate(element, neighbors[0]);
if (dup != NULL)
return dup;
}
@@ -970,13 +955,20 @@ HnswInsertElement(HnswElement element, HnswElement entryPoint, Relation index, F
for (int lc = level; lc >= 0; lc--)
{
int lm = HnswGetLayerM(m, lc);
List *newNeighbors = SelectNeighbors(ws[lc], lm, lc, procinfo, collation, NULL);
AddConnections(element, newNeighbors, lm, lc);
AddConnections(element, neighbors[lc], lm, lc);
if (!vacuuming)
UpdateConnections(element, newNeighbors, lm, lc, updates, index, procinfo, collation);
if (!vacuuming && updateNeighbors == NULL)
{
ListCell *lc2;
foreach(lc2, neighbors[lc])
HnswUpdateConnection(element, lfirst(lc2), lm, lc, NULL, index, procinfo, collation);
}
}
if (updateNeighbors != NULL)
*updateNeighbors = neighbors;
return NULL;
}