Added version to reduce stale reads and writes and prepare for optimistic locking

This commit is contained in:
Andrew Kane
2023-08-20 17:08:20 -07:00
parent 687263ccd4
commit ef1209eaf4
4 changed files with 16 additions and 6 deletions

View File

@@ -85,6 +85,7 @@ typedef struct HnswNeighborArray HnswNeighborArray;
typedef struct HnswElementData
{
List *heaptids;
uint8 version;
uint8 level;
uint8 deleted;
HnswNeighborArray *neighbors;
@@ -185,9 +186,9 @@ typedef HnswPageOpaqueData * HnswPageOpaque;
typedef struct HnswElementTupleData
{
uint8 type;
uint8 version;
uint8 level;
uint8 deleted;
uint8 unused;
ItemPointerData heaptids[HNSW_HEAPTIDS];
ItemPointerData neighbortid;
uint16 unused2;
@@ -199,7 +200,7 @@ typedef HnswElementTupleData * HnswElementTuple;
typedef struct HnswNeighborTupleData
{
uint8 type;
uint8 unused;
uint8 version;
uint16 count;
ItemPointerData indextids[FLEXIBLE_ARRAY_MEMBER];
} HnswNeighborTupleData;

View File

@@ -359,8 +359,11 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE
/* Calculate index for update */
startIdx = (hc->element->level - lc) * m;
/* Check for existing connection */
if (checkExisting && ConnectionExists(e, ntup, startIdx, lm))
/* TODO Skip when deleting */
/* TODO Resolve issue with connections from element neighbor tuple */
if (ntup->version != hc->element->version)
idx = -1;
else if (checkExisting && ConnectionExists(e, ntup, startIdx, lm))
idx = -1;
else if (idx == -2)
{
@@ -474,7 +477,7 @@ HnswAddDuplicate(Relation index, HnswElement element, HnswElement dup)
}
/* Either being deleted or we lost our chance to another backend */
if (i == 0 || i == HNSW_HEAPTIDS)
if (i == 0 || i == HNSW_HEAPTIDS || etup->version != dup->version)
{
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);

View File

@@ -160,6 +160,7 @@ HnswInitElement(ItemPointer heaptid, int m, double ml, int maxLevel)
element->heaptids = NIL;
HnswAddHeapTid(element, heaptid);
element->version = 1;
element->level = level;
element->deleted = 0;
@@ -288,6 +289,7 @@ void
HnswSetElementTuple(HnswElementTuple etup, HnswElement element)
{
etup->type = HNSW_ELEMENT_TUPLE_TYPE;
etup->version = element->version;
etup->level = element->level;
etup->deleted = 0;
for (int i = 0; i < HNSW_HEAPTIDS; i++)
@@ -309,6 +311,7 @@ HnswSetNeighborTuple(HnswNeighborTuple ntup, HnswElement e, int m)
int idx = 0;
ntup->type = HNSW_NEIGHBOR_TUPLE_TYPE;
ntup->version = e->version;
for (int lc = e->level; lc >= 0; lc--)
{
@@ -348,7 +351,7 @@ LoadNeighborsFromPage(HnswElement element, Relation index, Page page)
HnswInitNeighbors(element, m);
/* Ensure expected neighbors */
if (ntup->count != neighborCount)
if (ntup->version != element->version || ntup->count != neighborCount)
return;
for (int i = 0; i < neighborCount; i++)
@@ -401,6 +404,7 @@ HnswLoadNeighbors(HnswElement element, Relation index)
void
HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHeaptids, bool loadVec)
{
element->version = etup->version;
element->level = etup->level;
element->deleted = etup->deleted;
element->neighborPage = ItemPointerGetBlockNumber(&etup->neighbortid);

View File

@@ -476,10 +476,12 @@ MarkDeleted(HnswVacuumState * vacuumstate)
ntup = (HnswNeighborTuple) PageGetItem(npage, PageGetItemId(npage, neighborOffno));
/* Overwrite element */
etup->version++;
etup->deleted = 1;
MemSet(&etup->vec.x, 0, etup->vec.dim * sizeof(float));
/* Overwrite neighbors */
ntup->version = etup->version;
for (int i = 0; i < ntup->count; i++)
ItemPointerSetInvalid(&ntup->indextids[i]);