diff --git a/src/hnsw.h b/src/hnsw.h index 6c0fc0d..ea0efb6 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -378,6 +378,7 @@ typedef struct HnswVacuumState int HnswGetM(Relation index); int HnswGetEfConstruction(Relation index); FmgrInfo *HnswOptionalProcInfo(Relation index, uint16 procnum); +void HnswSetProcinfo(Relation index, FmgrInfo **procinfo, FmgrInfo **normprocinfo, Oid **collation); Datum HnswNormValue(const HnswTypeInfo * typeInfo, Oid collation, Datum value); bool HnswCheckNorm(FmgrInfo *procinfo, Oid collation, Datum value); Buffer HnswNewBuffer(Relation index, ForkNumber forkNum); @@ -396,17 +397,17 @@ void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, in void HnswAddHeapTid(HnswElement element, ItemPointer heaptid); HnswNeighborArray *HnswInitNeighborArray(int lm, HnswAllocator * allocator); void HnswInitNeighbors(char *base, HnswElement element, int m, HnswAllocator * alloc); -bool HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, bool building); +bool HnswInsertTupleOnDisk(Relation index, IndexTuple itup, ItemPointer heaptid, bool building); void HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo **procinfo, Oid *collation, HnswElement e, int m, bool checkExisting, bool building); void HnswLoadElementFromTuple(HnswElement element, HnswElementTuple etup, bool loadHeaptids, bool loadVec, Relation index); void HnswLoadElement(HnswElement element, double *distance, bool *matches, Datum *q, IndexTuple qtup, ScanKeyData *keyData, Relation index, FmgrInfo **procinfo, Oid *collation, bool loadVec, double *maxDistance); void HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element, bool useIndexTuple); void HnswUpdateConnection(char *base, HnswNeighborArray * neighbors, HnswElement newElement, float distance, int lm, int *updateIdx, Relation index, FmgrInfo **procinfo, Oid *collation); +bool HnswFormIndexTuple(IndexTuple *out, Datum *values, bool *isnull, const HnswTypeInfo * typeInfo, FmgrInfo *normprocinfo, Oid collation, TupleDesc tupdesc); bool HnswLoadNeighborTids(HnswElement element, ItemPointerData *indextids, Relation index, int m, int lm, int lc); void HnswInitLockTranche(void); const HnswTypeInfo *HnswGetTypeInfo(Relation index); PGDLLEXPORT void HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc); -void HnswInitProcinfo(FmgrInfo **procinfo, Oid **collation, Relation index); Size HnswGetElementTupleSize(char *base, HnswElement element, bool useIndexTuple); bool HnswIndexTupleIsEqual(IndexTuple a, IndexTuple b, TupleDesc tupdesc); diff --git a/src/hnswbuild.c b/src/hnswbuild.c index 7fc77dc..c77cdf7 100644 --- a/src/hnswbuild.c +++ b/src/hnswbuild.c @@ -483,38 +483,23 @@ InsertTupleInMemory(HnswBuildState * buildstate, HnswElement element) static bool InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, HnswBuildState * buildstate) { - const HnswTypeInfo *typeInfo = buildstate->typeInfo; HnswGraph *graph = buildstate->graph; HnswElement element; HnswAllocator *allocator = &buildstate->allocator; - Size valueSize; - Pointer valuePtr; LWLock *flushLock = &graph->flushLock; char *base = buildstate->hnswarea; - bool useIndexTuple = buildstate->useIndexTuple; TupleDesc tupdesc = buildstate->tupdesc; IndexTuple itup; Size itupSize; - IndexTuple itupPtr; + IndexTuple itupShared; + bool unused; - /* Detoast once for all calls */ - Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); + /* Form index tuple */ + if (!HnswFormIndexTuple(&itup, values, isnull, buildstate->typeInfo, buildstate->normprocinfo, buildstate->collation[0], tupdesc)) + return false; - /* Check value */ - if (typeInfo->checkValue != NULL) - typeInfo->checkValue(DatumGetPointer(value)); - - /* Normalize if needed */ - if (buildstate->normprocinfo != NULL) - { - if (!HnswCheckNorm(buildstate->normprocinfo, buildstate->collation[0], value)) - return false; - - value = HnswNormValue(typeInfo, buildstate->collation[0], value); - } - - /* Get datum size */ - valueSize = VARSIZE_ANY(DatumGetPointer(value)); + /* Get tuple size */ + itupSize = IndexTupleSize(itup); /* Ensure graph not flushed when inserting */ LWLockAcquire(flushLock, LW_SHARED); @@ -524,7 +509,7 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn { LWLockRelease(flushLock); - return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, true); + return HnswInsertTupleOnDisk(index, itup, heaptid, true); } /* @@ -556,22 +541,12 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn LWLockRelease(flushLock); - return HnswInsertTupleOnDisk(index, value, values, isnull, heaptid, true); + return HnswInsertTupleOnDisk(index, itup, heaptid, true); } /* Ok, we can proceed to allocate the element */ element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel, allocator); - - if (useIndexTuple) - { - /* TODO fix */ - values[0] = value; - itup = index_form_tuple(tupdesc, values, isnull); - itupSize = IndexTupleSize(itup); - itupPtr = HnswAlloc(allocator, itupSize); - } - else - valuePtr = HnswAlloc(allocator, valueSize); + itupShared = HnswAlloc(allocator, itupSize); /* * We have now allocated the space needed for the element, so we don't @@ -580,20 +555,10 @@ InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, Hn */ LWLockRelease(&graph->allocatorLock); - /* Copy the datum */ - if (useIndexTuple) - { - bool unused; - - memcpy(itupPtr, itup, itupSize); - HnswPtrStore(base, element->itup, itupPtr); - HnswPtrStore(base, element->value, DatumGetPointer(index_getattr(itupPtr, 1, tupdesc, &unused))); - } - else - { - memcpy(valuePtr, DatumGetPointer(value), valueSize); - HnswPtrStore(base, element->value, valuePtr); - } + /* Copy the tuple */ + memcpy(itupShared, itup, itupSize); + HnswPtrStore(base, element->itup, itupShared); + HnswPtrStore(base, element->value, DatumGetPointer(index_getattr(itupShared, 1, tupdesc, &unused))); /* Create a lock for the element */ LWLockInitialize(&element->lock, hnsw_lock_tranche_id); @@ -753,8 +718,7 @@ InitBuildState(HnswBuildState * buildstate, Relation heap, Relation index, Index buildstate->indtuples = 0; /* Get support functions */ - HnswInitProcinfo(buildstate->procinfo, &buildstate->collation, index); - buildstate->normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); + HnswSetProcinfo(index, buildstate->procinfo, &buildstate->normprocinfo, &buildstate->collation); InitGraph(&buildstate->graphData, NULL, (Size) maintenance_work_mem * 1024L); buildstate->graph = &buildstate->graphData; diff --git a/src/hnswinsert.c b/src/hnswinsert.c index 7467117..2734955 100644 --- a/src/hnswinsert.c +++ b/src/hnswinsert.c @@ -692,7 +692,7 @@ UpdateGraphOnDisk(Relation index, FmgrInfo **procinfo, Oid *collation, HnswEleme * Insert a tuple into the index */ bool -HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, ItemPointer heap_tid, bool building) +HnswInsertTupleOnDisk(Relation index, IndexTuple itup, ItemPointer heaptid, bool building) { HnswElement entryPoint; HnswElement element; @@ -702,8 +702,10 @@ HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, Oid *collation; LOCKMODE lockmode = ShareLock; char *base = NULL; + TupleDesc tupdesc = RelationGetDescr(index); + bool unused; - HnswInitProcinfo(procinfo, &collation, index); + HnswSetProcinfo(index, procinfo, NULL, &collation); /* * Get a shared lock. This allows vacuum to ensure no in-flight inserts @@ -716,24 +718,9 @@ HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, HnswGetMetaPageInfo(index, &m, &entryPoint); /* Create an element */ - element = HnswInitElement(base, heap_tid, m, HnswGetMl(m), HnswGetMaxLevel(m), NULL); - if (HnswUseIndexTuple(index)) - { - /* TODO no toast */ - TupleDesc tupdesc = RelationGetDescr(index); - IndexTuple itup; - bool unused; - - /* TODO fix */ - values[0] = value; - itup = index_form_tuple(tupdesc, values, isnull); - - HnswPtrStore(base, element->itup, itup); - HnswPtrStore(base, element->value, DatumGetPointer(index_getattr(itup, 1, tupdesc, &unused))); - - } - else - HnswPtrStore(base, element->value, DatumGetPointer(value)); + element = HnswInitElement(base, heaptid, m, HnswGetMl(m), HnswGetMaxLevel(m), NULL); + HnswPtrStore(base, element->itup, itup); + HnswPtrStore(base, element->value, DatumGetPointer(index_getattr(itup, 1, tupdesc, &unused))); /* Prevent concurrent inserts when likely updating entry point */ if (entryPoint == NULL || element->level > entryPoint->level) @@ -765,31 +752,19 @@ HnswInsertTupleOnDisk(Relation index, Datum value, Datum *values, bool *isnull, * Insert a tuple into the index */ static void -HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid) +HnswInsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid) { - Datum value; + IndexTuple itup; const HnswTypeInfo *typeInfo = HnswGetTypeInfo(index); - FmgrInfo *normprocinfo; - Oid *collation = index->rd_indcollation; + FmgrInfo *normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); + Oid collation = index->rd_indcollation[0]; + TupleDesc tupdesc = RelationGetDescr(index); - /* Detoast once for all calls */ - value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); + /* Form index tuple */ + if (!HnswFormIndexTuple(&itup, values, isnull, typeInfo, normprocinfo, collation, tupdesc)) + return; - /* Check value */ - if (typeInfo->checkValue != NULL) - typeInfo->checkValue(DatumGetPointer(value)); - - /* Normalize if needed */ - normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); - if (normprocinfo != NULL) - { - if (!HnswCheckNorm(normprocinfo, collation[0], value)) - return; - - value = HnswNormValue(typeInfo, collation[0], value); - } - - HnswInsertTupleOnDisk(index, value, values, isnull, heap_tid, false); + HnswInsertTupleOnDisk(index, itup, heaptid, false); } /* diff --git a/src/hnswscan.c b/src/hnswscan.c index 2925060..672f422 100644 --- a/src/hnswscan.c +++ b/src/hnswscan.c @@ -88,8 +88,7 @@ hnswbeginscan(Relation index, int nkeys, int norderbys) ALLOCSET_DEFAULT_SIZES); /* Set support functions */ - HnswInitProcinfo(so->procinfo, &so->collation, index); - so->normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); + HnswSetProcinfo(index, so->procinfo, &so->normprocinfo, &so->collation); scan->opaque = so; diff --git a/src/hnswutils.c b/src/hnswutils.c index 4e86e1c..ea94155 100644 --- a/src/hnswutils.c +++ b/src/hnswutils.c @@ -154,10 +154,10 @@ HnswOptionalProcInfo(Relation index, uint16 procnum) } /* - * Init procinfo + * Set procinfo */ void -HnswInitProcinfo(FmgrInfo **procinfo, Oid **collation, Relation index) +HnswSetProcinfo(Relation index, FmgrInfo **procinfo, FmgrInfo **normprocinfo, Oid **collation) { procinfo[0] = index_getprocinfo(index, 1, HNSW_DISTANCE_PROC); @@ -165,6 +165,9 @@ HnswInitProcinfo(FmgrInfo **procinfo, Oid **collation, Relation index) procinfo[1] = index_getprocinfo(index, 2, HNSW_ATTRIBUTE_DISTANCE_PROC); *collation = index->rd_indcollation; + + if (normprocinfo != NULL) + *normprocinfo = HnswOptionalProcInfo(index, HNSW_NORM_PROC); } /* @@ -465,6 +468,39 @@ HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, Bloc UnlockReleaseBuffer(buf); } +/* + * Form index tuple + */ +bool +HnswFormIndexTuple(IndexTuple *out, Datum *values, bool *isnull, const HnswTypeInfo * typeInfo, FmgrInfo *normprocinfo, Oid collation, TupleDesc tupdesc) +{ + Datum newValues[2]; + + /* Detoast once for all calls */ + Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0])); + + /* Check value */ + if (typeInfo->checkValue != NULL) + typeInfo->checkValue(DatumGetPointer(value)); + + /* Normalize if needed */ + if (normprocinfo != NULL) + { + if (!HnswCheckNorm(normprocinfo, collation, value)) + return false; + + value = HnswNormValue(typeInfo, collation, value); + } + + newValues[0] = value; + for (int i = 1; i < tupdesc->natts; i++) + newValues[i] = values[i]; + + *out = index_form_tuple(tupdesc, newValues, isnull); + + return true; +} + /* * Set element tuple, except for neighbor info */ diff --git a/src/hnswvacuum.c b/src/hnswvacuum.c index 00e1494..d8ad357 100644 --- a/src/hnswvacuum.c +++ b/src/hnswvacuum.c @@ -581,12 +581,13 @@ InitVacuumState(HnswVacuumState * vacuumstate, IndexVacuumInfo *info, IndexBulkD vacuumstate->callback_state = callback_state; vacuumstate->efConstruction = HnswGetEfConstruction(index); vacuumstate->bas = GetAccessStrategy(BAS_BULKREAD); - HnswInitProcinfo(vacuumstate->procinfo, &vacuumstate->collation, index); vacuumstate->ntup = palloc0(HNSW_TUPLE_ALLOC_SIZE); vacuumstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext, "Hnsw vacuum temporary context", ALLOCSET_DEFAULT_SIZES); + HnswSetProcinfo(index, vacuumstate->procinfo, NULL, &vacuumstate->collation); + /* Get m from metapage */ HnswGetMetaPageInfo(index, &vacuumstate->m, NULL);