diff --git a/src/hnsw.c b/src/hnsw.c index 6464083..249719f 100644 --- a/src/hnsw.c +++ b/src/hnsw.c @@ -18,9 +18,18 @@ #define MarkGUCPrefixReserved(x) EmitWarningsOnPlaceholders(x) #endif +static const struct config_enum_entry hnsw_streaming_options[] = { + {"off", HNSW_STREAMING_OFF, false}, + {"strict", HNSW_STREAMING_STRICT, false}, + {"relaxed", HNSW_STREAMING_RELAXED, false}, + /* TODO Change to strict before merging */ + {"on", HNSW_STREAMING_RELAXED, false}, + {NULL, 0, false} +}; + int hnsw_ef_search; int hnsw_ef_stream; -bool hnsw_streaming; +int hnsw_streaming; int hnsw_lock_tranche_id; static relopt_kind hnsw_relopt_kind; @@ -72,9 +81,9 @@ HnswInit(void) HNSW_DEFAULT_EF_SEARCH, HNSW_MIN_EF_SEARCH, HNSW_MAX_EF_SEARCH, PGC_USERSET, 0, NULL, NULL, NULL); /* TODO Figure out name */ - DefineCustomBoolVariable("hnsw.streaming", "Use streaming mode", + DefineCustomEnumVariable("hnsw.streaming", "Use streaming mode", NULL, &hnsw_streaming, - HNSW_DEFAULT_STREAMING, PGC_USERSET, 0, NULL, NULL, NULL); + HNSW_STREAMING_OFF, hnsw_streaming_options, PGC_USERSET, 0, NULL, NULL, NULL); /* TODO Figure out name */ /* TODO Use same value as ivfflat.max_probes for "all" */ diff --git a/src/hnsw.h b/src/hnsw.h index 0e1aa54..878e262 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -46,7 +46,6 @@ #define HNSW_DEFAULT_EF_SEARCH 40 #define HNSW_MIN_EF_SEARCH 1 #define HNSW_MAX_EF_SEARCH 1000 -#define HNSW_DEFAULT_STREAMING false #define HNSW_DEFAULT_EF_STREAM -1 #define HNSW_MIN_EF_STREAM -1 #define HNSW_MAX_EF_STREAM INT_MAX @@ -130,9 +129,16 @@ /* Variables */ extern int hnsw_ef_search; extern int hnsw_ef_stream; -extern bool hnsw_streaming; +extern int hnsw_streaming; extern int hnsw_lock_tranche_id; +typedef enum HnswStreamingMode +{ + HNSW_STREAMING_OFF, + HNSW_STREAMING_STRICT, + HNSW_STREAMING_RELAXED +} HnswStreamingMode; + typedef struct HnswElementData HnswElementData; typedef struct HnswNeighborArray HnswNeighborArray; @@ -371,6 +377,7 @@ typedef struct HnswScanOpaqueData Datum q; int m; int64 tuples; + double previousDistance; MemoryContext tmpCtx; /* Support functions */ diff --git a/src/hnswscan.c b/src/hnswscan.c index a2f6f8b..61c80cf 100644 --- a/src/hnswscan.c +++ b/src/hnswscan.c @@ -1,5 +1,7 @@ #include "postgres.h" +#include + #include "access/relscan.h" #include "hnsw.h" #include "pgstat.h" @@ -40,7 +42,7 @@ GetScanItems(IndexScanDesc scan, Datum q) ep = w; } - return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, procinfo, collation, m, false, NULL, &so->v, hnsw_streaming ? &so->discarded : NULL, true, &so->tuples); + return HnswSearchLayer(base, q, ep, hnsw_ef_search, 0, index, procinfo, collation, m, false, NULL, &so->v, hnsw_streaming != HNSW_STREAMING_OFF ? &so->discarded : NULL, true, &so->tuples); } /* @@ -149,6 +151,7 @@ hnswrescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int no so->first = true; so->tuples = 0; + so->previousDistance = -INFINITY; MemoryContextReset(so->tmpCtx); if (keys && scan->numberOfKeys > 0) @@ -219,7 +222,7 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir) if (list_length(so->w) == 0) { - if (!hnsw_streaming) + if (hnsw_streaming == HNSW_STREAMING_OFF) break; /* Empty index */ @@ -285,7 +288,7 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir) so->w = list_delete_last(so->w); /* Mark memory as free for next iteration */ - if (hnsw_streaming) + if (hnsw_streaming != HNSW_STREAMING_OFF) { pfree(element); pfree(hc); @@ -296,6 +299,14 @@ hnswgettuple(IndexScanDesc scan, ScanDirection dir) heaptid = &element->heaptids[--element->heaptidsLength]; + if (hnsw_streaming == HNSW_STREAMING_STRICT) + { + if (hc->distance < so->previousDistance) + continue; + + so->previousDistance = hc->distance; + } + MemoryContextSwitchTo(oldCtx); scan->xs_heaptid = *heaptid;