diff --git a/src/hnswinsert.c b/src/hnswinsert.c index f0c3d2a..ab5b98a 100644 --- a/src/hnswinsert.c +++ b/src/hnswinsert.c @@ -402,50 +402,6 @@ HnswUpdateNeighborPages(Relation index, FmgrInfo *procinfo, Oid collation, HnswE } } -/* - * Add the entry point - */ -static bool -HnswAddEntryPoint(Relation index, HnswElement element, int m, HnswElement * entryPoint) -{ - Buffer buf; - Page page; - GenericXLogState *state; - HnswMetaPage metap; - BlockNumber newInsertPage = InvalidBlockNumber; - - /* Lock the metapage to prevent concurrent inserts */ - buf = ReadBuffer(index, HNSW_METAPAGE_BLKNO); - LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - page = BufferGetPage(buf); - metap = HnswPageGetMeta(page); - - /* Check for new entry point after lock is acquired */ - if (BlockNumberIsValid(metap->entryBlkno)) - { - *entryPoint = HnswInitElementFromBlock(metap->entryBlkno, metap->entryOffno); - - UnlockReleaseBuffer(buf); - - return false; - } - - /* Write element and neighbor tuples */ - WriteNewElementPages(index, element, m, metap->insertPage, &newInsertPage); - - /* Start WAL */ - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, 0); - - /* Update the metapage info */ - HnswUpdateMetaPageInfo(page, HNSW_UPDATE_ENTRY_ALWAYS, element, newInsertPage); - - /* Commit and unlock */ - HnswCommitBuffer(buf, state); - - return true; -} - /* * Add a heap TID to an existing element */ @@ -522,7 +478,7 @@ WriteElement(Relation index, FmgrInfo *procinfo, Oid collation, HnswElement elem HnswUpdateNeighborPages(index, procinfo, collation, element, m, false); /* Update metapage if needed */ - if (element->level > entryPoint->level) + if (entryPoint == NULL || element->level > entryPoint->level) HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_GREATER, element, InvalidBlockNumber, MAIN_FORKNUM); } @@ -542,6 +498,7 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti FmgrInfo *procinfo = index_getprocinfo(index, 1, HNSW_DISTANCE_PROC); Oid collation = index->rd_indcollation[0]; HnswElement dup; + LOCKMODE lockmode = ShareLock; /* Detoast once for all calls */ value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); @@ -562,19 +519,23 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti * Get a shared lock for the duration of the insert. Use a page lock so it * does not interfere with buffer lock (or reads when vacuuming). */ - LockPage(index, HNSW_METAPAGE_BLKNO, ShareLock); + LockPage(index, HNSW_METAPAGE_BLKNO, lockmode); /* Get entry point */ entryPoint = HnswGetEntryPoint(index); - /* Prevent concurrent inserts when no entry point */ - if (entryPoint == NULL) + /* Prevent concurrent inserts when likely updating entry point */ + if (entryPoint == NULL || element->level > entryPoint->level) { - if (HnswAddEntryPoint(index, element, m, &entryPoint)) - { - UnlockPage(index, HNSW_METAPAGE_BLKNO, ShareLock); - return true; - } + /* Release shared lock */ + UnlockPage(index, HNSW_METAPAGE_BLKNO, lockmode); + + /* Get exclusive lock */ + lockmode = ExclusiveLock; + LockPage(index, HNSW_METAPAGE_BLKNO, lockmode); + + /* Get latest entry point after lock is acquired */ + entryPoint = HnswGetEntryPoint(index); } /* Insert element in graph */ @@ -587,7 +548,7 @@ HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_ti WriteElement(index, procinfo, collation, element, m, efConstruction, dup, entryPoint); /* Release shared lock */ - UnlockPage(index, HNSW_METAPAGE_BLKNO, ShareLock); + UnlockPage(index, HNSW_METAPAGE_BLKNO, lockmode); return true; }