mirror of
https://github.com/pgvector/pgvector.git
synced 2026-06-06 14:01:31 +08:00
1066 lines
30 KiB
C
1066 lines
30 KiB
C
#include "postgres.h"
|
|
|
|
#include <float.h>
|
|
|
|
#include "access/table.h"
|
|
#include "access/tableam.h"
|
|
#include "access/parallel.h"
|
|
#include "access/xact.h"
|
|
#include "bitvector.h"
|
|
#include "catalog/index.h"
|
|
#include "catalog/pg_operator_d.h"
|
|
#include "catalog/pg_type_d.h"
|
|
#include "commands/progress.h"
|
|
#include "halfvec.h"
|
|
#include "ivfflat.h"
|
|
#include "miscadmin.h"
|
|
#include "optimizer/optimizer.h"
|
|
#include "storage/bufmgr.h"
|
|
#include "tcop/tcopprot.h"
|
|
#include "utils/memutils.h"
|
|
#include "vector.h"
|
|
|
|
#if PG_VERSION_NUM >= 140000
|
|
#include "utils/backend_progress.h"
|
|
#else
|
|
#include "pgstat.h"
|
|
#endif
|
|
|
|
#if PG_VERSION_NUM >= 130000
|
|
#define CALLBACK_ITEM_POINTER ItemPointer tid
|
|
#else
|
|
#define CALLBACK_ITEM_POINTER HeapTuple hup
|
|
#endif
|
|
|
|
#if PG_VERSION_NUM >= 140000
|
|
#include "utils/backend_status.h"
|
|
#include "utils/wait_event.h"
|
|
#endif
|
|
|
|
/*
 * shm_toc keys for the DSM entries that IvfflatBeginParallel publishes and
 * IvfflatParallelBuildMain looks up
 */
#define PARALLEL_KEY_IVFFLAT_SHARED		UINT64CONST(0xA000000000000001)	/* IvfflatShared */
#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xA000000000000002)	/* Sharedsort */
#define PARALLEL_KEY_IVFFLAT_CENTERS	UINT64CONST(0xA000000000000003)	/* k-means centers */
#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000004)	/* leader's query text */
|
|
|
|
/*
|
|
* Add sample
|
|
*/
|
|
static void
|
|
AddSample(Datum *values, IvfflatBuildState * buildstate)
|
|
{
|
|
VectorArray samples = buildstate->samples;
|
|
int targsamples = samples->maxlen;
|
|
|
|
/* Detoast once for all calls */
|
|
Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
|
|
|
|
/*
|
|
* Normalize with KMEANS_NORM_PROC since spherical distance function
|
|
* expects unit vectors
|
|
*/
|
|
if (buildstate->kmeansnormprocinfo != NULL)
|
|
{
|
|
if (!IvfflatNormValue(buildstate->kmeansnormprocinfo, buildstate->collation, &value, buildstate->type))
|
|
return;
|
|
}
|
|
|
|
if (samples->length < targsamples)
|
|
{
|
|
VectorArraySet(samples, samples->length, DatumGetPointer(value));
|
|
samples->length++;
|
|
}
|
|
else
|
|
{
|
|
if (buildstate->rowstoskip < 0)
|
|
buildstate->rowstoskip = reservoir_get_next_S(&buildstate->rstate, samples->length, targsamples);
|
|
|
|
if (buildstate->rowstoskip <= 0)
|
|
{
|
|
#if PG_VERSION_NUM >= 150000
|
|
int k = (int) (targsamples * sampler_random_fract(&buildstate->rstate.randstate));
|
|
#else
|
|
int k = (int) (targsamples * sampler_random_fract(buildstate->rstate.randstate));
|
|
#endif
|
|
|
|
Assert(k >= 0 && k < targsamples);
|
|
VectorArraySet(samples, k, DatumGetPointer(value));
|
|
}
|
|
|
|
buildstate->rowstoskip -= 1;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Callback for sampling
|
|
*/
|
|
static void
|
|
SampleCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
|
|
bool *isnull, bool tupleIsAlive, void *state)
|
|
{
|
|
IvfflatBuildState *buildstate = (IvfflatBuildState *) state;
|
|
MemoryContext oldCtx;
|
|
|
|
/* Skip nulls */
|
|
if (isnull[0])
|
|
return;
|
|
|
|
/* Use memory context since detoast can allocate */
|
|
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
|
|
|
|
/* Add sample */
|
|
AddSample(values, buildstate);
|
|
|
|
/* Reset memory context */
|
|
MemoryContextSwitchTo(oldCtx);
|
|
MemoryContextReset(buildstate->tmpCtx);
|
|
}
|
|
|
|
/*
|
|
* Sample rows with same logic as ANALYZE
|
|
*/
|
|
static void
|
|
SampleRows(IvfflatBuildState * buildstate)
|
|
{
|
|
int targsamples = buildstate->samples->maxlen;
|
|
BlockNumber totalblocks = RelationGetNumberOfBlocks(buildstate->heap);
|
|
|
|
buildstate->rowstoskip = -1;
|
|
|
|
BlockSampler_Init(&buildstate->bs, totalblocks, targsamples, RandomInt());
|
|
|
|
reservoir_init_selection_state(&buildstate->rstate, targsamples);
|
|
while (BlockSampler_HasMore(&buildstate->bs))
|
|
{
|
|
BlockNumber targblock = BlockSampler_Next(&buildstate->bs);
|
|
|
|
table_index_build_range_scan(buildstate->heap, buildstate->index, buildstate->indexInfo,
|
|
false, true, false, targblock, 1, SampleCallback, (void *) buildstate, NULL);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Add tuple to sort
|
|
*/
|
|
static void
|
|
AddTupleToSort(Relation index, ItemPointer tid, Datum *values, IvfflatBuildState * buildstate)
|
|
{
|
|
double distance;
|
|
double minDistance = DBL_MAX;
|
|
int closestCenter = 0;
|
|
VectorArray centers = buildstate->centers;
|
|
TupleTableSlot *slot = buildstate->slot;
|
|
|
|
/* Detoast once for all calls */
|
|
Datum value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
|
|
|
|
/* Normalize if needed */
|
|
if (buildstate->normprocinfo != NULL)
|
|
{
|
|
if (!IvfflatNormValue(buildstate->normprocinfo, buildstate->collation, &value, buildstate->type))
|
|
return;
|
|
}
|
|
|
|
/* Find the list that minimizes the distance */
|
|
for (int i = 0; i < centers->length; i++)
|
|
{
|
|
distance = DatumGetFloat8(FunctionCall2Coll(buildstate->procinfo, buildstate->collation, value, PointerGetDatum(VectorArrayGet(centers, i))));
|
|
|
|
if (distance < minDistance)
|
|
{
|
|
minDistance = distance;
|
|
closestCenter = i;
|
|
}
|
|
}
|
|
|
|
#ifdef IVFFLAT_KMEANS_DEBUG
|
|
buildstate->inertia += minDistance;
|
|
buildstate->listSums[closestCenter] += minDistance;
|
|
buildstate->listCounts[closestCenter]++;
|
|
#endif
|
|
|
|
/* Create a virtual tuple */
|
|
ExecClearTuple(slot);
|
|
slot->tts_values[0] = Int32GetDatum(closestCenter);
|
|
slot->tts_isnull[0] = false;
|
|
slot->tts_values[1] = PointerGetDatum(tid);
|
|
slot->tts_isnull[1] = false;
|
|
slot->tts_values[2] = value;
|
|
slot->tts_isnull[2] = false;
|
|
ExecStoreVirtualTuple(slot);
|
|
|
|
/*
|
|
* Add tuple to sort
|
|
*
|
|
* tuplesort_puttupleslot comment: Input data is always copied; the caller
|
|
* need not save it.
|
|
*/
|
|
tuplesort_puttupleslot(buildstate->sortstate, slot);
|
|
|
|
buildstate->indtuples++;
|
|
}
|
|
|
|
/*
|
|
* Callback for table_index_build_scan
|
|
*/
|
|
static void
|
|
BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *values,
|
|
bool *isnull, bool tupleIsAlive, void *state)
|
|
{
|
|
IvfflatBuildState *buildstate = (IvfflatBuildState *) state;
|
|
MemoryContext oldCtx;
|
|
|
|
#if PG_VERSION_NUM < 130000
|
|
ItemPointer tid = &hup->t_self;
|
|
#endif
|
|
|
|
/* Skip nulls */
|
|
if (isnull[0])
|
|
return;
|
|
|
|
/* Use memory context since detoast can allocate */
|
|
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
|
|
|
|
/* Add tuple to sort */
|
|
AddTupleToSort(index, tid, values, buildstate);
|
|
|
|
/* Reset memory context */
|
|
MemoryContextSwitchTo(oldCtx);
|
|
MemoryContextReset(buildstate->tmpCtx);
|
|
}
|
|
|
|
/*
|
|
* Get index tuple from sort state
|
|
*/
|
|
static inline void
|
|
GetNextTuple(Tuplesortstate *sortstate, TupleDesc tupdesc, TupleTableSlot *slot, IndexTuple *itup, int *list)
|
|
{
|
|
Datum value;
|
|
bool isnull;
|
|
|
|
if (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
|
|
{
|
|
*list = DatumGetInt32(slot_getattr(slot, 1, &isnull));
|
|
value = slot_getattr(slot, 3, &isnull);
|
|
|
|
/* Form the index tuple */
|
|
*itup = index_form_tuple(tupdesc, &value, &isnull);
|
|
(*itup)->t_tid = *((ItemPointer) DatumGetPointer(slot_getattr(slot, 2, &isnull)));
|
|
}
|
|
else
|
|
*list = -1;
|
|
}
|
|
|
|
/*
|
|
* Create initial entry pages
|
|
*/
|
|
static void
|
|
InsertTuples(Relation index, IvfflatBuildState * buildstate, ForkNumber forkNum)
|
|
{
|
|
int list;
|
|
IndexTuple itup = NULL; /* silence compiler warning */
|
|
int64 inserted = 0;
|
|
|
|
TupleTableSlot *slot = MakeSingleTupleTableSlot(buildstate->tupdesc, &TTSOpsMinimalTuple);
|
|
TupleDesc tupdesc = RelationGetDescr(index);
|
|
|
|
pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_LOAD);
|
|
|
|
pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_TOTAL, buildstate->indtuples);
|
|
|
|
GetNextTuple(buildstate->sortstate, tupdesc, slot, &itup, &list);
|
|
|
|
for (int i = 0; i < buildstate->centers->length; i++)
|
|
{
|
|
Buffer buf;
|
|
Page page;
|
|
GenericXLogState *state;
|
|
BlockNumber startPage;
|
|
BlockNumber insertPage;
|
|
|
|
/* Can take a while, so ensure we can interrupt */
|
|
/* Needs to be called when no buffer locks are held */
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
buf = IvfflatNewBuffer(index, forkNum);
|
|
IvfflatInitRegisterPage(index, &buf, &page, &state);
|
|
|
|
startPage = BufferGetBlockNumber(buf);
|
|
|
|
/* Get all tuples for list */
|
|
while (list == i)
|
|
{
|
|
/* Check for free space */
|
|
Size itemsz = MAXALIGN(IndexTupleSize(itup));
|
|
|
|
if (PageGetFreeSpace(page) < itemsz)
|
|
IvfflatAppendPage(index, &buf, &page, &state, forkNum);
|
|
|
|
/* Add the item */
|
|
if (PageAddItem(page, (Item) itup, itemsz, InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
|
|
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
|
|
|
pfree(itup);
|
|
|
|
pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE, ++inserted);
|
|
|
|
GetNextTuple(buildstate->sortstate, tupdesc, slot, &itup, &list);
|
|
}
|
|
|
|
insertPage = BufferGetBlockNumber(buf);
|
|
|
|
IvfflatCommitBuffer(buf, state);
|
|
|
|
/* Set the start and insert pages */
|
|
IvfflatUpdateList(index, buildstate->listInfo[i], insertPage, InvalidBlockNumber, startPage, forkNum);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Get max dimensions
|
|
*/
|
|
static int
|
|
GetMaxDimensions(IvfflatType type)
|
|
{
|
|
int maxDimensions = IVFFLAT_MAX_DIM;
|
|
|
|
if (type == IVFFLAT_TYPE_HALFVEC)
|
|
maxDimensions *= 2;
|
|
else if (type == IVFFLAT_TYPE_BIT)
|
|
maxDimensions *= 32;
|
|
|
|
return maxDimensions;
|
|
}
|
|
|
|
/*
|
|
* Get item size
|
|
*/
|
|
static Size
|
|
GetItemSize(IvfflatType type, int dimensions)
|
|
{
|
|
if (type == IVFFLAT_TYPE_VECTOR)
|
|
return VECTOR_SIZE(dimensions);
|
|
else if (type == IVFFLAT_TYPE_HALFVEC)
|
|
return HALFVEC_SIZE(dimensions);
|
|
else if (type == IVFFLAT_TYPE_BIT)
|
|
return VARBITTOTALLEN(dimensions);
|
|
else
|
|
elog(ERROR, "Unsupported type");
|
|
}
|
|
|
|
/*
|
|
* Initialize the build state
|
|
*/
|
|
static void
|
|
InitBuildState(IvfflatBuildState * buildstate, Relation heap, Relation index, IndexInfo *indexInfo)
|
|
{
|
|
int maxDimensions;
|
|
|
|
buildstate->heap = heap;
|
|
buildstate->index = index;
|
|
buildstate->indexInfo = indexInfo;
|
|
buildstate->type = IvfflatGetType(index);
|
|
|
|
buildstate->lists = IvfflatGetLists(index);
|
|
buildstate->dimensions = TupleDescAttr(index->rd_att, 0)->atttypmod;
|
|
|
|
maxDimensions = GetMaxDimensions(buildstate->type);
|
|
|
|
/* Require column to have dimensions to be indexed */
|
|
if (buildstate->dimensions < 0)
|
|
elog(ERROR, "column does not have dimensions");
|
|
|
|
if (buildstate->dimensions > maxDimensions)
|
|
elog(ERROR, "column cannot have more than %d dimensions for ivfflat index", maxDimensions);
|
|
|
|
buildstate->reltuples = 0;
|
|
buildstate->indtuples = 0;
|
|
|
|
/* Get support functions */
|
|
buildstate->procinfo = index_getprocinfo(index, 1, IVFFLAT_DISTANCE_PROC);
|
|
buildstate->normprocinfo = IvfflatOptionalProcInfo(index, IVFFLAT_NORM_PROC);
|
|
buildstate->kmeansnormprocinfo = IvfflatOptionalProcInfo(index, IVFFLAT_KMEANS_NORM_PROC);
|
|
buildstate->collation = index->rd_indcollation[0];
|
|
|
|
/* Require more than one dimension for spherical k-means */
|
|
if (buildstate->kmeansnormprocinfo != NULL && buildstate->dimensions == 1)
|
|
elog(ERROR, "dimensions must be greater than one for this opclass");
|
|
|
|
/* Create tuple description for sorting */
|
|
buildstate->tupdesc = CreateTemplateTupleDesc(3);
|
|
TupleDescInitEntry(buildstate->tupdesc, (AttrNumber) 1, "list", INT4OID, -1, 0);
|
|
TupleDescInitEntry(buildstate->tupdesc, (AttrNumber) 2, "tid", TIDOID, -1, 0);
|
|
TupleDescInitEntry(buildstate->tupdesc, (AttrNumber) 3, "vector", RelationGetDescr(index)->attrs[0].atttypid, -1, 0);
|
|
|
|
buildstate->slot = MakeSingleTupleTableSlot(buildstate->tupdesc, &TTSOpsVirtual);
|
|
|
|
buildstate->centers = VectorArrayInit(buildstate->lists, buildstate->dimensions, GetItemSize(buildstate->type, buildstate->dimensions));
|
|
buildstate->listInfo = palloc(sizeof(ListInfo) * buildstate->lists);
|
|
|
|
buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
|
|
"Ivfflat build temporary context",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
|
|
#ifdef IVFFLAT_KMEANS_DEBUG
|
|
buildstate->inertia = 0;
|
|
buildstate->listSums = palloc0(sizeof(double) * buildstate->lists);
|
|
buildstate->listCounts = palloc0(sizeof(int) * buildstate->lists);
|
|
#endif
|
|
|
|
buildstate->ivfleader = NULL;
|
|
}
|
|
|
|
/*
|
|
* Free resources
|
|
*/
|
|
static void
|
|
FreeBuildState(IvfflatBuildState * buildstate)
|
|
{
|
|
VectorArrayFree(buildstate->centers);
|
|
pfree(buildstate->listInfo);
|
|
|
|
#ifdef IVFFLAT_KMEANS_DEBUG
|
|
pfree(buildstate->listSums);
|
|
pfree(buildstate->listCounts);
|
|
#endif
|
|
|
|
MemoryContextDelete(buildstate->tmpCtx);
|
|
}
|
|
|
|
/*
|
|
* Compute centers
|
|
*/
|
|
static void
|
|
ComputeCenters(IvfflatBuildState * buildstate)
|
|
{
|
|
int numSamples;
|
|
|
|
pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_KMEANS);
|
|
|
|
/* Target 50 samples per list, with at least 10000 samples */
|
|
/* The number of samples has a large effect on index build time */
|
|
numSamples = buildstate->lists * 50;
|
|
if (numSamples < 10000)
|
|
numSamples = 10000;
|
|
|
|
/* Skip samples for unlogged table */
|
|
if (buildstate->heap == NULL)
|
|
numSamples = 1;
|
|
|
|
/* Sample rows */
|
|
/* TODO Ensure within maintenance_work_mem */
|
|
buildstate->samples = VectorArrayInit(numSamples, buildstate->dimensions, buildstate->centers->itemsize);
|
|
if (buildstate->heap != NULL)
|
|
{
|
|
SampleRows(buildstate);
|
|
|
|
if (buildstate->samples->length < buildstate->lists)
|
|
{
|
|
ereport(NOTICE,
|
|
(errmsg("ivfflat index created with little data"),
|
|
errdetail("This will cause low recall."),
|
|
errhint("Drop the index until the table has more data.")));
|
|
}
|
|
}
|
|
|
|
/* Calculate centers */
|
|
IvfflatBench("k-means", IvfflatKmeans(buildstate->index, buildstate->samples, buildstate->centers, buildstate->type));
|
|
|
|
/* Free samples before we allocate more memory */
|
|
VectorArrayFree(buildstate->samples);
|
|
}
|
|
|
|
/*
|
|
* Create the metapage
|
|
*/
|
|
static void
|
|
CreateMetaPage(Relation index, int dimensions, int lists, ForkNumber forkNum)
|
|
{
|
|
Buffer buf;
|
|
Page page;
|
|
GenericXLogState *state;
|
|
IvfflatMetaPage metap;
|
|
|
|
buf = IvfflatNewBuffer(index, forkNum);
|
|
IvfflatInitRegisterPage(index, &buf, &page, &state);
|
|
|
|
/* Set metapage data */
|
|
metap = IvfflatPageGetMeta(page);
|
|
metap->magicNumber = IVFFLAT_MAGIC_NUMBER;
|
|
metap->version = IVFFLAT_VERSION;
|
|
metap->dimensions = dimensions;
|
|
metap->lists = lists;
|
|
((PageHeader) page)->pd_lower =
|
|
((char *) metap + sizeof(IvfflatMetaPageData)) - (char *) page;
|
|
|
|
IvfflatCommitBuffer(buf, state);
|
|
}
|
|
|
|
/*
|
|
* Create list pages
|
|
*/
|
|
static void
|
|
CreateListPages(Relation index, VectorArray centers, int dimensions,
|
|
int lists, ForkNumber forkNum, ListInfo * *listInfo)
|
|
{
|
|
Buffer buf;
|
|
Page page;
|
|
GenericXLogState *state;
|
|
Size listSize;
|
|
IvfflatList list;
|
|
|
|
listSize = MAXALIGN(IVFFLAT_LIST_SIZE(centers->itemsize));
|
|
list = palloc0(listSize);
|
|
|
|
buf = IvfflatNewBuffer(index, forkNum);
|
|
IvfflatInitRegisterPage(index, &buf, &page, &state);
|
|
|
|
for (int i = 0; i < lists; i++)
|
|
{
|
|
OffsetNumber offno;
|
|
|
|
/* Load list */
|
|
list->startPage = InvalidBlockNumber;
|
|
list->insertPage = InvalidBlockNumber;
|
|
memcpy(&list->center, VectorArrayGet(centers, i), centers->itemsize);
|
|
|
|
/* Ensure free space */
|
|
if (PageGetFreeSpace(page) < listSize)
|
|
IvfflatAppendPage(index, &buf, &page, &state, forkNum);
|
|
|
|
/* Add the item */
|
|
offno = PageAddItem(page, (Item) list, listSize, InvalidOffsetNumber, false, false);
|
|
if (offno == InvalidOffsetNumber)
|
|
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
|
|
|
|
/* Save location info */
|
|
(*listInfo)[i].blkno = BufferGetBlockNumber(buf);
|
|
(*listInfo)[i].offno = offno;
|
|
}
|
|
|
|
IvfflatCommitBuffer(buf, state);
|
|
|
|
pfree(list);
|
|
}
|
|
|
|
#ifdef IVFFLAT_KMEANS_DEBUG
/*
 * Print k-means metrics
 *
 * Always prints inertia.  For serial builds with more than one list, it also
 * prints the Davies-Bouldin index computed from the per-list average
 * distances and the pairwise center distances.
 */
static void
PrintKmeansMetrics(IvfflatBuildState * buildstate)
{
	int			nlists = buildstate->lists;
	double		dbSum = 0.0;

	elog(INFO, "inertia: %.3e", buildstate->inertia);

	/*
	 * Parallel participants only merge inertia into the leader, not
	 * listSums/listCounts, so the index is skipped for parallel builds
	 */
	if (nlists <= 1 || buildstate->ivfleader)
		return;

	/* Turn per-list distance sums into averages */
	for (int i = 0; i < nlists; i++)
	{
		if (buildstate->listCounts[i] > 0)
			buildstate->listSums[i] /= buildstate->listCounts[i];
	}

	for (int i = 0; i < nlists; i++)
	{
		Datum		ci = PointerGetDatum(VectorArrayGet(buildstate->centers, i));
		double		worst = 0.0;

		for (int j = 0; j < nlists; j++)
		{
			double		sep;
			double		ratio;

			if (j == i)
				continue;

			sep = DatumGetFloat8(FunctionCall2Coll(buildstate->procinfo, buildstate->collation, ci, PointerGetDatum(VectorArrayGet(buildstate->centers, j))));
			ratio = (buildstate->listSums[i] + buildstate->listSums[j]) / sep;

			if (ratio > worst)
				worst = ratio;
		}

		dbSum += worst;
	}

	elog(INFO, "davies-bouldin: %.3f", dbSum / buildstate->lists);
}
#endif
|
|
|
|
/*
|
|
* Within leader, wait for end of heap scan
|
|
*/
|
|
static double
|
|
ParallelHeapScan(IvfflatBuildState * buildstate)
|
|
{
|
|
IvfflatShared *ivfshared = buildstate->ivfleader->ivfshared;
|
|
int nparticipanttuplesorts;
|
|
double reltuples;
|
|
|
|
nparticipanttuplesorts = buildstate->ivfleader->nparticipanttuplesorts;
|
|
for (;;)
|
|
{
|
|
SpinLockAcquire(&ivfshared->mutex);
|
|
if (ivfshared->nparticipantsdone == nparticipanttuplesorts)
|
|
{
|
|
buildstate->indtuples = ivfshared->indtuples;
|
|
reltuples = ivfshared->reltuples;
|
|
#ifdef IVFFLAT_KMEANS_DEBUG
|
|
buildstate->inertia = ivfshared->inertia;
|
|
#endif
|
|
SpinLockRelease(&ivfshared->mutex);
|
|
break;
|
|
}
|
|
SpinLockRelease(&ivfshared->mutex);
|
|
|
|
ConditionVariableSleep(&ivfshared->workersdonecv,
|
|
WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
|
|
}
|
|
|
|
ConditionVariableCancelSleep();
|
|
|
|
return reltuples;
|
|
}
|
|
|
|
/*
|
|
* Perform a worker's portion of a parallel sort
|
|
*/
|
|
static void
|
|
IvfflatParallelScanAndSort(IvfflatSpool * ivfspool, IvfflatShared * ivfshared, Sharedsort *sharedsort, char *ivfcenters, int sortmem, bool progress)
|
|
{
|
|
SortCoordinate coordinate;
|
|
IvfflatBuildState buildstate;
|
|
TableScanDesc scan;
|
|
double reltuples;
|
|
IndexInfo *indexInfo;
|
|
|
|
/* Sort options, which must match AssignTuples */
|
|
AttrNumber attNums[] = {1};
|
|
Oid sortOperators[] = {Int4LessOperator};
|
|
Oid sortCollations[] = {InvalidOid};
|
|
bool nullsFirstFlags[] = {false};
|
|
|
|
/* Initialize local tuplesort coordination state */
|
|
coordinate = palloc0(sizeof(SortCoordinateData));
|
|
coordinate->isWorker = true;
|
|
coordinate->nParticipants = -1;
|
|
coordinate->sharedsort = sharedsort;
|
|
|
|
/* Join parallel scan */
|
|
indexInfo = BuildIndexInfo(ivfspool->index);
|
|
indexInfo->ii_Concurrent = ivfshared->isconcurrent;
|
|
InitBuildState(&buildstate, ivfspool->heap, ivfspool->index, indexInfo);
|
|
memcpy(buildstate.centers->items, ivfcenters, buildstate.centers->itemsize * buildstate.centers->maxlen);
|
|
buildstate.centers->length = buildstate.centers->maxlen;
|
|
ivfspool->sortstate = tuplesort_begin_heap(buildstate.tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, sortmem, coordinate, false);
|
|
buildstate.sortstate = ivfspool->sortstate;
|
|
scan = table_beginscan_parallel(ivfspool->heap,
|
|
ParallelTableScanFromIvfflatShared(ivfshared));
|
|
reltuples = table_index_build_scan(ivfspool->heap, ivfspool->index, indexInfo,
|
|
true, progress, BuildCallback,
|
|
(void *) &buildstate, scan);
|
|
|
|
/* Execute this worker's part of the sort */
|
|
tuplesort_performsort(ivfspool->sortstate);
|
|
|
|
/* Record statistics */
|
|
SpinLockAcquire(&ivfshared->mutex);
|
|
ivfshared->nparticipantsdone++;
|
|
ivfshared->reltuples += reltuples;
|
|
ivfshared->indtuples += buildstate.indtuples;
|
|
#ifdef IVFFLAT_KMEANS_DEBUG
|
|
ivfshared->inertia += buildstate.inertia;
|
|
#endif
|
|
SpinLockRelease(&ivfshared->mutex);
|
|
|
|
/* Log statistics */
|
|
if (progress)
|
|
ereport(DEBUG1, (errmsg("leader processed " INT64_FORMAT " tuples", (int64) reltuples)));
|
|
else
|
|
ereport(DEBUG1, (errmsg("worker processed " INT64_FORMAT " tuples", (int64) reltuples)));
|
|
|
|
/* Notify leader */
|
|
ConditionVariableSignal(&ivfshared->workersdonecv);
|
|
|
|
/* We can end tuplesorts immediately */
|
|
tuplesort_end(ivfspool->sortstate);
|
|
|
|
FreeBuildState(&buildstate);
|
|
}
|
|
|
|
/*
|
|
* Perform work within a launched parallel process
|
|
*/
|
|
void
|
|
IvfflatParallelBuildMain(dsm_segment *seg, shm_toc *toc)
|
|
{
|
|
char *sharedquery;
|
|
IvfflatSpool *ivfspool;
|
|
IvfflatShared *ivfshared;
|
|
Sharedsort *sharedsort;
|
|
char *ivfcenters;
|
|
Relation heapRel;
|
|
Relation indexRel;
|
|
LOCKMODE heapLockmode;
|
|
LOCKMODE indexLockmode;
|
|
int sortmem;
|
|
|
|
/* Set debug_query_string for individual workers first */
|
|
sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
|
|
debug_query_string = sharedquery;
|
|
|
|
/* Report the query string from leader */
|
|
pgstat_report_activity(STATE_RUNNING, debug_query_string);
|
|
|
|
/* Look up shared state */
|
|
ivfshared = shm_toc_lookup(toc, PARALLEL_KEY_IVFFLAT_SHARED, false);
|
|
|
|
/* Open relations using lock modes known to be obtained by index.c */
|
|
if (!ivfshared->isconcurrent)
|
|
{
|
|
heapLockmode = ShareLock;
|
|
indexLockmode = AccessExclusiveLock;
|
|
}
|
|
else
|
|
{
|
|
heapLockmode = ShareUpdateExclusiveLock;
|
|
indexLockmode = RowExclusiveLock;
|
|
}
|
|
|
|
/* Open relations within worker */
|
|
heapRel = table_open(ivfshared->heaprelid, heapLockmode);
|
|
indexRel = index_open(ivfshared->indexrelid, indexLockmode);
|
|
|
|
/* Initialize worker's own spool */
|
|
ivfspool = (IvfflatSpool *) palloc0(sizeof(IvfflatSpool));
|
|
ivfspool->heap = heapRel;
|
|
ivfspool->index = indexRel;
|
|
|
|
/* Look up shared state private to tuplesort.c */
|
|
sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
|
|
tuplesort_attach_shared(sharedsort, seg);
|
|
|
|
ivfcenters = shm_toc_lookup(toc, PARALLEL_KEY_IVFFLAT_CENTERS, false);
|
|
|
|
/* Perform sorting */
|
|
sortmem = maintenance_work_mem / ivfshared->scantuplesortstates;
|
|
IvfflatParallelScanAndSort(ivfspool, ivfshared, sharedsort, ivfcenters, sortmem, false);
|
|
|
|
/* Close relations within worker */
|
|
index_close(indexRel, indexLockmode);
|
|
table_close(heapRel, heapLockmode);
|
|
}
|
|
|
|
/*
|
|
* End parallel build
|
|
*/
|
|
static void
|
|
IvfflatEndParallel(IvfflatLeader * ivfleader)
|
|
{
|
|
/* Shutdown worker processes */
|
|
WaitForParallelWorkersToFinish(ivfleader->pcxt);
|
|
|
|
/* Free last reference to MVCC snapshot, if one was used */
|
|
if (IsMVCCSnapshot(ivfleader->snapshot))
|
|
UnregisterSnapshot(ivfleader->snapshot);
|
|
DestroyParallelContext(ivfleader->pcxt);
|
|
ExitParallelMode();
|
|
}
|
|
|
|
/*
|
|
* Return size of shared memory required for parallel index build
|
|
*/
|
|
static Size
|
|
ParallelEstimateShared(Relation heap, Snapshot snapshot)
|
|
{
|
|
return add_size(BUFFERALIGN(sizeof(IvfflatShared)), table_parallelscan_estimate(heap, snapshot));
|
|
}
|
|
|
|
/*
|
|
* Within leader, participate as a parallel worker
|
|
*/
|
|
static void
|
|
IvfflatLeaderParticipateAsWorker(IvfflatBuildState * buildstate)
|
|
{
|
|
IvfflatLeader *ivfleader = buildstate->ivfleader;
|
|
IvfflatSpool *leaderworker;
|
|
int sortmem;
|
|
|
|
/* Allocate memory and initialize private spool */
|
|
leaderworker = (IvfflatSpool *) palloc0(sizeof(IvfflatSpool));
|
|
leaderworker->heap = buildstate->heap;
|
|
leaderworker->index = buildstate->index;
|
|
|
|
/* Perform work common to all participants */
|
|
sortmem = maintenance_work_mem / ivfleader->nparticipanttuplesorts;
|
|
IvfflatParallelScanAndSort(leaderworker, ivfleader->ivfshared,
|
|
ivfleader->sharedsort, ivfleader->ivfcenters,
|
|
sortmem, true);
|
|
}
|
|
|
|
/*
|
|
* Begin parallel build
|
|
*/
|
|
static void
|
|
IvfflatBeginParallel(IvfflatBuildState * buildstate, bool isconcurrent, int request)
|
|
{
|
|
ParallelContext *pcxt;
|
|
int scantuplesortstates;
|
|
Snapshot snapshot;
|
|
Size estivfshared;
|
|
Size estsort;
|
|
Size estcenters;
|
|
IvfflatShared *ivfshared;
|
|
Sharedsort *sharedsort;
|
|
char *ivfcenters;
|
|
IvfflatLeader *ivfleader = (IvfflatLeader *) palloc0(sizeof(IvfflatLeader));
|
|
bool leaderparticipates = true;
|
|
int querylen;
|
|
|
|
#ifdef DISABLE_LEADER_PARTICIPATION
|
|
leaderparticipates = false;
|
|
#endif
|
|
|
|
/* Enter parallel mode and create context */
|
|
EnterParallelMode();
|
|
Assert(request > 0);
|
|
pcxt = CreateParallelContext("vector", "IvfflatParallelBuildMain", request);
|
|
|
|
scantuplesortstates = leaderparticipates ? request + 1 : request;
|
|
|
|
/* Get snapshot for table scan */
|
|
if (!isconcurrent)
|
|
snapshot = SnapshotAny;
|
|
else
|
|
snapshot = RegisterSnapshot(GetTransactionSnapshot());
|
|
|
|
/* Estimate size of workspaces */
|
|
estivfshared = ParallelEstimateShared(buildstate->heap, snapshot);
|
|
shm_toc_estimate_chunk(&pcxt->estimator, estivfshared);
|
|
estsort = tuplesort_estimate_shared(scantuplesortstates);
|
|
shm_toc_estimate_chunk(&pcxt->estimator, estsort);
|
|
estcenters = buildstate->centers->itemsize * buildstate->centers->maxlen;
|
|
shm_toc_estimate_chunk(&pcxt->estimator, estcenters);
|
|
shm_toc_estimate_keys(&pcxt->estimator, 3);
|
|
|
|
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
|
|
if (debug_query_string)
|
|
{
|
|
querylen = strlen(debug_query_string);
|
|
shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
|
|
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
|
}
|
|
else
|
|
querylen = 0; /* keep compiler quiet */
|
|
|
|
/* Everyone's had a chance to ask for space, so now create the DSM */
|
|
InitializeParallelDSM(pcxt);
|
|
|
|
/* If no DSM segment was available, back out (do serial build) */
|
|
if (pcxt->seg == NULL)
|
|
{
|
|
if (IsMVCCSnapshot(snapshot))
|
|
UnregisterSnapshot(snapshot);
|
|
DestroyParallelContext(pcxt);
|
|
ExitParallelMode();
|
|
return;
|
|
}
|
|
|
|
/* Store shared build state, for which we reserved space */
|
|
ivfshared = (IvfflatShared *) shm_toc_allocate(pcxt->toc, estivfshared);
|
|
/* Initialize immutable state */
|
|
ivfshared->heaprelid = RelationGetRelid(buildstate->heap);
|
|
ivfshared->indexrelid = RelationGetRelid(buildstate->index);
|
|
ivfshared->isconcurrent = isconcurrent;
|
|
ivfshared->scantuplesortstates = scantuplesortstates;
|
|
ConditionVariableInit(&ivfshared->workersdonecv);
|
|
SpinLockInit(&ivfshared->mutex);
|
|
/* Initialize mutable state */
|
|
ivfshared->nparticipantsdone = 0;
|
|
ivfshared->reltuples = 0;
|
|
ivfshared->indtuples = 0;
|
|
#ifdef IVFFLAT_KMEANS_DEBUG
|
|
ivfshared->inertia = 0;
|
|
#endif
|
|
table_parallelscan_initialize(buildstate->heap,
|
|
ParallelTableScanFromIvfflatShared(ivfshared),
|
|
snapshot);
|
|
|
|
/* Store shared tuplesort-private state, for which we reserved space */
|
|
sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
|
|
tuplesort_initialize_shared(sharedsort, scantuplesortstates,
|
|
pcxt->seg);
|
|
|
|
ivfcenters = shm_toc_allocate(pcxt->toc, estcenters);
|
|
memcpy(ivfcenters, buildstate->centers->items, estcenters);
|
|
|
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_IVFFLAT_SHARED, ivfshared);
|
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
|
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_IVFFLAT_CENTERS, ivfcenters);
|
|
|
|
/* Store query string for workers */
|
|
if (debug_query_string)
|
|
{
|
|
char *sharedquery;
|
|
|
|
sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
|
|
memcpy(sharedquery, debug_query_string, querylen + 1);
|
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
|
|
}
|
|
|
|
/* Launch workers, saving status for leader/caller */
|
|
LaunchParallelWorkers(pcxt);
|
|
ivfleader->pcxt = pcxt;
|
|
ivfleader->nparticipanttuplesorts = pcxt->nworkers_launched;
|
|
if (leaderparticipates)
|
|
ivfleader->nparticipanttuplesorts++;
|
|
ivfleader->ivfshared = ivfshared;
|
|
ivfleader->sharedsort = sharedsort;
|
|
ivfleader->snapshot = snapshot;
|
|
ivfleader->ivfcenters = ivfcenters;
|
|
|
|
/* If no workers were successfully launched, back out (do serial build) */
|
|
if (pcxt->nworkers_launched == 0)
|
|
{
|
|
IvfflatEndParallel(ivfleader);
|
|
return;
|
|
}
|
|
|
|
/* Log participants */
|
|
ereport(DEBUG1, (errmsg("using %d parallel workers", pcxt->nworkers_launched)));
|
|
|
|
/* Save leader state now that it's clear build will be parallel */
|
|
buildstate->ivfleader = ivfleader;
|
|
|
|
/* Join heap scan ourselves */
|
|
if (leaderparticipates)
|
|
IvfflatLeaderParticipateAsWorker(buildstate);
|
|
|
|
/* Wait for all launched workers */
|
|
WaitForParallelWorkersToAttach(pcxt);
|
|
}
|
|
|
|
/*
|
|
* Scan table for tuples to index
|
|
*/
|
|
static void
|
|
AssignTuples(IvfflatBuildState * buildstate)
|
|
{
|
|
int parallel_workers = 0;
|
|
SortCoordinate coordinate = NULL;
|
|
|
|
/* Sort options, which must match IvfflatParallelScanAndSort */
|
|
AttrNumber attNums[] = {1};
|
|
Oid sortOperators[] = {Int4LessOperator};
|
|
Oid sortCollations[] = {InvalidOid};
|
|
bool nullsFirstFlags[] = {false};
|
|
|
|
pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_ASSIGN);
|
|
|
|
/* Calculate parallel workers */
|
|
if (buildstate->heap != NULL)
|
|
parallel_workers = plan_create_index_workers(RelationGetRelid(buildstate->heap), RelationGetRelid(buildstate->index));
|
|
|
|
/* Attempt to launch parallel worker scan when required */
|
|
if (parallel_workers > 0)
|
|
IvfflatBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers);
|
|
|
|
/* Set up coordination state if at least one worker launched */
|
|
if (buildstate->ivfleader)
|
|
{
|
|
coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
|
|
coordinate->isWorker = false;
|
|
coordinate->nParticipants = buildstate->ivfleader->nparticipanttuplesorts;
|
|
coordinate->sharedsort = buildstate->ivfleader->sharedsort;
|
|
}
|
|
|
|
/* Begin serial/leader tuplesort */
|
|
buildstate->sortstate = tuplesort_begin_heap(buildstate->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, maintenance_work_mem, coordinate, false);
|
|
|
|
/* Add tuples to sort */
|
|
if (buildstate->heap != NULL)
|
|
{
|
|
if (buildstate->ivfleader)
|
|
buildstate->reltuples = ParallelHeapScan(buildstate);
|
|
else
|
|
buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo,
|
|
true, true, BuildCallback, (void *) buildstate, NULL);
|
|
|
|
#ifdef IVFFLAT_KMEANS_DEBUG
|
|
PrintKmeansMetrics(buildstate);
|
|
#endif
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Create entry pages
|
|
*/
|
|
static void
|
|
CreateEntryPages(IvfflatBuildState * buildstate, ForkNumber forkNum)
|
|
{
|
|
/* Assign */
|
|
IvfflatBench("assign tuples", AssignTuples(buildstate));
|
|
|
|
/* Sort */
|
|
IvfflatBench("sort tuples", tuplesort_performsort(buildstate->sortstate));
|
|
|
|
/* Load */
|
|
IvfflatBench("load tuples", InsertTuples(buildstate->index, buildstate, forkNum));
|
|
|
|
/* End sort */
|
|
tuplesort_end(buildstate->sortstate);
|
|
|
|
/* End parallel build */
|
|
if (buildstate->ivfleader)
|
|
IvfflatEndParallel(buildstate->ivfleader);
|
|
}
|
|
|
|
/*
|
|
* Build the index
|
|
*/
|
|
static void
|
|
BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
|
IvfflatBuildState * buildstate, ForkNumber forkNum)
|
|
{
|
|
InitBuildState(buildstate, heap, index, indexInfo);
|
|
|
|
ComputeCenters(buildstate);
|
|
|
|
/* Create pages */
|
|
CreateMetaPage(index, buildstate->dimensions, buildstate->lists, forkNum);
|
|
CreateListPages(index, buildstate->centers, buildstate->dimensions, buildstate->lists, forkNum, &buildstate->listInfo);
|
|
CreateEntryPages(buildstate, forkNum);
|
|
|
|
FreeBuildState(buildstate);
|
|
}
|
|
|
|
/*
|
|
* Build the index for a logged table
|
|
*/
|
|
IndexBuildResult *
|
|
ivfflatbuild(Relation heap, Relation index, IndexInfo *indexInfo)
|
|
{
|
|
IndexBuildResult *result;
|
|
IvfflatBuildState buildstate;
|
|
|
|
BuildIndex(heap, index, indexInfo, &buildstate, MAIN_FORKNUM);
|
|
|
|
result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
|
|
result->heap_tuples = buildstate.reltuples;
|
|
result->index_tuples = buildstate.indtuples;
|
|
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Build the index for an unlogged table
|
|
*/
|
|
void
|
|
ivfflatbuildempty(Relation index)
|
|
{
|
|
IndexInfo *indexInfo = BuildIndexInfo(index);
|
|
IvfflatBuildState buildstate;
|
|
|
|
BuildIndex(NULL, index, indexInfo, &buildstate, INIT_FORKNUM);
|
|
}
|