From f7128477817b199858ca914fefb31b586ccfff63 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Wed, 7 Feb 2024 13:56:41 -0800 Subject: [PATCH] Added duplicate checking for index tuples [skip ci] --- src/hnsw.h | 1 + src/hnswbuild.c | 8 +++----- src/hnswinsert.c | 4 +--- src/hnswutils.c | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/src/hnsw.h b/src/hnsw.h index 1f3a838..19a5fa3 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -393,6 +393,7 @@ void HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc, void HnswLoadNeighbors(HnswElement element, Relation index, int m); TupleDesc HnswTupleDesc(Relation index); IndexTuple HnswFormIndexTuple(Relation index, TupleDesc tupdesc, Datum value, Datum *values, bool *isnull); +bool HnswElementIsDuplicate(char *base, HnswElement a, HnswElement b, Relation index); PGDLLEXPORT void HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc); /* Index access methods */ diff --git a/src/hnswbuild.c b/src/hnswbuild.c index e961c2d..f7b0aaa 100644 --- a/src/hnswbuild.c +++ b/src/hnswbuild.c @@ -336,19 +336,17 @@ AddDuplicateInMemory(HnswElement element, HnswElement dup) * Find duplicate element */ static bool -FindDuplicateInMemory(char *base, HnswElement element) +FindDuplicateInMemory(char *base, HnswElement element, Relation index) { HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0); - Datum value = HnswGetValue(base, element); for (int i = 0; i < neighbors->length; i++) { HnswCandidate *neighbor = &neighbors->items[i]; HnswElement neighborElement = HnswPtrAccess(base, neighbor->element); - Datum neighborValue = HnswGetValue(base, neighborElement); /* Exit early since ordered by distance */ - if (!datumIsEqual(value, neighborValue, false, -1)) + if (!HnswElementIsDuplicate(base, element, neighborElement, index)) return false; /* Check for space */ @@ -408,7 +406,7 @@ UpdateGraphInMemory(FmgrInfo *procinfo, Oid collation, HnswElement element, int char *base = buildstate->hnswarea; /* Look for duplicate */ - if (FindDuplicateInMemory(base, element)) + if (FindDuplicateInMemory(base, element, buildstate->index)) return; /* Add element */ diff --git a/src/hnswinsert.c b/src/hnswinsert.c index f233f14..5024c68 100644 --- a/src/hnswinsert.c +++ b/src/hnswinsert.c @@ -507,16 +507,14 @@ FindDuplicateOnDisk(Relation index, HnswElement element, bool building) { char *base = NULL; HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0); - Datum value = HnswGetValue(base, element); for (int i = 0; i < neighbors->length; i++) { HnswCandidate *neighbor = &neighbors->items[i]; HnswElement neighborElement = HnswPtrAccess(base, neighbor->element); - Datum neighborValue = HnswGetValue(base, neighborElement); /* Exit early since ordered by distance */ - if (!datumIsEqual(value, neighborValue, false, -1)) + if (!HnswElementIsDuplicate(base, element, neighborElement, index)) return false; if (AddDuplicateOnDisk(index, element, neighborElement, building)) diff --git a/src/hnswutils.c b/src/hnswutils.c index c2f9433..99dc242 100644 --- a/src/hnswutils.c +++ b/src/hnswutils.c @@ -329,6 +329,53 @@ HnswFormIndexTuple(Relation index, TupleDesc tupdesc, Datum value, Datum *values return index_form_tuple(tupdesc, newValues, isnull); } +/* + * Check if elements are duplicates + */ +bool +HnswElementIsDuplicate(char *base, HnswElement a, HnswElement b, Relation index) +{ + if (IndexRelationGetNumberOfAttributes(index) == 1) + { + Datum value = HnswGetValue(base, a); + Datum value2 = HnswGetValue(base, b); + + return datumIsEqual(value, value2, false, -1); + } + else + { + TupleDesc tupdesc = RelationGetDescr(index); + IndexTuple itup = HnswPtrAccess(base, a->itup); + IndexTuple itup2 = HnswPtrAccess(base, b->itup); + + for (int i = 0; i < tupdesc->natts; i++) + { + Datum value; + Datum value2; + bool isnull; + bool isnull2; + + value = index_getattr(itup, i + 1, tupdesc, &isnull); + value2 = index_getattr(itup2, i + 1, tupdesc, &isnull2); + + if (isnull || isnull2) + { + if (isnull != isnull2) + return false; + } + else + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + + if (!datumIsEqual(value, value2, att->attbyval, att->attlen)) + return false; + } + } + + return true; + } +} + /* * Get the metapage info */