From d6ac7b93bb0e524c48e9b62dd359bbee304b6084 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Sat, 2 Sep 2023 17:48:44 -0700 Subject: [PATCH] Skip dead tuples for search with HNSW --- src/hnsw.h | 3 ++ src/hnswscan.c | 88 +++++++++++++++++++++++++++++++++++ src/hnswutils.c | 38 ++++++++------- test/t/010_hnsw_wal.pl | 3 +- test/t/015_hnsw_duplicates.pl | 19 ++++++-- 5 files changed, 129 insertions(+), 22 deletions(-) diff --git a/src/hnsw.h b/src/hnsw.h index 3e8bdc2..8fec061 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -219,6 +219,9 @@ typedef struct HnswScanOpaqueData { bool first; Buffer buf; + ItemPointerData heaptid; + OffsetNumber offno; + int removedCount; List *w; MemoryContext tmpCtx; diff --git a/src/hnswscan.c b/src/hnswscan.c index 742779a..642a03d 100644 --- a/src/hnswscan.c +++ b/src/hnswscan.c @@ -58,6 +58,72 @@ GetDimensions(Relation index) return dimensions; } +/* + * Remove deleted heap TID + */ +static void +RemoveHeapTid(IndexScanDesc scan) +{ + HnswScanOpaque so = (HnswScanOpaque) scan->opaque; + Relation index = scan->indexRelation; + Buffer buf = so->buf; + Page page; + GenericXLogState *state; + ItemId itemid; + HnswElementTuple etup; + Size etupSize; + int idx = -1; + + if (!BufferIsValid(buf) || !OffsetNumberIsValid(so->offno) || !ItemPointerIsValid(&so->heaptid)) + return; + + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + state = GenericXLogStart(index); + page = GenericXLogRegisterBuffer(state, buf, 0); + itemid = PageGetItemId(page, so->offno); + etup = (HnswElementTuple) PageGetItem(page, itemid); + etupSize = ItemIdGetLength(itemid); + + Assert(HnswIsElementTuple(etup)); + + /* Find index */ + for (int i = 0; i < HNSW_HEAPTIDS; i++) + { + if (!ItemPointerIsValid(&etup->heaptids[i])) + break; + + if (ItemPointerEquals(&etup->heaptids[i], &so->heaptid)) + { + idx = i; + break; + } + } + + if (idx == -1) + GenericXLogAbort(state); + else + { + /* Move pointers forward */ + for (int i = idx; i < HNSW_HEAPTIDS; i++) + { + if (i + 1 == HNSW_HEAPTIDS || !ItemPointerIsValid(&etup->heaptids[i + 1])) + ItemPointerSetInvalid(&etup->heaptids[i]); + else + ItemPointerCopy(&etup->heaptids[i + 1], &etup->heaptids[i]); + } + + /* Overwrite tuple */ + if (!PageIndexTupleOverwrite(page, so->offno, (Item) etup, etupSize)) + elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index)); + + /* Commit */ + MarkBufferDirty(buf); + GenericXLogFinish(state); + } + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); +} + /* * Prepare for an index scan */ @@ -71,6 +137,9 @@ hnswbeginscan(Relation index, int nkeys, int norderbys) so = (HnswScanOpaque) palloc(sizeof(HnswScanOpaqueData)); so->buf = InvalidBuffer; + ItemPointerSetInvalid(&so->heaptid); + so->offno = InvalidOffsetNumber; + so->removedCount = 0; so->first = true; so->tmpCtx = AllocSetContextCreate(CurrentMemoryContext, "Hnsw scan temporary context", @@ -95,6 +164,7 @@ hnswrescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int no HnswScanOpaque so = (HnswScanOpaque) scan->opaque; so->first = true; + ItemPointerSetInvalid(&so->heaptid); MemoryContextReset(so->tmpCtx); if (keys && scan->numberOfKeys > 0) @@ -158,12 +228,25 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir) so->first = false; } + else + { + /* + * Remove dead tuples. kill_prior_tuple will only be true if not in + * recovery. Limit the number removed per scan for performance. + */ + if (scan->kill_prior_tuple && so->removedCount < 3) + { + RemoveHeapTid(scan); + so->removedCount++; + } + } while (list_length(so->w) > 0) { HnswCandidate *hc = llast(so->w); ItemPointer tid; BlockNumber indexblkno; + OffsetNumber indexoffno; /* Move to next element if no valid heap tids */ if (list_length(hc->element->heaptids) == 0) @@ -174,6 +257,7 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir) tid = llast(hc->element->heaptids); indexblkno = hc->element->blkno; + indexoffno = hc->element->offno; hc->element->heaptids = list_delete_last(hc->element->heaptids); @@ -185,6 +269,10 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir) scan->xs_ctup.t_self = *tid; #endif + /* Keep track of info needed to remove dead tuples */ + so->heaptid = *tid; + so->offno = indexoffno; + /* Unpin buffer */ if (BufferIsValid(so->buf)) ReleaseBuffer(so->buf); diff --git a/src/hnswutils.c b/src/hnswutils.c index 8e6f2a9..e0852d2 100644 --- a/src/hnswutils.c +++ b/src/hnswutils.c @@ -551,6 +551,8 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL); pairingheap *W = pairingheap_allocate(CompareFurthestCandidates, NULL); int wlen = 0; + uint64 dead = 0; + uint64 maxAdditional = skipElement == NULL ? ef : PG_UINT64_MAX; HASHCTL hash_ctl; HTAB *v; @@ -579,13 +581,14 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro pairingheap_add(C, &(CreatePairingHeapNode(hc)->ph_node)); pairingheap_add(W, &(CreatePairingHeapNode(hc)->ph_node)); - /* - * Do not count elements being deleted towards ef when vacuuming. It - * would be ideal to do this for inserts as well, but this could - * affect insert performance. - */ - if (skipElement == NULL || list_length(hc->element->heaptids) != 0) - wlen++; + /* Do not count certain number of dead elements towards ef */ + if (list_length(hc->element->heaptids) == 0) + { + if ((++dead) <= maxAdditional) + continue; + } + + wlen++; } while (!pairingheap_is_empty(C)) @@ -638,19 +641,18 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro pairingheap_add(C, &(CreatePairingHeapNode(ec)->ph_node)); pairingheap_add(W, &(CreatePairingHeapNode(ec)->ph_node)); - /* - * Do not count elements being deleted towards ef when - * vacuuming. It would be ideal to do this for inserts as - * well, but this could affect insert performance. - */ - if (skipElement == NULL || list_length(e->element->heaptids) != 0) + /* Do not count certain number of dead elements towards ef */ + if (list_length(e->element->heaptids) == 0) { - wlen++; - - /* No need to decrement wlen */ - if (wlen > ef) - pairingheap_remove_first(W); + if ((++dead) <= maxAdditional) + continue; } + + wlen++; + + /* No need to decrement wlen */ + if (wlen > ef) + pairingheap_remove_first(W); } } } diff --git a/test/t/010_hnsw_wal.pl b/test/t/010_hnsw_wal.pl index 422a10b..05cd545 100644 --- a/test/t/010_hnsw_wal.pl +++ b/test/t/010_hnsw_wal.pl @@ -38,8 +38,9 @@ sub test_index_replay ); # Run test queries and compare their result - my $primary_result = $node_primary->safe_psql("postgres", $queries); + # Query replica first since index scan on primary can generate WAL removing tuples my $replica_result = $node_replica->safe_psql("postgres", $queries); + my $primary_result = $node_primary->safe_psql("postgres", $queries); is($primary_result, $replica_result, "$test_name: query result matches"); return; diff --git a/test/t/015_hnsw_duplicates.pl b/test/t/015_hnsw_duplicates.pl index 7e11dee..5ebc4cc 100644 --- a/test/t/015_hnsw_duplicates.pl +++ b/test/t/015_hnsw_duplicates.pl @@ -23,25 +23,27 @@ sub insert_vectors sub test_duplicates { + my ($exp) = @_; + my $res = $node->safe_psql("postgres", qq( SET enable_seqscan = off; SET hnsw.ef_search = 1; SELECT COUNT(*) FROM (SELECT * FROM tst ORDER BY v <-> '[1,1,1]') t; )); - is($res, 10); + is($res, $exp); } # Test duplicates with build insert_vectors(); $node->safe_psql("postgres", "CREATE INDEX idx ON tst USING hnsw (v vector_l2_ops);"); -test_duplicates(); +test_duplicates(10); # Reset $node->safe_psql("postgres", "TRUNCATE tst;"); # Test duplicates with inserts insert_vectors(); -test_duplicates(); +test_duplicates(10); # Test fallback path for inserts $node->pgbench( @@ -55,4 +57,15 @@ $node->pgbench( } ); +# Reset +$node->safe_psql("postgres", "TRUNCATE tst;"); + +# Test deletes with index scan +$node->safe_psql("postgres", "INSERT INTO tst SELECT '[1,1,1]' FROM generate_series(1, 10) i;"); +$node->safe_psql("postgres", "DELETE FROM tst WHERE ctid IN (SELECT ctid FROM tst ORDER BY random() LIMIT 5);"); +for (1 .. 3) +{ + test_duplicates(5); +} + done_testing();