Use pairing heap [skip ci]

This commit is contained in:
Andrew Kane
2023-04-29 12:02:54 -07:00
parent a445355a48
commit a3c1eab27a
2 changed files with 89 additions and 65 deletions

View File

@@ -187,6 +187,14 @@ typedef struct IvfflatScanList
double distance;
} IvfflatScanList;
typedef struct IvfflatScanItem
{
pairingheap_node ph_node;
BlockNumber searchPage;
double distance;
ItemPointerData tid;
} IvfflatScanItem;
typedef struct IvfflatScanOpaqueData
{
int probes;
@@ -204,6 +212,13 @@ typedef struct IvfflatScanOpaqueData
FmgrInfo *normprocinfo;
Oid collation;
/* Items */
int maxItems;
int itemCount;
pairingheap *itemQueue;
IvfflatScanItem *items;
IvfflatScanItem **sortedItems;
/* Lists */
pairingheap *listQueue;
IvfflatScanList lists[FLEXIBLE_ARRAY_MEMBER]; /* must come last */

View File

@@ -26,6 +26,21 @@ CompareLists(const pairingheap_node *a, const pairingheap_node *b, void *arg)
return 0;
}
/*
* Compare item distances
*/
static int
CompareItems(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{
if (((const IvfflatScanItem *) a)->distance > ((const IvfflatScanItem *) b)->distance)
return 1;
if (((const IvfflatScanItem *) a)->distance < ((const IvfflatScanItem *) b)->distance)
return -1;
return 0;
}
/*
* Get lists and sort by distance
*/
@@ -111,13 +126,10 @@ GetScanItems(IndexScanDesc scan, Datum value)
Datum datum;
bool isnull;
TupleDesc tupdesc = RelationGetDescr(scan->indexRelation);
double tuples = 0;
#if PG_VERSION_NUM >= 120000
TupleTableSlot *slot = MakeSingleTupleTableSlot(so->tupdesc, &TTSOpsVirtual);
#else
TupleTableSlot *slot = MakeSingleTupleTableSlot(so->tupdesc);
#endif
int i;
double distance;
IvfflatScanItem *scanitem;
double maxDistance = DBL_MAX;
/*
* Reuse same set of shared buffers for scan
@@ -143,25 +155,40 @@ GetScanItems(IndexScanDesc scan, Datum value)
{
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offno));
datum = index_getattr(itup, 1, tupdesc, &isnull);
distance = DatumGetFloat8(FunctionCall2Coll(so->procinfo, so->collation, datum, value));
/*
* Add virtual tuple
*
* Use procinfo from the index instead of scan key for
* performance
*/
ExecClearTuple(slot);
slot->tts_values[0] = FunctionCall2Coll(so->procinfo, so->collation, datum, value);
slot->tts_isnull[0] = false;
slot->tts_values[1] = PointerGetDatum(&itup->t_tid);
slot->tts_isnull[1] = false;
slot->tts_values[2] = Int32GetDatum((int) searchPage);
slot->tts_isnull[2] = false;
ExecStoreVirtualTuple(slot);
if (so->itemCount < so->maxItems)
{
scanitem = &so->items[so->itemCount];
scanitem->searchPage = searchPage;
scanitem->tid = itup->t_tid;
scanitem->distance = distance;
so->itemCount++;
tuplesort_puttupleslot(so->sortstate, slot);
/* Add to heap */
pairingheap_add(so->itemQueue, &scanitem->ph_node);
tuples++;
/* Calculate max distance */
if (so->itemCount == so->maxItems)
{
maxDistance = ((IvfflatScanItem *) pairingheap_first(so->itemQueue))->distance;
scanitem = &so->items[so->itemCount];
}
}
else if (distance < maxDistance)
{
/* Reuse */
scanitem->searchPage = searchPage;
scanitem->tid = itup->t_tid;
scanitem->distance = distance;
pairingheap_add(so->itemQueue, &scanitem->ph_node);
/* Remove */
scanitem = (IvfflatScanItem *) pairingheap_remove_first(so->itemQueue);
/* Update max distance */
maxDistance = ((IvfflatScanItem *) pairingheap_first(so->itemQueue))->distance;
}
}
searchPage = IvfflatPageGetOpaque(page)->nextblkno;
@@ -170,14 +197,10 @@ GetScanItems(IndexScanDesc scan, Datum value)
}
}
/* TODO Scan more lists */
if (tuples < 100)
ereport(DEBUG1,
(errmsg("index scan found few tuples"),
errdetail("index may have been created without data or lists is too high"),
errhint("recreate the index and possibly decrease lists")));
for (i = 0; i < so->itemCount; i++)
so->sortedItems[i] = (IvfflatScanItem *) pairingheap_remove_first(so->itemQueue);
tuplesort_performsort(so->sortstate);
Assert(pairingheap_is_empty(so->itemQueue));
}
/*
@@ -189,10 +212,6 @@ ivfflatbeginscan(Relation index, int nkeys, int norderbys)
IndexScanDesc scan;
IvfflatScanOpaque so;
int lists;
AttrNumber attNums[] = {1};
Oid sortOperators[] = {Float8LessOperator};
Oid sortCollations[] = {InvalidOid};
bool nullsFirstFlags[] = {false};
int probes = ivfflat_probes;
scan = RelationGetIndexScan(index, nkeys, norderbys);
@@ -211,27 +230,14 @@ ivfflatbeginscan(Relation index, int nkeys, int norderbys)
so->normprocinfo = IvfflatOptionalProcInfo(index, IVFFLAT_NORM_PROC);
so->collation = index->rd_indcollation[0];
/* Create tuple description for sorting */
#if PG_VERSION_NUM >= 120000
so->tupdesc = CreateTemplateTupleDesc(3);
#else
so->tupdesc = CreateTemplateTupleDesc(3, false);
#endif
TupleDescInitEntry(so->tupdesc, (AttrNumber) 1, "distance", FLOAT8OID, -1, 0);
TupleDescInitEntry(so->tupdesc, (AttrNumber) 2, "tid", TIDOID, -1, 0);
TupleDescInitEntry(so->tupdesc, (AttrNumber) 3, "indexblkno", INT4OID, -1, 0);
/* Prep sort */
so->sortstate = tuplesort_begin_heap(so->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, work_mem, NULL, false);
#if PG_VERSION_NUM >= 120000
so->slot = MakeSingleTupleTableSlot(so->tupdesc, &TTSOpsMinimalTuple);
#else
so->slot = MakeSingleTupleTableSlot(so->tupdesc);
#endif
so->listQueue = pairingheap_allocate(CompareLists, scan);
so->maxItems = 1024;
so->itemCount = 0;
so->itemQueue = pairingheap_allocate(CompareItems, scan);
so->items = palloc(sizeof(IvfflatScanItem) * (so->maxItems + 1));
so->sortedItems = palloc(sizeof(IvfflatScanItem *) * so->maxItems);
scan->opaque = so;
return scan;
@@ -245,13 +251,10 @@ ivfflatrescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int
{
IvfflatScanOpaque so = (IvfflatScanOpaque) scan->opaque;
#if PG_VERSION_NUM >= 130000
if (!so->first)
tuplesort_reset(so->sortstate);
#endif
so->first = true;
pairingheap_reset(so->listQueue);
pairingheap_reset(so->itemQueue);
so->itemCount = 0;
if (keys && scan->numberOfKeys > 0)
memmove(scan->keyData, keys, scan->numberOfKeys * sizeof(ScanKeyData));
@@ -311,15 +314,18 @@ ivfflatgettuple(IndexScanDesc scan, ScanDirection dir)
pfree(DatumGetPointer(value));
}
if (tuplesort_gettupleslot(so->sortstate, true, false, so->slot, NULL))
if (so->itemCount > 0)
{
ItemPointer tid = (ItemPointer) DatumGetPointer(slot_getattr(so->slot, 2, &so->isnull));
BlockNumber indexblkno = DatumGetInt32(slot_getattr(so->slot, 3, &so->isnull));
IvfflatScanItem *scanitem;
so->itemCount--;
scanitem = so->sortedItems[so->itemCount];
#if PG_VERSION_NUM >= 120000
scan->xs_heaptid = *tid;
scan->xs_heaptid = scanitem->tid;
#else
scan->xs_ctup.t_self = *tid;
scan->xs_ctup.t_self = scanitem->tid;
#endif
if (BufferIsValid(so->buf))
@@ -331,7 +337,7 @@ ivfflatgettuple(IndexScanDesc scan, ScanDirection dir)
*
* https://www.postgresql.org/docs/current/index-locking.html
*/
so->buf = ReadBuffer(scan->indexRelation, indexblkno);
so->buf = ReadBuffer(scan->indexRelation, scanitem->searchPage);
scan->xs_recheckorderby = false;
return true;
@@ -353,7 +359,10 @@ ivfflatendscan(IndexScanDesc scan)
ReleaseBuffer(so->buf);
pairingheap_free(so->listQueue);
tuplesort_end(so->sortstate);
pairingheap_free(so->itemQueue);
pfree(so->items);
pfree(so->sortedItems);
pfree(so);
scan->opaque = NULL;