Added duplicate checking for index tuples [skip ci]

This commit is contained in:
Andrew Kane
2024-02-07 13:56:41 -08:00
parent e3c33c9ba2
commit f712847781
4 changed files with 52 additions and 8 deletions

View File

@@ -393,6 +393,7 @@ void HnswUpdateConnection(char *base, HnswElement element, HnswCandidate * hc,
void HnswLoadNeighbors(HnswElement element, Relation index, int m);
TupleDesc HnswTupleDesc(Relation index);
IndexTuple HnswFormIndexTuple(Relation index, TupleDesc tupdesc, Datum value, Datum *values, bool *isnull);
bool HnswElementIsDuplicate(char *base, HnswElement a, HnswElement b, Relation index);
PGDLLEXPORT void HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc);
/* Index access methods */

View File

@@ -336,19 +336,17 @@ AddDuplicateInMemory(HnswElement element, HnswElement dup)
* Find duplicate element
*/
static bool
FindDuplicateInMemory(char *base, HnswElement element)
FindDuplicateInMemory(char *base, HnswElement element, Relation index)
{
HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0);
Datum value = HnswGetValue(base, element);
for (int i = 0; i < neighbors->length; i++)
{
HnswCandidate *neighbor = &neighbors->items[i];
HnswElement neighborElement = HnswPtrAccess(base, neighbor->element);
Datum neighborValue = HnswGetValue(base, neighborElement);
/* Exit early since ordered by distance */
if (!datumIsEqual(value, neighborValue, false, -1))
if (!HnswElementIsDuplicate(base, element, neighborElement, index))
return false;
/* Check for space */
@@ -408,7 +406,7 @@ UpdateGraphInMemory(FmgrInfo *procinfo, Oid collation, HnswElement element, int
char *base = buildstate->hnswarea;
/* Look for duplicate */
if (FindDuplicateInMemory(base, element))
if (FindDuplicateInMemory(base, element, buildstate->index))
return;
/* Add element */

View File

@@ -507,16 +507,14 @@ FindDuplicateOnDisk(Relation index, HnswElement element, bool building)
{
char *base = NULL;
HnswNeighborArray *neighbors = HnswGetNeighbors(base, element, 0);
Datum value = HnswGetValue(base, element);
for (int i = 0; i < neighbors->length; i++)
{
HnswCandidate *neighbor = &neighbors->items[i];
HnswElement neighborElement = HnswPtrAccess(base, neighbor->element);
Datum neighborValue = HnswGetValue(base, neighborElement);
/* Exit early since ordered by distance */
if (!datumIsEqual(value, neighborValue, false, -1))
if (!HnswElementIsDuplicate(base, element, neighborElement, index))
return false;
if (AddDuplicateOnDisk(index, element, neighborElement, building))

View File

@@ -329,6 +329,53 @@ HnswFormIndexTuple(Relation index, TupleDesc tupdesc, Datum value, Datum *values
return index_form_tuple(tupdesc, newValues, isnull);
}
/*
 * Decide whether two elements carry identical indexed values
 *
 * For an index with a single attribute, the two element values are compared
 * directly. Otherwise the stored index tuples are compared attribute by
 * attribute: a NULL only matches another NULL, and non-NULL values must be
 * equal per datumIsEqual using that attribute's attbyval/attlen.
 *
 * Returns true only if every attribute matches.
 */
bool
HnswElementIsDuplicate(char *base, HnswElement a, HnswElement b, Relation index)
{
	TupleDesc	tupdesc;
	IndexTuple	tupleA;
	IndexTuple	tupleB;

	/* Single attribute: compare the element values (passed by reference, typlen -1) */
	if (IndexRelationGetNumberOfAttributes(index) == 1)
		return datumIsEqual(HnswGetValue(base, a), HnswGetValue(base, b), false, -1);

	tupdesc = RelationGetDescr(index);
	tupleA = HnswPtrAccess(base, a->itup);
	tupleB = HnswPtrAccess(base, b->itup);

	/* Attribute numbers passed to index_getattr are 1-based */
	for (int attnum = 1; attnum <= tupdesc->natts; attnum++)
	{
		Form_pg_attribute att;
		bool		nullA;
		bool		nullB;
		Datum		datumA = index_getattr(tupleA, attnum, tupdesc, &nullA);
		Datum		datumB = index_getattr(tupleB, attnum, tupdesc, &nullB);

		/* NULL on one side only is a mismatch */
		if (nullA != nullB)
			return false;

		/* Both NULL: nothing to compare for this attribute */
		if (nullA)
			continue;

		att = TupleDescAttr(tupdesc, attnum - 1);

		if (!datumIsEqual(datumA, datumB, att->attbyval, att->attlen))
			return false;
	}

	return true;
}
/*
* Get the metapage info
*/