From 0950968c11b050ea3610e461715e8bf25f0b3131 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Sat, 25 Mar 2023 15:51:59 -0700 Subject: [PATCH] Started support for parallel index scan [skip ci] --- src/ivfflat.c | 33 +++++++++++++++++++++++++++++---- test/sql/ivfflat_parallel.sql | 17 +++++++++++++++++ 2 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 test/sql/ivfflat_parallel.sql diff --git a/src/ivfflat.c b/src/ivfflat.c index 677178c..38a4170 100644 --- a/src/ivfflat.c +++ b/src/ivfflat.c @@ -103,7 +103,13 @@ ivfflatcostestimate(PlannerInfo *root, IndexPath *path, double loop_count, if (ratio > 1) ratio = 1; + // cost estimates for parallel workers applied outside of amcostestimate + elog(INFO, "parallel_workers = %d, parallel aware = %d", path->path.parallel_workers, path->path.parallel_aware); + costs.indexTotalCost *= ratio; + costs.numIndexPages *= ratio; + + elog(INFO, "ivfflatcostestimate = %f", costs.indexTotalCost); /* Startup cost and total cost are same */ *indexStartupCost = costs.indexTotalCost; @@ -151,6 +157,25 @@ ivfflatvalidate(Oid opclassoid) return true; } +static Size +ivfflatestimateparallelscan() +{ + elog(INFO, "ivfflatestimateparallelscan"); + return 0; +} + +static void +ivfflatinitparallelscan(void *target) +{ + elog(INFO, "ivfflatinitparallelscan"); +} + +static void +ivfflatparallelrescan(IndexScanDesc scan) +{ + elog(INFO, "ivfflatparallelrescan"); +} + /* * Define index handler * @@ -178,7 +203,7 @@ ivfflathandler(PG_FUNCTION_ARGS) amroutine->amstorage = false; amroutine->amclusterable = false; amroutine->ampredlocks = false; - amroutine->amcanparallel = false; + amroutine->amcanparallel = true; amroutine->amcaninclude = false; #if PG_VERSION_NUM >= 130000 amroutine->amusemaintenanceworkmem = false; /* not used during VACUUM */ @@ -212,9 +237,9 @@ ivfflathandler(PG_FUNCTION_ARGS) amroutine->amrestrpos = NULL; /* Interface functions to support parallel index scans */ - amroutine->amestimateparallelscan = NULL; - amroutine->aminitparallelscan = NULL; - amroutine->amparallelrescan = NULL; + amroutine->amestimateparallelscan = ivfflatestimateparallelscan; + amroutine->aminitparallelscan = ivfflatinitparallelscan; + amroutine->amparallelrescan = ivfflatparallelrescan; PG_RETURN_POINTER(amroutine); } diff --git a/test/sql/ivfflat_parallel.sql b/test/sql/ivfflat_parallel.sql new file mode 100644 index 0000000..99d313a --- /dev/null +++ b/test/sql/ivfflat_parallel.sql @@ -0,0 +1,17 @@ +SET client_min_messages = warning; +CREATE EXTENSION IF NOT EXISTS vector; +--SET force_parallel_mode = on; +SET parallel_setup_cost = 10; +SET parallel_tuple_cost = 0.001; +SET min_parallel_table_scan_size = 0; +SET min_parallel_index_scan_size = 0; + +CREATE TABLE t (id integer, val vector(3)); +INSERT INTO t (id, val) SELECT n, ARRAY[random(), random(), random()] FROM generate_series(1,1000000) n; +CREATE INDEX ON t USING ivfflat (val) WITH (lists = 10); +SET ivfflat.probes = 2; + +EXPLAIN SELECT * FROM t ORDER BY val <-> '[0.5,0.5,0.5]' LIMIT 5; +SELECT * FROM t ORDER BY val <-> '[0.5,0.5,0.5]' LIMIT 5; + +DROP TABLE t;