Files
pgvector/src/ivfinsert.c
2024-10-11 15:01:54 -07:00

215 lines
5.4 KiB
C

#include "postgres.h"
#include <float.h>
#include "access/generic_xlog.h"
#include "ivfflat.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/memutils.h"
/*
 * Choose the list whose center is closest to the value being inserted.
 *
 * Walks the chain of list pages starting at IVFFLAT_HEAD_BLKNO (following
 * each page's nextblkno), and for every list entry calls the index's
 * distance support function on values[0] and the list center.
 *
 * Outputs:
 *   *insertPage - the insertPage recorded in the chosen list entry
 *   *listInfo   - block and offset of the chosen list entry itself
 *
 * A candidate is always accepted while *insertPage is still invalid, so a
 * list gets selected even if its distance fails the "<" comparison (for
 * example a NaN distance).
 */
static void
FindInsertPage(Relation index, Datum *values, BlockNumber *insertPage, ListInfo * listInfo)
{
	double		bestDistance = DBL_MAX;
	FmgrInfo   *distanceProc;
	Oid			coll;

	/* Give the output a defined value even if no list entry is visited */
	listInfo->blkno = IVFFLAT_HEAD_BLKNO;
	listInfo->offno = FirstOffsetNumber;

	distanceProc = index_getprocinfo(index, 1, IVFFLAT_DISTANCE_PROC);
	coll = index->rd_indcollation[0];

	/* Visit each list page in chain order */
	for (BlockNumber blkno = IVFFLAT_HEAD_BLKNO; BlockNumberIsValid(blkno);)
	{
		Buffer		listBuf = ReadBuffer(index, blkno);
		Page		listPage;
		OffsetNumber lastOff;
		OffsetNumber off;

		LockBuffer(listBuf, BUFFER_LOCK_SHARE);
		listPage = BufferGetPage(listBuf);
		lastOff = PageGetMaxOffsetNumber(listPage);

		for (off = FirstOffsetNumber; off <= lastOff; off = OffsetNumberNext(off))
		{
			ItemId		itemId = PageGetItemId(listPage, off);
			IvfflatList candidate = (IvfflatList) PageGetItem(listPage, itemId);
			Datum		center = PointerGetDatum(&candidate->center);
			double		dist = DatumGetFloat8(FunctionCall2Coll(distanceProc, coll, values[0], center));

			/*
			 * Skip candidates that are not strictly closer, unless nothing
			 * has been chosen yet. Written as !(a < b) so that NaN is
			 * treated exactly like "not closer".
			 */
			if (BlockNumberIsValid(*insertPage) && !(dist < bestDistance))
				continue;

			/* New best candidate */
			bestDistance = dist;
			*insertPage = candidate->insertPage;
			listInfo->blkno = blkno;
			listInfo->offno = off;
		}

		/* Read the link before the page is unlocked */
		blkno = IvfflatPageGetOpaque(listPage)->nextblkno;

		UnlockReleaseBuffer(listBuf);
	}
}
/*
* Insert a single tuple into the index (worker for ivfflatinsert)
*
* Steps, in order:
*   1. Detoast values[0] and, if the index has a norm support function,
*      check and normalize it. The tuple is silently skipped when
*      IvfflatCheckNorm returns false.
*   2. Pick the closest list with FindInsertPage, which also yields the
*      page where inserts for that list currently start.
*   3. Walk that list's page chain until a page with enough free space is
*      found, appending a new page to the chain if none has room.
*   4. Add the tuple to the page and, if the insert page moved, record the
*      new insert page for the list via IvfflatUpdateList.
*
* index    - the ivfflat index relation
* values   - indexed column values; only values[0] is read
* isnull   - null flags, passed through to index_form_tuple
* heap_tid - heap tuple identifier stored in the new index tuple
* heapRel  - not referenced in this function
*
* Raises ERROR if PageAddItem fails.
*/
static void
InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid, Relation heapRel)
{
const IvfflatTypeInfo *typeInfo = IvfflatGetTypeInfo(index);
IndexTuple itup;
Datum value;
FmgrInfo *normprocinfo;
Buffer buf;
Page page;
GenericXLogState *state;
Size itemsz;
BlockNumber insertPage = InvalidBlockNumber;
ListInfo listInfo;
BlockNumber originalInsertPage;
/* Detoast once for all calls */
value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
/* Normalize if needed */
normprocinfo = IvfflatOptionalProcInfo(index, IVFFLAT_NORM_PROC);
if (normprocinfo != NULL)
{
Oid collation = index->rd_indcollation[0];
/*
* Value rejected by the norm check: leave it out of the index.
* NOTE(review): presumably this rejects zero-norm vectors - confirm
* against IvfflatCheckNorm.
*/
if (!IvfflatCheckNorm(normprocinfo, collation, value))
return;
value = IvfflatNormValue(typeInfo, collation, value);
}
/* Ensure index is valid */
/* Both output arguments are NULL: the call is made only for that check */
IvfflatGetMetaPageInfo(index, NULL, NULL);
/* Find the insert page - sets the page and list info */
FindInsertPage(index, &value, &insertPage, &listInfo);
Assert(BlockNumberIsValid(insertPage));
/* Remember the starting page to detect below whether it moved */
originalInsertPage = insertPage;
/* Form tuple */
itup = index_form_tuple(RelationGetDescr(index), &value, isnull);
itup->t_tid = *heap_tid;
/* Get tuple size */
itemsz = MAXALIGN(IndexTupleSize(itup));
/* The tuple must fit on an otherwise empty page */
Assert(itemsz <= BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(IvfflatPageOpaqueData)) - sizeof(ItemIdData));
/* Find a page to insert the item */
/*
* Every exit from this loop leaves buf exclusive-locked (NOTE(review): on
* the new-page path this assumes IvfflatNewBuffer returns a locked buffer
* - confirm) and registered in an open generic WAL record (state), with
* page pointing at the registered page image.
*/
for (;;)
{
buf = ReadBuffer(index, insertPage);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buf, 0);
if (PageGetFreeSpace(page) >= itemsz)
break;
insertPage = IvfflatPageGetOpaque(page)->nextblkno;
if (BlockNumberIsValid(insertPage))
{
/* Move to next page */
/* Nothing was changed on this page, so drop the WAL record */
GenericXLogAbort(state);
UnlockReleaseBuffer(buf);
}
else
{
Buffer newbuf;
Page newpage;
/* Add a new page */
/* The extension lock is held only while the new buffer is obtained */
LockRelationForExtension(index, ExclusiveLock);
newbuf = IvfflatNewBuffer(index, MAIN_FORKNUM);
UnlockRelationForExtension(index, ExclusiveLock);
/* Init new page */
newpage = GenericXLogRegisterBuffer(state, newbuf, GENERIC_XLOG_FULL_IMAGE);
IvfflatInitPage(newbuf, newpage);
/* Update insert page */
insertPage = BufferGetBlockNumber(newbuf);
/* Update previous buffer */
IvfflatPageGetOpaque(page)->nextblkno = insertPage;
/* Commit */
/* The chain link and the new page go into the same WAL record */
GenericXLogFinish(state);
/* Unlock previous buffer */
UnlockReleaseBuffer(buf);
/* Prepare new buffer */
/* Fresh WAL record for the tuple insert; newbuf is not released above */
state = GenericXLogStart(index);
buf = newbuf;
page = GenericXLogRegisterBuffer(state, buf, 0);
break;
}
}
/* Add to next offset */
if (PageAddItem(page, (Item) itup, itemsz, InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
/*
* NOTE(review): presumably finishes the WAL record and unlocks/releases
* buf - confirm against IvfflatCommitBuffer.
*/
IvfflatCommitBuffer(buf, state);
/* Update the insert page */
/* Only needed when the tuple landed on a different page than the list named */
if (insertPage != originalInsertPage)
IvfflatUpdateList(index, listInfo, insertPage, originalInsertPage, InvalidBlockNumber, MAIN_FORKNUM);
}
/*
 * Index access method insert entry point
 *
 * Does nothing when the indexed value is NULL. Otherwise runs InsertTuple
 * inside a short-lived memory context, so that whatever detoasting,
 * IvfflatNormValue and index_form_tuple allocate is released in one step
 * when the context is deleted.
 *
 * checkUnique, indexUnchanged (PostgreSQL 14+) and indexInfo are not
 * referenced here.
 *
 * Always returns false.
 */
bool
ivfflatinsert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid,
			  Relation heap, IndexUniqueCheck checkUnique
#if PG_VERSION_NUM >= 140000
			  ,bool indexUnchanged
#endif
			  ,IndexInfo *indexInfo
)
{
	/* Only non-null values are indexed */
	if (!isnull[0])
	{
		MemoryContext scratchCtx;
		MemoryContext callerCtx;

		/* Scratch context that lives only for this insert */
		scratchCtx = AllocSetContextCreate(CurrentMemoryContext,
										   "Ivfflat insert temporary context",
										   ALLOCSET_DEFAULT_SIZES);
		callerCtx = MemoryContextSwitchTo(scratchCtx);

		InsertTuple(index, values, isnull, heap_tid, heap);

		/* Return to the caller's context, then free the scratch memory */
		MemoryContextSwitchTo(callerCtx);
		MemoryContextDelete(scratchCtx);
	}

	return false;
}