From f6058beeda3fb6d0783ada1da83c57d565d11c98 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Mon, 22 Sep 2025 11:21:14 -0700 Subject: [PATCH] Started support for parallel index scan [skip ci] --- src/ivfflat.c | 31 +++++++++++++++++++++++++++---- src/ivfscan.c | 3 +++ test/sql/ivfflat_parallel.sql | 16 ++++++++++++++++ 3 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 test/sql/ivfflat_parallel.sql diff --git a/src/ivfflat.c b/src/ivfflat.c index 31c2f7d..f5bf7f5 100644 --- a/src/ivfflat.c +++ b/src/ivfflat.c @@ -143,6 +143,10 @@ ivfflatcostestimate(PlannerInfo *root, IndexPath *path, double loop_count, *indexSelectivity = costs.indexSelectivity; *indexCorrelation = costs.indexCorrelation; *indexPages = costs.numIndexPages; + + elog(INFO, "ivfflatcostestimate = %f", costs.indexTotalCost); + /* Cost estimates for parallel workers applied outside of amcostestimate */ + elog(INFO, "parallel_workers = %d, parallel aware = %d", path->path.parallel_workers, path->path.parallel_aware); } /* @@ -170,6 +174,25 @@ ivfflatvalidate(Oid opclassoid) return true; } +static Size +ivfflatestimateparallelscan() +{ + elog(INFO, "ivfflatestimateparallelscan"); + return 0; +} + +static void +ivfflatinitparallelscan(void *target) +{ + elog(INFO, "ivfflatinitparallelscan"); +} + +static void +ivfflatparallelrescan(IndexScanDesc scan) +{ + elog(INFO, "ivfflatparallelrescan"); +} + /* * Define index handler * @@ -200,7 +223,7 @@ ivfflathandler(PG_FUNCTION_ARGS) amroutine->amstorage = false; amroutine->amclusterable = false; amroutine->ampredlocks = false; - amroutine->amcanparallel = false; + amroutine->amcanparallel = true; #if PG_VERSION_NUM >= 170000 amroutine->amcanbuildparallel = true; #endif @@ -242,9 +265,9 @@ ivfflathandler(PG_FUNCTION_ARGS) amroutine->amrestrpos = NULL; /* Interface functions to support parallel index scans */ - amroutine->amestimateparallelscan = NULL; - amroutine->aminitparallelscan = NULL; - amroutine->amparallelrescan = NULL; + amroutine->amestimateparallelscan = ivfflatestimateparallelscan; + amroutine->aminitparallelscan = ivfflatinitparallelscan; + amroutine->amparallelrescan = ivfflatparallelrescan; #if PG_VERSION_NUM >= 180000 amroutine->amtranslatestrategy = NULL; diff --git a/src/ivfscan.c b/src/ivfscan.c index 6cc5d2e..217ebb6 100644 --- a/src/ivfscan.c +++ b/src/ivfscan.c @@ -119,6 +119,9 @@ GetScanItems(IndexScanDesc scan, Datum value) tuplesort_reset(so->sortstate); + if (scan->parallel_scan != NULL) + elog(INFO, "parallel scan"); + /* Search closest probes lists */ while (so->listIndex < so->maxProbes && (++batchProbes) <= so->probes) { diff --git a/test/sql/ivfflat_parallel.sql b/test/sql/ivfflat_parallel.sql new file mode 100644 index 0000000..351742b --- /dev/null +++ b/test/sql/ivfflat_parallel.sql @@ -0,0 +1,16 @@ +-- SET force_parallel_mode = on; +SET parallel_setup_cost = 10; +SET parallel_tuple_cost = 0.000001; +SET min_parallel_table_scan_size = 1; +SET min_parallel_index_scan_size = 1; + +CREATE TABLE t (id integer, val vector(3)); +ALTER TABLE t ALTER COLUMN val SET STORAGE PLAIN; +INSERT INTO t (id, val) SELECT n, ARRAY[random(), random(), random()] FROM generate_series(1,1000000) n; +CREATE INDEX ON t USING ivfflat (val) WITH (lists = 10); +SET ivfflat.probes = 4; + +EXPLAIN SELECT * FROM t ORDER BY val <-> '[0.5,0.5,0.5]' LIMIT 5; +SELECT * FROM t ORDER BY val <-> '[0.5,0.5,0.5]' LIMIT 5; + +DROP TABLE t;