From a7b6be73e155b858ac3f6bcf8376dd6b6bb394a6 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Tue, 16 Jan 2024 21:59:11 -0800 Subject: [PATCH] Use DSA --- src/hnsw.h | 1 + src/hnswbuild.c | 42 +++++++++++++++++++++++++++++------------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/src/hnsw.h b/src/hnsw.h index d055368..e900f2b 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -213,6 +213,7 @@ typedef struct HnswShared Oid indexrelid; bool isconcurrent; int scantuplesortstates; + dsa_pointer dsaptr; /* Worker progress */ ConditionVariable workersdonecv; diff --git a/src/hnswbuild.c b/src/hnswbuild.c index 9021591..3e068ba 100644 --- a/src/hnswbuild.c +++ b/src/hnswbuild.c @@ -768,6 +768,8 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc) char *sharedquery; HnswSpool *hnswspool; HnswShared *hnswshared; + dsa_area *dsa; + void *dsaspace; char *hnswarea; Relation heapRel; Relation indexRel; @@ -809,7 +811,9 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc) hnswspool->heap = heapRel; hnswspool->index = indexRel; - hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false); + dsaspace = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false); + dsa = dsa_attach_in_place(dsaspace, seg); + hnswarea = dsa_get_address(dsa, hnswshared->dsaptr); /* Perform inserts */ HnswParallelScanAndInsert(hnswspool, hnswshared, hnswarea, false); @@ -889,8 +893,10 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) Snapshot snapshot; Size esthnswshared; Size esthnswarea; - Size estother; + Size estdsaspace; HnswShared *hnswshared; + void *dsaspace; + dsa_area *dsa; char *hnswarea; HnswLeader *hnswleader = (HnswLeader *) palloc0(sizeof(HnswLeader)); bool leaderparticipates = true; @@ -921,15 +927,9 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) esthnswshared = ParallelEstimateShared(buildstate->heap, snapshot); shm_toc_estimate_chunk(&pcxt->estimator, esthnswshared); - /* Leave space for other objects in shared memory */ - /* Docker has a default limit of 64 MB for shm_size */ - /* which happens to be the default value of maintenance_work_mem */ - esthnswarea = maintenance_work_mem * 1024L; - estother = 2 * 1024 * 1024; - if (esthnswarea > estother) - esthnswarea -= estother; - - shm_toc_estimate_chunk(&pcxt->estimator, esthnswarea); + /* Start with a minimal DSA so InitializeParallelDSM does not fail */ + estdsaspace = dsa_minimum_size(); + shm_toc_estimate_chunk(&pcxt->estimator, estdsaspace); shm_toc_estimate_keys(&pcxt->estimator, 2); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ @@ -975,12 +975,28 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request) heap_parallelscan_initialize(&hnswshared->heapdesc, buildstate->heap, snapshot); #endif - hnswarea = (char *) shm_toc_allocate(pcxt->toc, esthnswarea); + dsaspace = shm_toc_allocate(pcxt->toc, estdsaspace); + dsa = dsa_create_in_place(dsaspace, estdsaspace, hnsw_lock_tranche_id, pcxt->seg); + + esthnswarea = maintenance_work_mem * 1024L; + hnswshared->dsaptr = dsa_allocate_extended(dsa, esthnswarea, DSA_ALLOC_HUGE | DSA_ALLOC_NO_OOM); + + /* If not enough shared memory, back out (do serial build) */ + if (!DsaPointerIsValid(hnswshared->dsaptr)) + { + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); + return; + } + + hnswarea = dsa_get_address(dsa, hnswshared->dsaptr); /* Report less than allocated so never fails */ InitGraph(&hnswshared->graphData, hnswarea, esthnswarea - 1024 * 1024); shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_SHARED, hnswshared); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_AREA, hnswarea); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_AREA, dsaspace); /* Store query string for workers */ if (debug_query_string)