Added support for parallel index builds

This commit is contained in:
Andrew Kane
2023-07-15 19:52:25 -07:00
parent 21d5d7e934
commit b6a822918f
4 changed files with 568 additions and 27 deletions

View File

@@ -1,3 +1,7 @@
## 0.5.0 (unreleased)
- Added support for parallel index builds
## 0.4.4 (2023-06-12)
- Improved error message for malformed vector literal

View File

@@ -2,11 +2,14 @@
#include <float.h>
#include "access/parallel.h"
#include "access/xact.h"
#include "catalog/index.h"
#include "ivfflat.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/memutils.h"
#include "tcop/tcopprot.h"
#if PG_VERSION_NUM >= 140000
#include "utils/backend_progress.h"
@@ -38,6 +41,25 @@
#define UpdateProgress(index, val) ((void)val)
#endif
#if PG_VERSION_NUM >= 140000
#include "utils/backend_status.h"
#include "utils/wait_event.h"
#endif
#if PG_VERSION_NUM >= 120000
#include "access/table.h"
#include "optimizer/optimizer.h"
#else
#include "access/heapam.h"
#include "optimizer/planner.h"
#include "pgstat.h"
#endif
#define PARALLEL_KEY_IVFFLAT_SHARED UINT64CONST(0xA000000000000001)
#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002)
#define PARALLEL_KEY_IVFFLAT_CENTERS UINT64CONST(0xA000000000000003)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004)
/*
* Add sample
*/
@@ -386,6 +408,8 @@ InitBuildState(IvfflatBuildState * buildstate, Relation heap, Relation index, In
buildstate->listSums = palloc0(sizeof(double) * buildstate->lists);
buildstate->listCounts = palloc0(sizeof(int) * buildstate->lists);
#endif
buildstate->ivfleader = NULL;
}
/*
@@ -531,7 +555,7 @@ PrintKmeansMetrics(IvfflatBuildState * buildstate)
elog(INFO, "inertia: %.3e", buildstate->inertia);
/* Calculate Davies-Bouldin index */
if (buildstate->lists > 1)
if (buildstate->lists > 1 && !buildstate->ivfleader)
{
double db = 0.0;
@@ -566,19 +590,455 @@ PrintKmeansMetrics(IvfflatBuildState * buildstate)
}
#endif
/*
* Within leader, wait for end of heap scan
*/
static double
ParallelHeapScan(IvfflatBuildState * buildstate)
{
IvfflatShared *ivfshared = buildstate->ivfleader->ivfshared;
int nparticipanttuplesorts;
double reltuples;
nparticipanttuplesorts = buildstate->ivfleader->nparticipanttuplesorts;
for (;;)
{
SpinLockAcquire(&ivfshared->mutex);
if (ivfshared->nparticipantsdone == nparticipanttuplesorts)
{
buildstate->indtuples = ivfshared->indtuples;
reltuples = ivfshared->reltuples;
#ifdef IVFFLAT_KMEANS_DEBUG
buildstate->inertia = ivfshared->inertia;
#endif
SpinLockRelease(&ivfshared->mutex);
break;
}
SpinLockRelease(&ivfshared->mutex);
ConditionVariableSleep(&ivfshared->workersdonecv,
WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
}
ConditionVariableCancelSleep();
return reltuples;
}
/*
* Perform a worker's portion of a parallel sort
*/
static void
IvfflatParallelScanAndSort(IvfflatSpool * ivfspool, IvfflatShared * ivfshared, Sharedsort *sharedsort, Vector * ivfcenters, int sortmem, bool progress)
{
SortCoordinate coordinate;
IvfflatBuildState buildstate;
#if PG_VERSION_NUM >= 120000
TableScanDesc scan;
#else
HeapScanDesc scan;
#endif
double reltuples;
IndexInfo *indexInfo;
/* Sort options, which must match AssignTuples */
AttrNumber attNums[] = {1};
Oid sortOperators[] = {Int4LessOperator};
Oid sortCollations[] = {InvalidOid};
bool nullsFirstFlags[] = {false};
/* Initialize local tuplesort coordination state */
coordinate = palloc0(sizeof(SortCoordinateData));
coordinate->isWorker = true;
coordinate->nParticipants = -1;
coordinate->sharedsort = sharedsort;
/* Join parallel scan */
indexInfo = BuildIndexInfo(ivfspool->index);
indexInfo->ii_Concurrent = ivfshared->isconcurrent;
InitBuildState(&buildstate, ivfspool->heap, ivfspool->index, indexInfo);
memcpy(buildstate.centers->items, ivfcenters, VECTOR_SIZE(buildstate.centers->dim) * buildstate.centers->maxlen);
buildstate.centers->length = buildstate.centers->maxlen;
ivfspool->sortstate = tuplesort_begin_heap(buildstate.tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, sortmem, coordinate, false);
buildstate.sortstate = ivfspool->sortstate;
#if PG_VERSION_NUM >= 120000
scan = table_beginscan_parallel(ivfspool->heap,
ParallelTableScanFromIvfflatShared(ivfshared));
reltuples = table_index_build_scan(ivfspool->heap, ivfspool->index, indexInfo,
true, progress, BuildCallback,
(void *) &buildstate, scan);
#else
scan = heap_beginscan_parallel(ivfspool->heap, &ivfshared->heapdesc);
reltuples = IndexBuildHeapScan(ivfspool->heap, ivfspool->index, indexInfo,
true, BuildCallback,
(void *) &buildstate, scan);
#endif
/* Execute this worker's part of the sort */
tuplesort_performsort(ivfspool->sortstate);
/* Record statistics */
SpinLockAcquire(&ivfshared->mutex);
ivfshared->nparticipantsdone++;
ivfshared->reltuples += reltuples;
ivfshared->indtuples += buildstate.indtuples;
#ifdef IVFFLAT_KMEANS_DEBUG
ivfshared->inertia += buildstate.inertia;
#endif
SpinLockRelease(&ivfshared->mutex);
/* Log statistics */
if (progress)
ereport(DEBUG1, (errmsg("leader processed " INT64_FORMAT " tuples", (int64) reltuples)));
else
ereport(DEBUG1, (errmsg("worker processed " INT64_FORMAT " tuples", (int64) reltuples)));
/* Notify leader */
ConditionVariableSignal(&ivfshared->workersdonecv);
/* We can end tuplesorts immediately */
tuplesort_end(ivfspool->sortstate);
FreeBuildState(&buildstate);
}
/*
* Perform work within a launched parallel process
*/
void
IvfflatParallelBuildMain(dsm_segment *seg, shm_toc *toc)
{
char *sharedquery;
IvfflatSpool *ivfspool;
IvfflatShared *ivfshared;
Sharedsort *sharedsort;
Vector *ivfcenters;
Relation heapRel;
Relation indexRel;
LOCKMODE heapLockmode;
LOCKMODE indexLockmode;
int sortmem;
/* Set debug_query_string for individual workers first */
sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
debug_query_string = sharedquery;
/* Report the query string from leader */
pgstat_report_activity(STATE_RUNNING, debug_query_string);
/* Look up shared state */
ivfshared = shm_toc_lookup(toc, PARALLEL_KEY_IVFFLAT_SHARED, false);
/* Open relations using lock modes known to be obtained by index.c */
if (!ivfshared->isconcurrent)
{
heapLockmode = ShareLock;
indexLockmode = AccessExclusiveLock;
}
else
{
heapLockmode = ShareUpdateExclusiveLock;
indexLockmode = RowExclusiveLock;
}
/* Open relations within worker */
#if PG_VERSION_NUM >= 120000
heapRel = table_open(ivfshared->heaprelid, heapLockmode);
#else
heapRel = heap_open(ivfshared->heaprelid, heapLockmode);
#endif
indexRel = index_open(ivfshared->indexrelid, indexLockmode);
/* Initialize worker's own spool */
ivfspool = (IvfflatSpool *) palloc0(sizeof(IvfflatSpool));
ivfspool->heap = heapRel;
ivfspool->index = indexRel;
/* Look up shared state private to tuplesort.c */
sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
tuplesort_attach_shared(sharedsort, seg);
ivfcenters = shm_toc_lookup(toc, PARALLEL_KEY_IVFFLAT_CENTERS, false);
/* Perform sorting */
sortmem = maintenance_work_mem / ivfshared->scantuplesortstates;
IvfflatParallelScanAndSort(ivfspool, ivfshared, sharedsort, ivfcenters, sortmem, false);
/* Close relations within worker */
index_close(indexRel, indexLockmode);
#if PG_VERSION_NUM >= 120000
table_close(heapRel, heapLockmode);
#else
heap_close(heapRel, heapLockmode);
#endif
}
/*
* End parallel build
*/
static void
IvfflatEndParallel(IvfflatLeader * ivfleader)
{
/* Shutdown worker processes */
WaitForParallelWorkersToFinish(ivfleader->pcxt);
/* Free last reference to MVCC snapshot, if one was used */
if (IsMVCCSnapshot(ivfleader->snapshot))
UnregisterSnapshot(ivfleader->snapshot);
DestroyParallelContext(ivfleader->pcxt);
ExitParallelMode();
}
/*
* Return size of shared memory required for parallel index build
*/
static Size
ParallelEstimateShared(Relation heap, Snapshot snapshot)
{
#if PG_VERSION_NUM >= 120000
return add_size(BUFFERALIGN(sizeof(IvfflatShared)), table_parallelscan_estimate(heap, snapshot));
#else
if (!IsMVCCSnapshot(snapshot))
{
Assert(snapshot == SnapshotAny);
return sizeof(IvfflatShared);
}
return add_size(offsetof(IvfflatShared, heapdesc) +
offsetof(ParallelHeapScanDescData, phs_snapshot_data),
EstimateSnapshotSpace(snapshot));
#endif
}
/*
* Within leader, participate as a parallel worker
*/
static void
IvfflatLeaderParticipateAsWorker(IvfflatBuildState * buildstate)
{
IvfflatLeader *ivfleader = buildstate->ivfleader;
IvfflatSpool *leaderworker;
int sortmem;
/* Allocate memory and initialize private spool */
leaderworker = (IvfflatSpool *) palloc0(sizeof(IvfflatSpool));
leaderworker->heap = buildstate->heap;
leaderworker->index = buildstate->index;
/* Perform work common to all participants */
sortmem = maintenance_work_mem / ivfleader->nparticipanttuplesorts;
IvfflatParallelScanAndSort(leaderworker, ivfleader->ivfshared,
ivfleader->sharedsort, ivfleader->ivfcenters,
sortmem, true);
}
/*
* Begin parallel build
*/
static void
IvfflatBeginParallel(IvfflatBuildState * buildstate, bool isconcurrent, int request)
{
ParallelContext *pcxt;
int scantuplesortstates;
Snapshot snapshot;
Size estivfshared;
Size estsort;
Size estcenters;
IvfflatShared *ivfshared;
Sharedsort *sharedsort;
Vector *ivfcenters;
IvfflatLeader *ivfleader = (IvfflatLeader *) palloc0(sizeof(IvfflatLeader));
bool leaderparticipates = true;
int querylen;
#ifdef DISABLE_LEADER_PARTICIPATION
leaderparticipates = false;
#endif
/* Enter parallel mode and create context */
EnterParallelMode();
Assert(request > 0);
#if PG_VERSION_NUM >= 120000
pcxt = CreateParallelContext("vector", "IvfflatParallelBuildMain", request);
#else
pcxt = CreateParallelContext("vector", "IvfflatParallelBuildMain", request, true);
#endif
scantuplesortstates = leaderparticipates ? request + 1 : request;
/* Get snapshot for table scan */
if (!isconcurrent)
snapshot = SnapshotAny;
else
snapshot = RegisterSnapshot(GetTransactionSnapshot());
/* Estimate size of workspaces */
estivfshared = ParallelEstimateShared(buildstate->heap, snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, estivfshared);
estsort = tuplesort_estimate_shared(scantuplesortstates);
shm_toc_estimate_chunk(&pcxt->estimator, estsort);
estcenters = VECTOR_SIZE(buildstate->dimensions) * buildstate->lists;
shm_toc_estimate_chunk(&pcxt->estimator, estcenters);
shm_toc_estimate_keys(&pcxt->estimator, 3);
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
if (debug_query_string)
{
querylen = strlen(debug_query_string);
shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
else
querylen = 0; /* keep compiler quiet */
/* Everyone's had a chance to ask for space, so now create the DSM */
InitializeParallelDSM(pcxt);
/* If no DSM segment was available, back out (do serial build) */
if (pcxt->seg == NULL)
{
if (IsMVCCSnapshot(snapshot))
UnregisterSnapshot(snapshot);
DestroyParallelContext(pcxt);
ExitParallelMode();
return;
}
/* Store shared build state, for which we reserved space */
ivfshared = (IvfflatShared *) shm_toc_allocate(pcxt->toc, estivfshared);
/* Initialize immutable state */
ivfshared->heaprelid = RelationGetRelid(buildstate->heap);
ivfshared->indexrelid = RelationGetRelid(buildstate->index);
ivfshared->isconcurrent = isconcurrent;
ivfshared->scantuplesortstates = scantuplesortstates;
ConditionVariableInit(&ivfshared->workersdonecv);
SpinLockInit(&ivfshared->mutex);
/* Initialize mutable state */
ivfshared->nparticipantsdone = 0;
ivfshared->reltuples = 0;
ivfshared->indtuples = 0;
#ifdef IVFFLAT_KMEANS_DEBUG
ivfshared->inertia = 0;
#endif
#if PG_VERSION_NUM >= 120000
table_parallelscan_initialize(buildstate->heap,
ParallelTableScanFromIvfflatShared(ivfshared),
snapshot);
#else
heap_parallelscan_initialize(&ivfshared->heapdesc, buildstate->heap, snapshot);
#endif
/* Store shared tuplesort-private state, for which we reserved space */
sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
tuplesort_initialize_shared(sharedsort, scantuplesortstates,
pcxt->seg);
ivfcenters = (Vector *) shm_toc_allocate(pcxt->toc, estcenters);
memcpy(ivfcenters, buildstate->centers->items, estcenters);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_IVFFLAT_SHARED, ivfshared);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_IVFFLAT_CENTERS, ivfcenters);
/* Store query string for workers */
if (debug_query_string)
{
char *sharedquery;
sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
memcpy(sharedquery, debug_query_string, querylen + 1);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
}
/* Launch workers, saving status for leader/caller */
LaunchParallelWorkers(pcxt);
ivfleader->pcxt = pcxt;
ivfleader->nparticipanttuplesorts = pcxt->nworkers_launched;
if (leaderparticipates)
ivfleader->nparticipanttuplesorts++;
ivfleader->ivfshared = ivfshared;
ivfleader->sharedsort = sharedsort;
ivfleader->snapshot = snapshot;
ivfleader->ivfcenters = ivfcenters;
/* If no workers were successfully launched, back out (do serial build) */
if (pcxt->nworkers_launched == 0)
{
IvfflatEndParallel(ivfleader);
return;
}
/* Log participants */
ereport(DEBUG1, (errmsg("using %d parallel workers", pcxt->nworkers_launched)));
/* Save leader state now that it's clear build will be parallel */
buildstate->ivfleader = ivfleader;
/* Join heap scan ourselves */
if (leaderparticipates)
IvfflatLeaderParticipateAsWorker(buildstate);
/* Wait for all launched workers */
WaitForParallelWorkersToAttach(pcxt);
}
/*
* Scan table for tuples to index
*/
static void
ScanTable(IvfflatBuildState * buildstate)
AssignTuples(IvfflatBuildState * buildstate)
{
int parallel_workers = 0;
SortCoordinate coordinate = NULL;
/* Sort options, which must match IvfflatParallelScanAndSort */
AttrNumber attNums[] = {1};
Oid sortOperators[] = {Int4LessOperator};
Oid sortCollations[] = {InvalidOid};
bool nullsFirstFlags[] = {false};
UpdateProgress(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_SORT);
/* Calculate parallel workers */
if (buildstate->heap != NULL)
parallel_workers = plan_create_index_workers(RelationGetRelid(buildstate->heap), RelationGetRelid(buildstate->index));
/* Attempt to launch parallel worker scan when required */
if (parallel_workers > 0)
IvfflatBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers);
/* Set up coordination state if at least one worker launched */
if (buildstate->ivfleader)
{
coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
coordinate->isWorker = false;
coordinate->nParticipants = buildstate->ivfleader->nparticipanttuplesorts;
coordinate->sharedsort = buildstate->ivfleader->sharedsort;
}
/* Begin serial/leader tuplesort */
buildstate->sortstate = tuplesort_begin_heap(buildstate->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, maintenance_work_mem, coordinate, false);
/* Add tuples to sort */
if (buildstate->heap != NULL)
{
if (buildstate->ivfleader)
buildstate->reltuples = ParallelHeapScan(buildstate);
else
{
#if PG_VERSION_NUM >= 120000
buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo,
true, true, BuildCallback, (void *) buildstate, NULL);
buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo,
true, true, BuildCallback, (void *) buildstate, NULL);
#else
buildstate->reltuples = IndexBuildHeapScan(buildstate->heap, buildstate->index, buildstate->indexInfo,
true, BuildCallback, (void *) buildstate, NULL);
buildstate->reltuples = IndexBuildHeapScan(buildstate->heap, buildstate->index, buildstate->indexInfo,
true, BuildCallback, (void *) buildstate, NULL);
#endif
}
#ifdef IVFFLAT_KMEANS_DEBUG
PrintKmeansMetrics(buildstate);
#endif
}
}
/*
@@ -587,29 +1047,21 @@ ScanTable(IvfflatBuildState * buildstate)
static void
CreateEntryPages(IvfflatBuildState * buildstate, ForkNumber forkNum)
{
AttrNumber attNums[] = {1};
Oid sortOperators[] = {Int4LessOperator};
Oid sortCollations[] = {InvalidOid};
bool nullsFirstFlags[] = {false};
UpdateProgress(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_SORT);
buildstate->sortstate = tuplesort_begin_heap(buildstate->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, maintenance_work_mem, NULL, false);
/* Add tuples to sort */
if (buildstate->heap != NULL)
IvfflatBench("assign tuples", ScanTable(buildstate));
/* Assign */
IvfflatBench("assign tuples", AssignTuples(buildstate));
/* Sort */
IvfflatBench("sort tuples", tuplesort_performsort(buildstate->sortstate));
#ifdef IVFFLAT_KMEANS_DEBUG
PrintKmeansMetrics(buildstate);
#endif
/* Insert */
/* Load */
IvfflatBench("load tuples", InsertTuples(buildstate->index, buildstate, forkNum));
/* End sort */
tuplesort_end(buildstate->sortstate);
/* End parallel build */
if (buildstate->ivfleader)
IvfflatEndParallel(buildstate->ivfleader);
}
/*

View File

@@ -8,6 +8,7 @@
#endif
#include "access/generic_xlog.h"
#include "access/parallel.h"
#include "access/reloptions.h"
#include "nodes/execnodes.h"
#include "port.h" /* for strtof() and random() */
@@ -19,6 +20,10 @@
#include "common/pg_prng.h"
#endif
#if PG_VERSION_NUM < 120000
#include "access/relscan.h"
#endif
#ifdef IVFFLAT_BENCH
#include "portability/instr_time.h"
#endif
@@ -105,6 +110,56 @@ typedef struct IvfflatOptions
int lists; /* number of lists */
} IvfflatOptions;
typedef struct IvfflatSpool
{
Tuplesortstate *sortstate;
Relation heap;
Relation index;
} IvfflatSpool;
typedef struct IvfflatShared
{
/* Immutable state */
Oid heaprelid;
Oid indexrelid;
bool isconcurrent;
int scantuplesortstates;
/* Worker progress */
ConditionVariable workersdonecv;
/* Mutex for mutable state */
slock_t mutex;
/* Mutable state */
int nparticipantsdone;
double reltuples;
double indtuples;
#ifdef IVFFLAT_KMEANS_DEBUG
double inertia;
#endif
#if PG_VERSION_NUM < 120000
ParallelHeapScanDescData heapdesc; /* must come last */
#endif
} IvfflatShared;
#if PG_VERSION_NUM >= 120000
#define ParallelTableScanFromIvfflatShared(shared) \
(ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(IvfflatShared)))
#endif
typedef struct IvfflatLeader
{
ParallelContext *pcxt;
int nparticipanttuplesorts;
IvfflatShared *ivfshared;
Sharedsort *sharedsort;
Snapshot snapshot;
Vector *ivfcenters;
} IvfflatLeader;
typedef struct IvfflatBuildState
{
/* Info */
@@ -150,6 +205,9 @@ typedef struct IvfflatBuildState
/* Memory */
MemoryContext tmpCtx;
/* Parallel builds */
IvfflatLeader *ivfleader;
} IvfflatBuildState;
typedef struct IvfflatMetaPageData
@@ -230,6 +288,7 @@ void IvfflatAppendPage(Relation index, Buffer *buf, Page *page, GenericXLogStat
Buffer IvfflatNewBuffer(Relation index, ForkNumber forkNum);
void IvfflatInitPage(Buffer buf, Page page);
void IvfflatInitRegisterPage(Relation index, Buffer *buf, Page *page, GenericXLogState **state);
PGDLLEXPORT void IvfflatParallelBuildMain(dsm_segment *seg, shm_toc *toc);
/* Index access methods */
IndexBuildResult *ivfflatbuild(Relation heap, Relation index, IndexInfo *indexInfo);

View File

@@ -20,7 +20,7 @@ sub test_recall
SET ivfflat.probes = $probes;
EXPLAIN ANALYZE SELECT i FROM tst ORDER BY v $operator '$queries[0]' LIMIT $limit;
));
like($explain, qr/Index Scan/);
like($explain, qr/Index Scan using idx on tst/);
for my $i (0 .. $#queries) {
my $actual = $node->safe_psql("postgres", qq(
@@ -77,7 +77,6 @@ foreach (@operators) {
push(@expected, $res);
}
# Add index
my $opclass;
if ($operator eq "<->") {
$opclass = "vector_l2_ops";
@@ -86,7 +85,12 @@ foreach (@operators) {
} else {
$opclass = "vector_cosine_ops";
}
$node->safe_psql("postgres", "CREATE INDEX ON tst USING ivfflat (v $opclass);");
# Build index serially
$node->safe_psql("postgres", qq(
SET max_parallel_maintenance_workers = 0;
CREATE INDEX idx ON tst USING ivfflat (v $opclass);
));
# Test approximate results
if ($operator ne "<#>") {
@@ -95,7 +99,29 @@ foreach (@operators) {
test_recall(10, 0.95, $operator);
}
# Account for equal distances
test_recall(100, 0.9975, $operator);
test_recall(100, 0.995, $operator);
$node->safe_psql("postgres", "DROP INDEX idx;");
# Build index in parallel
my ($ret, $stdout, $stderr) = $node->psql("postgres", qq(
SET client_min_messages = DEBUG;
SET min_parallel_table_scan_size = 1;
CREATE INDEX idx ON tst USING ivfflat (v $opclass);
));
is($ret, 0, $stderr);
like($stderr, qr/using \d+ parallel workers/);
# Test approximate results
if ($operator ne "<#>") {
# TODO fix test
test_recall(1, 0.75, $operator);
test_recall(10, 0.95, $operator);
}
# Account for equal distances
test_recall(100, 0.995, $operator);
$node->safe_psql("postgres", "DROP INDEX idx;");
}
done_testing();