mirror of
https://github.com/pgvector/pgvector.git
synced 2026-06-29 17:21:16 +08:00
Skip dead tuples for search with HNSW
This commit is contained in:
@@ -219,6 +219,9 @@ typedef struct HnswScanOpaqueData
|
||||
{
|
||||
bool first;
|
||||
Buffer buf;
|
||||
ItemPointerData heaptid;
|
||||
OffsetNumber offno;
|
||||
int removedCount;
|
||||
List *w;
|
||||
MemoryContext tmpCtx;
|
||||
|
||||
|
||||
@@ -58,6 +58,72 @@ GetDimensions(Relation index)
|
||||
return dimensions;
|
||||
}
|
||||
|
||||
/*
|
||||
* Remove deleted heap TID
|
||||
*/
|
||||
static void
|
||||
RemoveHeapTid(IndexScanDesc scan)
|
||||
{
|
||||
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
|
||||
Relation index = scan->indexRelation;
|
||||
Buffer buf = so->buf;
|
||||
Page page;
|
||||
GenericXLogState *state;
|
||||
ItemId itemid;
|
||||
HnswElementTuple etup;
|
||||
Size etupSize;
|
||||
int idx = -1;
|
||||
|
||||
if (!BufferIsValid(buf) || !OffsetNumberIsValid(so->offno) || !ItemPointerIsValid(&so->heaptid))
|
||||
return;
|
||||
|
||||
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
|
||||
state = GenericXLogStart(index);
|
||||
page = GenericXLogRegisterBuffer(state, buf, 0);
|
||||
itemid = PageGetItemId(page, so->offno);
|
||||
etup = (HnswElementTuple) PageGetItem(page, itemid);
|
||||
etupSize = ItemIdGetLength(itemid);
|
||||
|
||||
Assert(HnswIsElementTuple(etup));
|
||||
|
||||
/* Find index */
|
||||
for (int i = 0; i < HNSW_HEAPTIDS; i++)
|
||||
{
|
||||
if (!ItemPointerIsValid(&etup->heaptids[i]))
|
||||
break;
|
||||
|
||||
if (ItemPointerEquals(&etup->heaptids[i], &so->heaptid))
|
||||
{
|
||||
idx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (idx == -1)
|
||||
GenericXLogAbort(state);
|
||||
else
|
||||
{
|
||||
/* Move pointers forward */
|
||||
for (int i = idx; i < HNSW_HEAPTIDS; i++)
|
||||
{
|
||||
if (i + 1 == HNSW_HEAPTIDS || !ItemPointerIsValid(&etup->heaptids[i + 1]))
|
||||
ItemPointerSetInvalid(&etup->heaptids[i]);
|
||||
else
|
||||
ItemPointerCopy(&etup->heaptids[i + 1], &etup->heaptids[i]);
|
||||
}
|
||||
|
||||
/* Overwrite tuple */
|
||||
if (!PageIndexTupleOverwrite(page, so->offno, (Item) etup, etupSize))
|
||||
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
||||
|
||||
/* Commit */
|
||||
MarkBufferDirty(buf);
|
||||
GenericXLogFinish(state);
|
||||
}
|
||||
|
||||
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare for an index scan
|
||||
*/
|
||||
@@ -71,6 +137,9 @@ hnswbeginscan(Relation index, int nkeys, int norderbys)
|
||||
|
||||
so = (HnswScanOpaque) palloc(sizeof(HnswScanOpaqueData));
|
||||
so->buf = InvalidBuffer;
|
||||
ItemPointerSetInvalid(&so->heaptid);
|
||||
so->offno = InvalidOffsetNumber;
|
||||
so->removedCount = 0;
|
||||
so->first = true;
|
||||
so->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Hnsw scan temporary context",
|
||||
@@ -95,6 +164,7 @@ hnswrescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int no
|
||||
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
|
||||
|
||||
so->first = true;
|
||||
ItemPointerSetInvalid(&so->heaptid);
|
||||
MemoryContextReset(so->tmpCtx);
|
||||
|
||||
if (keys && scan->numberOfKeys > 0)
|
||||
@@ -158,12 +228,25 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir)
|
||||
|
||||
so->first = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Remove dead tuples. kill_prior_tuple will only be true if not in
|
||||
* recovery. Limit the number removed per scan for performance.
|
||||
*/
|
||||
if (scan->kill_prior_tuple && so->removedCount < 3)
|
||||
{
|
||||
RemoveHeapTid(scan);
|
||||
so->removedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
while (list_length(so->w) > 0)
|
||||
{
|
||||
HnswCandidate *hc = llast(so->w);
|
||||
ItemPointer tid;
|
||||
BlockNumber indexblkno;
|
||||
OffsetNumber indexoffno;
|
||||
|
||||
/* Move to next element if no valid heap tids */
|
||||
if (list_length(hc->element->heaptids) == 0)
|
||||
@@ -174,6 +257,7 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir)
|
||||
|
||||
tid = llast(hc->element->heaptids);
|
||||
indexblkno = hc->element->blkno;
|
||||
indexoffno = hc->element->offno;
|
||||
|
||||
hc->element->heaptids = list_delete_last(hc->element->heaptids);
|
||||
|
||||
@@ -185,6 +269,10 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir)
|
||||
scan->xs_ctup.t_self = *tid;
|
||||
#endif
|
||||
|
||||
/* Keep track of info needed to remove dead tuples */
|
||||
so->heaptid = *tid;
|
||||
so->offno = indexoffno;
|
||||
|
||||
/* Unpin buffer */
|
||||
if (BufferIsValid(so->buf))
|
||||
ReleaseBuffer(so->buf);
|
||||
|
||||
@@ -551,6 +551,8 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
|
||||
pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL);
|
||||
pairingheap *W = pairingheap_allocate(CompareFurthestCandidates, NULL);
|
||||
int wlen = 0;
|
||||
uint64 dead = 0;
|
||||
uint64 maxAdditional = skipElement == NULL ? ef : PG_UINT64_MAX;
|
||||
HASHCTL hash_ctl;
|
||||
HTAB *v;
|
||||
|
||||
@@ -579,13 +581,14 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
|
||||
pairingheap_add(C, &(CreatePairingHeapNode(hc)->ph_node));
|
||||
pairingheap_add(W, &(CreatePairingHeapNode(hc)->ph_node));
|
||||
|
||||
/*
|
||||
* Do not count elements being deleted towards ef when vacuuming. It
|
||||
* would be ideal to do this for inserts as well, but this could
|
||||
* affect insert performance.
|
||||
*/
|
||||
if (skipElement == NULL || list_length(hc->element->heaptids) != 0)
|
||||
wlen++;
|
||||
/* Do not count certain number of dead elements towards ef */
|
||||
if (list_length(hc->element->heaptids) == 0)
|
||||
{
|
||||
if ((++dead) <= maxAdditional)
|
||||
continue;
|
||||
}
|
||||
|
||||
wlen++;
|
||||
}
|
||||
|
||||
while (!pairingheap_is_empty(C))
|
||||
@@ -638,19 +641,18 @@ HnswSearchLayer(Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *pro
|
||||
pairingheap_add(C, &(CreatePairingHeapNode(ec)->ph_node));
|
||||
pairingheap_add(W, &(CreatePairingHeapNode(ec)->ph_node));
|
||||
|
||||
/*
|
||||
* Do not count elements being deleted towards ef when
|
||||
* vacuuming. It would be ideal to do this for inserts as
|
||||
* well, but this could affect insert performance.
|
||||
*/
|
||||
if (skipElement == NULL || list_length(e->element->heaptids) != 0)
|
||||
/* Do not count certain number of dead elements towards ef */
|
||||
if (list_length(e->element->heaptids) == 0)
|
||||
{
|
||||
wlen++;
|
||||
|
||||
/* No need to decrement wlen */
|
||||
if (wlen > ef)
|
||||
pairingheap_remove_first(W);
|
||||
if ((++dead) <= maxAdditional)
|
||||
continue;
|
||||
}
|
||||
|
||||
wlen++;
|
||||
|
||||
/* No need to decrement wlen */
|
||||
if (wlen > ef)
|
||||
pairingheap_remove_first(W);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,8 +38,9 @@ sub test_index_replay
|
||||
);
|
||||
|
||||
# Run test queries and compare their result
|
||||
my $primary_result = $node_primary->safe_psql("postgres", $queries);
|
||||
# Query replica first since index scan on primary can generate WAL removing tuples
|
||||
my $replica_result = $node_replica->safe_psql("postgres", $queries);
|
||||
my $primary_result = $node_primary->safe_psql("postgres", $queries);
|
||||
|
||||
is($primary_result, $replica_result, "$test_name: query result matches");
|
||||
return;
|
||||
|
||||
@@ -23,25 +23,27 @@ sub insert_vectors
|
||||
|
||||
sub test_duplicates
|
||||
{
|
||||
my ($exp) = @_;
|
||||
|
||||
my $res = $node->safe_psql("postgres", qq(
|
||||
SET enable_seqscan = off;
|
||||
SET hnsw.ef_search = 1;
|
||||
SELECT COUNT(*) FROM (SELECT * FROM tst ORDER BY v <-> '[1,1,1]') t;
|
||||
));
|
||||
is($res, 10);
|
||||
is($res, $exp);
|
||||
}
|
||||
|
||||
# Test duplicates with build
|
||||
insert_vectors();
|
||||
$node->safe_psql("postgres", "CREATE INDEX idx ON tst USING hnsw (v vector_l2_ops);");
|
||||
test_duplicates();
|
||||
test_duplicates(10);
|
||||
|
||||
# Reset
|
||||
$node->safe_psql("postgres", "TRUNCATE tst;");
|
||||
|
||||
# Test duplicates with inserts
|
||||
insert_vectors();
|
||||
test_duplicates();
|
||||
test_duplicates(10);
|
||||
|
||||
# Test fallback path for inserts
|
||||
$node->pgbench(
|
||||
@@ -55,4 +57,15 @@ $node->pgbench(
|
||||
}
|
||||
);
|
||||
|
||||
# Reset
|
||||
$node->safe_psql("postgres", "TRUNCATE tst;");
|
||||
|
||||
# Test deletes with index scan
|
||||
$node->safe_psql("postgres", "INSERT INTO tst SELECT '[1,1,1]' FROM generate_series(1, 10) i;");
|
||||
$node->safe_psql("postgres", "DELETE FROM tst WHERE ctid IN (SELECT ctid FROM tst ORDER BY random() LIMIT 5);");
|
||||
for (1 .. 3)
|
||||
{
|
||||
test_duplicates(5);
|
||||
}
|
||||
|
||||
done_testing();
|
||||
|
||||
Reference in New Issue
Block a user