This commit is contained in:
Andrew Kane
2024-01-16 21:59:11 -08:00
parent d801a843f4
commit a7b6be73e1
2 changed files with 30 additions and 13 deletions

View File

@@ -213,6 +213,7 @@ typedef struct HnswShared
Oid indexrelid;
bool isconcurrent;
int scantuplesortstates;
dsa_pointer dsaptr;
/* Worker progress */
ConditionVariable workersdonecv;

View File

@@ -768,6 +768,8 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
char *sharedquery;
HnswSpool *hnswspool;
HnswShared *hnswshared;
dsa_area *dsa;
void *dsaspace;
char *hnswarea;
Relation heapRel;
Relation indexRel;
@@ -809,7 +811,9 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
hnswspool->heap = heapRel;
hnswspool->index = indexRel;
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
dsaspace = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
dsa = dsa_attach_in_place(dsaspace, seg);
hnswarea = dsa_get_address(dsa, hnswshared->dsaptr);
/* Perform inserts */
HnswParallelScanAndInsert(hnswspool, hnswshared, hnswarea, false);
@@ -889,8 +893,10 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
Snapshot snapshot;
Size esthnswshared;
Size esthnswarea;
Size estother;
Size estdsaspace;
HnswShared *hnswshared;
void *dsaspace;
dsa_area *dsa;
char *hnswarea;
HnswLeader *hnswleader = (HnswLeader *) palloc0(sizeof(HnswLeader));
bool leaderparticipates = true;
@@ -921,15 +927,9 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
esthnswshared = ParallelEstimateShared(buildstate->heap, snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, esthnswshared);
/* Leave space for other objects in shared memory */
/* Docker has a default limit of 64 MB for shm_size */
/* which happens to be the default value of maintenance_work_mem */
esthnswarea = maintenance_work_mem * 1024L;
estother = 2 * 1024 * 1024;
if (esthnswarea > estother)
esthnswarea -= estother;
shm_toc_estimate_chunk(&pcxt->estimator, esthnswarea);
/* Start with a minimal DSA so InitializeParallelDSM does not fail */
estdsaspace = dsa_minimum_size();
shm_toc_estimate_chunk(&pcxt->estimator, estdsaspace);
shm_toc_estimate_keys(&pcxt->estimator, 2);
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
@@ -975,12 +975,28 @@ HnswBeginParallel(HnswBuildState * buildstate, bool isconcurrent, int request)
heap_parallelscan_initialize(&hnswshared->heapdesc, buildstate->heap, snapshot);
#endif
hnswarea = (char *) shm_toc_allocate(pcxt->toc, esthnswarea);
dsaspace = shm_toc_allocate(pcxt->toc, estdsaspace);
dsa = dsa_create_in_place(dsaspace, estdsaspace, hnsw_lock_tranche_id, pcxt->seg);
esthnswarea = maintenance_work_mem * 1024L;
hnswshared->dsaptr = dsa_allocate_extended(dsa, esthnswarea, DSA_ALLOC_HUGE | DSA_ALLOC_NO_OOM);
/* If not enough shared memory, back out (do serial build) */
if (!DsaPointerIsValid(hnswshared->dsaptr))
{
if (IsMVCCSnapshot(snapshot))
UnregisterSnapshot(snapshot);
DestroyParallelContext(pcxt);
ExitParallelMode();
return;
}
hnswarea = dsa_get_address(dsa, hnswshared->dsaptr);
/* Report less than allocated so never fails */
InitGraph(&hnswshared->graphData, hnswarea, esthnswarea - 1024 * 1024);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_SHARED, hnswshared);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_AREA, hnswarea);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_HNSW_AREA, dsaspace);
/* Store query string for workers */
if (debug_query_string)