From b6a822918ff5cff1d769da32051cb3b989f12597 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Sat, 15 Jul 2023 19:52:25 -0700 Subject: [PATCH] Added support for parallel index builds --- CHANGELOG.md | 4 + src/ivfbuild.c | 498 +++++++++++++++++++++++++++++++++++++++++-- src/ivfflat.h | 59 +++++ test/t/003_recall.pl | 34 ++- 4 files changed, 568 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7944166..79f4eb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.5.0 (unreleased) + +- Added support for parallel index builds + ## 0.4.4 (2023-06-12) - Improved error message for malformed vector literal diff --git a/src/ivfbuild.c b/src/ivfbuild.c index cb0b069..2cd4a20 100644 --- a/src/ivfbuild.c +++ b/src/ivfbuild.c @@ -2,11 +2,14 @@ #include +#include "access/parallel.h" +#include "access/xact.h" #include "catalog/index.h" #include "ivfflat.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "utils/memutils.h" +#include "tcop/tcopprot.h" #if PG_VERSION_NUM >= 140000 #include "utils/backend_progress.h" @@ -38,6 +41,25 @@ #define UpdateProgress(index, val) ((void)val) #endif +#if PG_VERSION_NUM >= 140000 +#include "utils/backend_status.h" +#include "utils/wait_event.h" +#endif + +#if PG_VERSION_NUM >= 120000 +#include "access/table.h" +#include "optimizer/optimizer.h" +#else +#include "access/heapam.h" +#include "optimizer/planner.h" +#include "pgstat.h" +#endif + +#define PARALLEL_KEY_IVFFLAT_SHARED UINT64CONST(0xA000000000000001) +#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002) +#define PARALLEL_KEY_IVFFLAT_CENTERS UINT64CONST(0xA000000000000003) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004) + /* * Add sample */ @@ -386,6 +408,8 @@ InitBuildState(IvfflatBuildState * buildstate, Relation heap, Relation index, In buildstate->listSums = palloc0(sizeof(double) * buildstate->lists); buildstate->listCounts = palloc0(sizeof(int) * buildstate->lists); #endif + + buildstate->ivfleader = NULL; } /* @@ -531,7 +555,7 @@ PrintKmeansMetrics(IvfflatBuildState * buildstate) elog(INFO, "inertia: %.3e", buildstate->inertia); /* Calculate Davies-Bouldin index */ - if (buildstate->lists > 1) + if (buildstate->lists > 1 && !buildstate->ivfleader) { double db = 0.0; @@ -566,19 +590,455 @@ PrintKmeansMetrics(IvfflatBuildState * buildstate) } #endif +/* + * Within leader, wait for end of heap scan + */ +static double +ParallelHeapScan(IvfflatBuildState * buildstate) +{ + IvfflatShared *ivfshared = buildstate->ivfleader->ivfshared; + int nparticipanttuplesorts; + double reltuples; + + nparticipanttuplesorts = buildstate->ivfleader->nparticipanttuplesorts; + for (;;) + { + SpinLockAcquire(&ivfshared->mutex); + if (ivfshared->nparticipantsdone == nparticipanttuplesorts) + { + buildstate->indtuples = ivfshared->indtuples; + reltuples = ivfshared->reltuples; +#ifdef IVFFLAT_KMEANS_DEBUG + buildstate->inertia = ivfshared->inertia; +#endif + SpinLockRelease(&ivfshared->mutex); + break; + } + SpinLockRelease(&ivfshared->mutex); + + ConditionVariableSleep(&ivfshared->workersdonecv, + WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); + } + + ConditionVariableCancelSleep(); + + return reltuples; +} + +/* + * Perform a worker's portion of a parallel sort + */ +static void +IvfflatParallelScanAndSort(IvfflatSpool * ivfspool, IvfflatShared * ivfshared, Sharedsort *sharedsort, Vector * ivfcenters, int sortmem, bool progress) +{ + SortCoordinate coordinate; + IvfflatBuildState buildstate; +#if PG_VERSION_NUM >= 120000 + TableScanDesc scan; +#else + HeapScanDesc scan; +#endif + double reltuples; + IndexInfo *indexInfo; + + /* Sort options, which must match AssignTuples */ + AttrNumber attNums[] = {1}; + Oid sortOperators[] = {Int4LessOperator}; + Oid sortCollations[] = {InvalidOid}; + bool nullsFirstFlags[] = {false}; + + /* Initialize local tuplesort coordination state */ + coordinate = palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = true; + coordinate->nParticipants = -1; + coordinate->sharedsort = sharedsort; + + /* Join parallel scan */ + indexInfo = BuildIndexInfo(ivfspool->index); + indexInfo->ii_Concurrent = ivfshared->isconcurrent; + InitBuildState(&buildstate, ivfspool->heap, ivfspool->index, indexInfo); + memcpy(buildstate.centers->items, ivfcenters, VECTOR_SIZE(buildstate.centers->dim) * buildstate.centers->maxlen); + buildstate.centers->length = buildstate.centers->maxlen; + ivfspool->sortstate = tuplesort_begin_heap(buildstate.tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, sortmem, coordinate, false); + buildstate.sortstate = ivfspool->sortstate; +#if PG_VERSION_NUM >= 120000 + scan = table_beginscan_parallel(ivfspool->heap, + ParallelTableScanFromIvfflatShared(ivfshared)); + reltuples = table_index_build_scan(ivfspool->heap, ivfspool->index, indexInfo, + true, progress, BuildCallback, + (void *) &buildstate, scan); +#else + scan = heap_beginscan_parallel(ivfspool->heap, &ivfshared->heapdesc); + reltuples = IndexBuildHeapScan(ivfspool->heap, ivfspool->index, indexInfo, + true, BuildCallback, + (void *) &buildstate, scan); +#endif + + /* Execute this worker's part of the sort */ + tuplesort_performsort(ivfspool->sortstate); + + /* Record statistics */ + SpinLockAcquire(&ivfshared->mutex); + ivfshared->nparticipantsdone++; + ivfshared->reltuples += reltuples; + ivfshared->indtuples += buildstate.indtuples; +#ifdef IVFFLAT_KMEANS_DEBUG + ivfshared->inertia += buildstate.inertia; +#endif + SpinLockRelease(&ivfshared->mutex); + + /* Log statistics */ + if (progress) + ereport(DEBUG1, (errmsg("leader processed " INT64_FORMAT " tuples", (int64) reltuples))); + else + ereport(DEBUG1, (errmsg("worker processed " INT64_FORMAT " tuples", (int64) reltuples))); + + /* Notify leader */ + ConditionVariableSignal(&ivfshared->workersdonecv); + + /* We can end tuplesorts immediately */ + tuplesort_end(ivfspool->sortstate); + + FreeBuildState(&buildstate); +} + +/* + * Perform work within a launched parallel process + */ +void +IvfflatParallelBuildMain(dsm_segment *seg, shm_toc *toc) +{ + char *sharedquery; + IvfflatSpool *ivfspool; + IvfflatShared *ivfshared; + Sharedsort *sharedsort; + Vector *ivfcenters; + Relation heapRel; + Relation indexRel; + LOCKMODE heapLockmode; + LOCKMODE indexLockmode; + int sortmem; + + /* Set debug_query_string for individual workers first */ + sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + + /* Report the query string from leader */ + pgstat_report_activity(STATE_RUNNING, debug_query_string); + + /* Look up shared state */ + ivfshared = shm_toc_lookup(toc, PARALLEL_KEY_IVFFLAT_SHARED, false); + + /* Open relations using lock modes known to be obtained by index.c */ + if (!ivfshared->isconcurrent) + { + heapLockmode = ShareLock; + indexLockmode = AccessExclusiveLock; + } + else + { + heapLockmode = ShareUpdateExclusiveLock; + indexLockmode = RowExclusiveLock; + } + + /* Open relations within worker */ +#if PG_VERSION_NUM >= 120000 + heapRel = table_open(ivfshared->heaprelid, heapLockmode); +#else + heapRel = heap_open(ivfshared->heaprelid, heapLockmode); +#endif + indexRel = index_open(ivfshared->indexrelid, indexLockmode); + + /* Initialize worker's own spool */ + ivfspool = (IvfflatSpool *) palloc0(sizeof(IvfflatSpool)); + ivfspool->heap = heapRel; + ivfspool->index = indexRel; + + /* Look up shared state private to tuplesort.c */ + sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false); + tuplesort_attach_shared(sharedsort, seg); + + ivfcenters = shm_toc_lookup(toc, PARALLEL_KEY_IVFFLAT_CENTERS, false); + + /* Perform sorting */ + sortmem = maintenance_work_mem / ivfshared->scantuplesortstates; + IvfflatParallelScanAndSort(ivfspool, ivfshared, sharedsort, ivfcenters, sortmem, false); + + /* Close relations within worker */ + index_close(indexRel, indexLockmode); +#if PG_VERSION_NUM >= 120000 + table_close(heapRel, heapLockmode); +#else + heap_close(heapRel, heapLockmode); +#endif +} + +/* + * End parallel build + */ +static void +IvfflatEndParallel(IvfflatLeader * ivfleader) +{ + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(ivfleader->pcxt); + + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(ivfleader->snapshot)) + UnregisterSnapshot(ivfleader->snapshot); + DestroyParallelContext(ivfleader->pcxt); + ExitParallelMode(); +} + +/* + * Return size of shared memory required for parallel index build + */ +static Size +ParallelEstimateShared(Relation heap, Snapshot snapshot) +{ +#if PG_VERSION_NUM >= 120000 + return add_size(BUFFERALIGN(sizeof(IvfflatShared)), table_parallelscan_estimate(heap, snapshot)); +#else + if (!IsMVCCSnapshot(snapshot)) + { + Assert(snapshot == SnapshotAny); + return sizeof(IvfflatShared); + } + + return add_size(offsetof(IvfflatShared, heapdesc) + + offsetof(ParallelHeapScanDescData, phs_snapshot_data), + EstimateSnapshotSpace(snapshot)); +#endif +} + +/* + * Within leader, participate as a parallel worker + */ +static void +IvfflatLeaderParticipateAsWorker(IvfflatBuildState * buildstate) +{ + IvfflatLeader *ivfleader = buildstate->ivfleader; + IvfflatSpool *leaderworker; + int sortmem; + + /* Allocate memory and initialize private spool */ + leaderworker = (IvfflatSpool *) palloc0(sizeof(IvfflatSpool)); + leaderworker->heap = buildstate->heap; + leaderworker->index = buildstate->index; + + /* Perform work common to all participants */ + sortmem = maintenance_work_mem / ivfleader->nparticipanttuplesorts; + IvfflatParallelScanAndSort(leaderworker, ivfleader->ivfshared, + ivfleader->sharedsort, ivfleader->ivfcenters, + sortmem, true); +} + +/* + * Begin parallel build + */ +static void +IvfflatBeginParallel(IvfflatBuildState * buildstate, bool isconcurrent, int request) +{ + ParallelContext *pcxt; + int scantuplesortstates; + Snapshot snapshot; + Size estivfshared; + Size estsort; + Size estcenters; + IvfflatShared *ivfshared; + Sharedsort *sharedsort; + Vector *ivfcenters; + IvfflatLeader *ivfleader = (IvfflatLeader *) palloc0(sizeof(IvfflatLeader)); + bool leaderparticipates = true; + int querylen; + +#ifdef DISABLE_LEADER_PARTICIPATION + leaderparticipates = false; +#endif + + /* Enter parallel mode and create context */ + EnterParallelMode(); + Assert(request > 0); +#if PG_VERSION_NUM >= 120000 + pcxt = CreateParallelContext("vector", "IvfflatParallelBuildMain", request); +#else + pcxt = CreateParallelContext("vector", "IvfflatParallelBuildMain", request, true); +#endif + + scantuplesortstates = leaderparticipates ? request + 1 : request; + + /* Get snapshot for table scan */ + if (!isconcurrent) + snapshot = SnapshotAny; + else + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + /* Estimate size of workspaces */ + estivfshared = ParallelEstimateShared(buildstate->heap, snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, estivfshared); + estsort = tuplesort_estimate_shared(scantuplesortstates); + shm_toc_estimate_chunk(&pcxt->estimator, estsort); + estcenters = VECTOR_SIZE(buildstate->dimensions) * buildstate->lists; + shm_toc_estimate_chunk(&pcxt->estimator, estcenters); + shm_toc_estimate_keys(&pcxt->estimator, 3); + + /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + /* Everyone's had a chance to ask for space, so now create the DSM */ + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial build) */ + if (pcxt->seg == NULL) + { + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); + return; + } + + /* Store shared build state, for which we reserved space */ + ivfshared = (IvfflatShared *) shm_toc_allocate(pcxt->toc, estivfshared); + /* Initialize immutable state */ + ivfshared->heaprelid = RelationGetRelid(buildstate->heap); + ivfshared->indexrelid = RelationGetRelid(buildstate->index); + ivfshared->isconcurrent = isconcurrent; + ivfshared->scantuplesortstates = scantuplesortstates; + ConditionVariableInit(&ivfshared->workersdonecv); + SpinLockInit(&ivfshared->mutex); + /* Initialize mutable state */ + ivfshared->nparticipantsdone = 0; + ivfshared->reltuples = 0; + ivfshared->indtuples = 0; +#ifdef IVFFLAT_KMEANS_DEBUG + ivfshared->inertia = 0; +#endif +#if PG_VERSION_NUM >= 120000 + table_parallelscan_initialize(buildstate->heap, + ParallelTableScanFromIvfflatShared(ivfshared), + snapshot); +#else + heap_parallelscan_initialize(&ivfshared->heapdesc, buildstate->heap, snapshot); +#endif + + /* Store shared tuplesort-private state, for which we reserved space */ + sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); + tuplesort_initialize_shared(sharedsort, scantuplesortstates, + pcxt->seg); + + ivfcenters = (Vector *) shm_toc_allocate(pcxt->toc, estcenters); + memcpy(ivfcenters, buildstate->centers->items, estcenters); + + shm_toc_insert(pcxt->toc, PARALLEL_KEY_IVFFLAT_SHARED, ivfshared); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_IVFFLAT_CENTERS, ivfcenters); + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); + } + + /* Launch workers, saving status for leader/caller */ + LaunchParallelWorkers(pcxt); + ivfleader->pcxt = pcxt; + ivfleader->nparticipanttuplesorts = pcxt->nworkers_launched; + if (leaderparticipates) + ivfleader->nparticipanttuplesorts++; + ivfleader->ivfshared = ivfshared; + ivfleader->sharedsort = sharedsort; + ivfleader->snapshot = snapshot; + ivfleader->ivfcenters = ivfcenters; + + /* If no workers were successfully launched, back out (do serial build) */ + if (pcxt->nworkers_launched == 0) + { + IvfflatEndParallel(ivfleader); + return; + } + + /* Log participants */ + ereport(DEBUG1, (errmsg("using %d parallel workers", pcxt->nworkers_launched))); + + /* Save leader state now that it's clear build will be parallel */ + buildstate->ivfleader = ivfleader; + + /* Join heap scan ourselves */ + if (leaderparticipates) + IvfflatLeaderParticipateAsWorker(buildstate); + + /* Wait for all launched workers */ + WaitForParallelWorkersToAttach(pcxt); +} + /* * Scan table for tuples to index */ static void -ScanTable(IvfflatBuildState * buildstate) +AssignTuples(IvfflatBuildState * buildstate) { + int parallel_workers = 0; + SortCoordinate coordinate = NULL; + + /* Sort options, which must match IvfflatParallelScanAndSort */ + AttrNumber attNums[] = {1}; + Oid sortOperators[] = {Int4LessOperator}; + Oid sortCollations[] = {InvalidOid}; + bool nullsFirstFlags[] = {false}; + + UpdateProgress(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_SORT); + + /* Calculate parallel workers */ + if (buildstate->heap != NULL) + parallel_workers = plan_create_index_workers(RelationGetRelid(buildstate->heap), RelationGetRelid(buildstate->index)); + + /* Attempt to launch parallel worker scan when required */ + if (parallel_workers > 0) + IvfflatBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers); + + /* Set up coordination state if at least one worker launched */ + if (buildstate->ivfleader) + { + coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = false; + coordinate->nParticipants = buildstate->ivfleader->nparticipanttuplesorts; + coordinate->sharedsort = buildstate->ivfleader->sharedsort; + } + + /* Begin serial/leader tuplesort */ + buildstate->sortstate = tuplesort_begin_heap(buildstate->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, maintenance_work_mem, coordinate, false); + + /* Add tuples to sort */ + if (buildstate->heap != NULL) + { + if (buildstate->ivfleader) + buildstate->reltuples = ParallelHeapScan(buildstate); + else + { #if PG_VERSION_NUM >= 120000 - buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo, - true, true, BuildCallback, (void *) buildstate, NULL); + buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo, + true, true, BuildCallback, (void *) buildstate, NULL); #else - buildstate->reltuples = IndexBuildHeapScan(buildstate->heap, buildstate->index, buildstate->indexInfo, - true, BuildCallback, (void *) buildstate, NULL); + buildstate->reltuples = IndexBuildHeapScan(buildstate->heap, buildstate->index, buildstate->indexInfo, + true, BuildCallback, (void *) buildstate, NULL); #endif + } + +#ifdef IVFFLAT_KMEANS_DEBUG + PrintKmeansMetrics(buildstate); +#endif + } } /* @@ -587,29 +1047,21 @@ ScanTable(IvfflatBuildState * buildstate) static void CreateEntryPages(IvfflatBuildState * buildstate, ForkNumber forkNum) { - AttrNumber attNums[] = {1}; - Oid sortOperators[] = {Int4LessOperator}; - Oid sortCollations[] = {InvalidOid}; - bool nullsFirstFlags[] = {false}; - - UpdateProgress(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_IVFFLAT_PHASE_SORT); - - buildstate->sortstate = tuplesort_begin_heap(buildstate->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, maintenance_work_mem, NULL, false); - - /* Add tuples to sort */ - if (buildstate->heap != NULL) - IvfflatBench("assign tuples", ScanTable(buildstate)); + /* Assign */ + IvfflatBench("assign tuples", AssignTuples(buildstate)); /* Sort */ IvfflatBench("sort tuples", tuplesort_performsort(buildstate->sortstate)); -#ifdef IVFFLAT_KMEANS_DEBUG - PrintKmeansMetrics(buildstate); -#endif - - /* Insert */ + /* Load */ IvfflatBench("load tuples", InsertTuples(buildstate->index, buildstate, forkNum)); + + /* End sort */ tuplesort_end(buildstate->sortstate); + + /* End parallel build */ + if (buildstate->ivfleader) + IvfflatEndParallel(buildstate->ivfleader); } /* diff --git a/src/ivfflat.h b/src/ivfflat.h index 5bd7622..74bbe6e 100644 --- a/src/ivfflat.h +++ b/src/ivfflat.h @@ -8,6 +8,7 @@ #endif #include "access/generic_xlog.h" +#include "access/parallel.h" #include "access/reloptions.h" #include "nodes/execnodes.h" #include "port.h" /* for strtof() and random() */ @@ -19,6 +20,10 @@ #include "common/pg_prng.h" #endif +#if PG_VERSION_NUM < 120000 +#include "access/relscan.h" +#endif + #ifdef IVFFLAT_BENCH #include "portability/instr_time.h" #endif @@ -105,6 +110,56 @@ typedef struct IvfflatOptions int lists; /* number of lists */ } IvfflatOptions; +typedef struct IvfflatSpool +{ + Tuplesortstate *sortstate; + Relation heap; + Relation index; +} IvfflatSpool; + +typedef struct IvfflatShared +{ + /* Immutable state */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + int scantuplesortstates; + + /* Worker progress */ + ConditionVariable workersdonecv; + + /* Mutex for mutable state */ + slock_t mutex; + + /* Mutable state */ + int nparticipantsdone; + double reltuples; + double indtuples; + +#ifdef IVFFLAT_KMEANS_DEBUG + double inertia; +#endif + +#if PG_VERSION_NUM < 120000 + ParallelHeapScanDescData heapdesc; /* must come last */ +#endif +} IvfflatShared; + +#if PG_VERSION_NUM >= 120000 +#define ParallelTableScanFromIvfflatShared(shared) \ + (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(IvfflatShared))) +#endif + +typedef struct IvfflatLeader +{ + ParallelContext *pcxt; + int nparticipanttuplesorts; + IvfflatShared *ivfshared; + Sharedsort *sharedsort; + Snapshot snapshot; + Vector *ivfcenters; +} IvfflatLeader; + typedef struct IvfflatBuildState { /* Info */ @@ -150,6 +205,9 @@ typedef struct IvfflatBuildState /* Memory */ MemoryContext tmpCtx; + + /* Parallel builds */ + IvfflatLeader *ivfleader; } IvfflatBuildState; typedef struct IvfflatMetaPageData @@ -230,6 +288,7 @@ void IvfflatAppendPage(Relation index, Buffer *buf, Page *page, GenericXLogStat Buffer IvfflatNewBuffer(Relation index, ForkNumber forkNum); void IvfflatInitPage(Buffer buf, Page page); void IvfflatInitRegisterPage(Relation index, Buffer *buf, Page *page, GenericXLogState **state); +PGDLLEXPORT void IvfflatParallelBuildMain(dsm_segment *seg, shm_toc *toc); /* Index access methods */ IndexBuildResult *ivfflatbuild(Relation heap, Relation index, IndexInfo *indexInfo); diff --git a/test/t/003_recall.pl b/test/t/003_recall.pl index 8e7042a..88c620f 100644 --- a/test/t/003_recall.pl +++ b/test/t/003_recall.pl @@ -20,7 +20,7 @@ sub test_recall SET ivfflat.probes = $probes; EXPLAIN ANALYZE SELECT i FROM tst ORDER BY v $operator '$queries[0]' LIMIT $limit; )); - like($explain, qr/Index Scan/); + like($explain, qr/Index Scan using idx on tst/); for my $i (0 .. $#queries) { my $actual = $node->safe_psql("postgres", qq( @@ -77,7 +77,6 @@ foreach (@operators) { push(@expected, $res); } - # Add index my $opclass; if ($operator eq "<->") { $opclass = "vector_l2_ops"; @@ -86,7 +85,12 @@ foreach (@operators) { } else { $opclass = "vector_cosine_ops"; } - $node->safe_psql("postgres", "CREATE INDEX ON tst USING ivfflat (v $opclass);"); + + # Build index serially + $node->safe_psql("postgres", qq( + SET max_parallel_maintenance_workers = 0; + CREATE INDEX idx ON tst USING ivfflat (v $opclass); + )); # Test approximate results if ($operator ne "<#>") { @@ -95,7 +99,29 @@ foreach (@operators) { test_recall(10, 0.95, $operator); } # Account for equal distances - test_recall(100, 0.9975, $operator); + test_recall(100, 0.995, $operator); + + $node->safe_psql("postgres", "DROP INDEX idx;"); + + # Build index in parallel + my ($ret, $stdout, $stderr) = $node->psql("postgres", qq( + SET client_min_messages = DEBUG; + SET min_parallel_table_scan_size = 1; + CREATE INDEX idx ON tst USING ivfflat (v $opclass); + )); + is($ret, 0, $stderr); + like($stderr, qr/using \d+ parallel workers/); + + # Test approximate results + if ($operator ne "<#>") { + # TODO fix test + test_recall(1, 0.75, $operator); + test_recall(10, 0.95, $operator); + } + # Account for equal distances + test_recall(100, 0.995, $operator); + + $node->safe_psql("postgres", "DROP INDEX idx;"); } done_testing();