310 lines
10 KiB
C
310 lines
10 KiB
C
|
|
// hakmem_batch.c - madvise Batching Implementation
|
||
|
|
//
|
||
|
|
// Batch MADV_DONTNEED calls to reduce TLB flush overhead
|
||
|
|
|
||
|
|
// Enable GNU extensions for madvise
|
||
|
|
#ifndef _GNU_SOURCE
|
||
|
|
#define _GNU_SOURCE
|
||
|
|
#endif
|
||
|
|
|
||
|
|
#include "hakmem_batch.h"

#include "hakmem_sys.h"   // Phase 6.11.1: Syscall wrappers with timing
#include "hakmem_whale.h" // Phase 6.11.1: Whale fast-path cache

#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>  // uint64_t statistics counters
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>    // clock_gettime / struct timespec (worker timeout)
|
||
|
|
|
||
|
|
// For madvise (Linux)
|
||
|
|
#ifdef __linux__
|
||
|
|
#include <sys/mman.h>
|
||
|
|
#endif
|
||
|
|
|
||
|
|
// Prefer MADV_FREE (faster, less TLB overhead)
|
||
|
|
// Fallback to MADV_DONTNEED if not available
|
||
|
|
#ifndef MADV_FREE
|
||
|
|
#ifdef __linux__
|
||
|
|
#define MADV_FREE 8 // Linux MADV_FREE
|
||
|
|
#else
|
||
|
|
#define MADV_FREE MADV_DONTNEED // Fallback
|
||
|
|
#endif
|
||
|
|
#endif
|
||
|
|
|
||
|
|
#ifndef MADV_DONTNEED
|
||
|
|
#define MADV_DONTNEED 4
|
||
|
|
#endif
|
||
|
|
|
||
|
|
// Global batch state.
// PageBatch: pending managed pages queued for MADV_DONTNEED only (no munmap),
// drained by the background worker. DontneedBatch is declared in hakmem_batch.h.
typedef struct { void* pages[BATCH_MAX_BLOCKS]; size_t sizes[BATCH_MAX_BLOCKS]; int count; } PageBatch;
static DontneedBatch g_batch;      // pending blocks to MADV_FREE + munmap/whale-cache
static PageBatch g_pg_batch;       // pending pages to MADV_DONTNEED (stay mapped)
static int g_initialized = 0;      // set by hak_batch_init(), cleared by hak_batch_shutdown()
static int g_bg_enabled = 1; // env: HAKMEM_BATCH_BG=1 enables background worker
static pthread_mutex_t g_batch_mu; // protects g_batch AND g_pg_batch
static pthread_cond_t g_batch_cv; // signals worker when flush is requested or items present
static pthread_t g_batch_thread;  // background flush worker; valid only when g_bg_enabled
static atomic_int g_flush_requested; // producers request flush when threshold reached
static atomic_int g_stop_worker; // shutdown flag

// Forward decl
static void* hak_batch_worker(void* arg);

// Statistics.
// NOTE(review): these counters are updated from multiple threads without
// synchronization, so the values are best-effort approximations — confirm
// that exactness is not required.
static uint64_t g_total_added = 0;     // blocks ever passed to hak_batch_add()
static uint64_t g_total_flushed = 0;   // blocks released by hak_batch_flush()
static uint64_t g_flush_count = 0;     // number of flush operations performed
static uint64_t g_immediate_dontneed = 0; // Blocks too small to batch
|
||
|
|
|
||
|
|
// Initialize batch system
|
||
|
|
void hak_batch_init(void) {
|
||
|
|
if (g_initialized) return;
|
||
|
|
|
||
|
|
memset(&g_batch, 0, sizeof(g_batch));
|
||
|
|
memset(&g_pg_batch, 0, sizeof(g_pg_batch));
|
||
|
|
g_total_added = 0;
|
||
|
|
g_total_flushed = 0;
|
||
|
|
g_flush_count = 0;
|
||
|
|
g_immediate_dontneed = 0;
|
||
|
|
|
||
|
|
// Background worker toggle
|
||
|
|
const char* e_bg = getenv("HAKMEM_BATCH_BG");
|
||
|
|
if (e_bg) g_bg_enabled = (atoi(e_bg) != 0);
|
||
|
|
|
||
|
|
// Sync primitives
|
||
|
|
pthread_mutex_init(&g_batch_mu, NULL);
|
||
|
|
pthread_cond_init(&g_batch_cv, NULL);
|
||
|
|
atomic_store(&g_flush_requested, 0);
|
||
|
|
atomic_store(&g_stop_worker, 0);
|
||
|
|
|
||
|
|
// Spawn worker if enabled
|
||
|
|
if (g_bg_enabled) {
|
||
|
|
int rc = pthread_create(&g_batch_thread, NULL, hak_batch_worker, NULL);
|
||
|
|
if (rc != 0) {
|
||
|
|
g_bg_enabled = 0; // fallback to sync flush
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
g_initialized = 1;
|
||
|
|
{
|
||
|
|
const char* q = getenv("HAKMEM_QUIET");
|
||
|
|
if (!(q && strcmp(q, "1") == 0)) {
|
||
|
|
fprintf(stderr, "[Batch] Initialized (threshold=%d MB, min_size=%d KB, bg=%s)\n",
|
||
|
|
BATCH_THRESHOLD / (1024 * 1024), BATCH_MIN_SIZE / 1024, g_bg_enabled?"on":"off");
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Flush all batched blocks
|
||
|
|
void hak_batch_flush(void) {
|
||
|
|
if (!g_initialized) return;
|
||
|
|
if (g_batch.count == 0) return; // Nothing to flush
|
||
|
|
|
||
|
|
#ifdef __linux__
|
||
|
|
// Execute madvise for all blocks in batch
|
||
|
|
// Phase 6.3: Use MADV_FREE (prefer) or MADV_DONTNEED (fallback)
|
||
|
|
// MADV_FREE is faster (less TLB overhead) and works better with quick reuse
|
||
|
|
// Take a snapshot under lock to minimize producer stall
|
||
|
|
DontneedBatch snap;
|
||
|
|
pthread_mutex_lock(&g_batch_mu);
|
||
|
|
if (g_batch.count == 0) { pthread_mutex_unlock(&g_batch_mu); return; }
|
||
|
|
snap = g_batch; // shallow copy (pointers/sizes)
|
||
|
|
g_batch.count = 0; g_batch.total_bytes = 0;
|
||
|
|
pthread_mutex_unlock(&g_batch_mu);
|
||
|
|
|
||
|
|
for (int i = 0; i < snap.count; i++) {
|
||
|
|
void* ptr = snap.blocks[i];
|
||
|
|
size_t size = snap.sizes[i];
|
||
|
|
|
||
|
|
// Step 1: MADV_FREE to release physical pages (fast, low TLB cost)
|
||
|
|
int ret = madvise(ptr, size, MADV_FREE);
|
||
|
|
if (ret != 0) {
|
||
|
|
// Fallback to MADV_DONTNEED if MADV_FREE not supported
|
||
|
|
ret = madvise(ptr, size, MADV_DONTNEED);
|
||
|
|
if (ret != 0) {
|
||
|
|
fprintf(stderr, "[Batch] Warning: madvise failed for block %p (size %zu)\n", ptr, size);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Step 2: Deferred munmap (batched to reduce TLB flush overhead)
|
||
|
|
// This is the key optimization: batch munmap reduces TLB flushes from N to 1
|
||
|
|
// Phase 6.11.1: Try whale cache first
|
||
|
|
if (hkm_whale_put(ptr, size) != 0) {
|
||
|
|
hkm_sys_munmap(ptr, size);
|
||
|
|
}
|
||
|
|
|
||
|
|
g_total_flushed++;
|
||
|
|
}
|
||
|
|
#endif
|
||
|
|
|
||
|
|
g_flush_count++;
|
||
|
|
{
|
||
|
|
const char* q = getenv("HAKMEM_QUIET");
|
||
|
|
if (!(q && strcmp(q, "1") == 0)) {
|
||
|
|
fprintf(stderr, "[Batch] Flushed %d blocks (%.1f MB) - flush #%lu\n",
|
||
|
|
snap.count, snap.total_bytes / (1024.0 * 1024.0), g_flush_count);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Add block to batch
|
||
|
|
int hak_batch_add(void* ptr, size_t size) {
|
||
|
|
if (!g_initialized) hak_batch_init();
|
||
|
|
|
||
|
|
g_total_added++;
|
||
|
|
|
||
|
|
// Small blocks: immediate madvise (don't batch)
|
||
|
|
if (size < BATCH_MIN_SIZE) {
|
||
|
|
#ifdef __linux__
|
||
|
|
hkm_sys_madvise_dontneed(ptr, size); // Phase 6.11.1: Use syscall wrapper
|
||
|
|
#endif
|
||
|
|
g_immediate_dontneed++;
|
||
|
|
return 0; // Not flushed (immediate)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Add to batch (under lock)
|
||
|
|
pthread_mutex_lock(&g_batch_mu);
|
||
|
|
if (g_batch.count >= BATCH_MAX_BLOCKS) {
|
||
|
|
// Fall back: request flush and overwrite last slot
|
||
|
|
atomic_store(&g_flush_requested, 1);
|
||
|
|
pthread_cond_signal(&g_batch_cv);
|
||
|
|
g_batch.count = 0; g_batch.total_bytes = 0;
|
||
|
|
}
|
||
|
|
g_batch.blocks[g_batch.count] = ptr;
|
||
|
|
g_batch.sizes[g_batch.count] = size;
|
||
|
|
g_batch.count++;
|
||
|
|
g_batch.total_bytes += size;
|
||
|
|
int reached = (g_batch.total_bytes >= BATCH_THRESHOLD);
|
||
|
|
pthread_mutex_unlock(&g_batch_mu);
|
||
|
|
|
||
|
|
// If threshold reached, request background flush (or flush inline if bg disabled)
|
||
|
|
if (reached) {
|
||
|
|
if (g_bg_enabled) {
|
||
|
|
atomic_store(&g_flush_requested, 1);
|
||
|
|
pthread_cond_signal(&g_batch_cv);
|
||
|
|
} else {
|
||
|
|
hak_batch_flush();
|
||
|
|
}
|
||
|
|
return 1;
|
||
|
|
}
|
||
|
|
|
||
|
|
return 0; // Buffered
|
||
|
|
}
|
||
|
|
|
||
|
|
// Shutdown batch system
|
||
|
|
void hak_batch_shutdown(void) {
|
||
|
|
if (!g_initialized) return;
|
||
|
|
|
||
|
|
// Signal worker to stop
|
||
|
|
if (g_bg_enabled) {
|
||
|
|
atomic_store(&g_stop_worker, 1);
|
||
|
|
pthread_cond_signal(&g_batch_cv);
|
||
|
|
pthread_join(g_batch_thread, NULL);
|
||
|
|
} else {
|
||
|
|
// Flush any remaining blocks synchronously
|
||
|
|
if (g_batch.count > 0) {
|
||
|
|
const char* q = getenv("HAKMEM_QUIET");
|
||
|
|
if (!(q && strcmp(q, "1") == 0)) {
|
||
|
|
fprintf(stderr, "[Batch] Final flush of %d remaining blocks...\n", g_batch.count);
|
||
|
|
}
|
||
|
|
hak_batch_flush();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
hak_batch_print_stats();
|
||
|
|
g_initialized = 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
// Print statistics
|
||
|
|
void hak_batch_print_stats(void) {
|
||
|
|
if (!g_initialized) return;
|
||
|
|
|
||
|
|
fprintf(stderr, "\n========================================\n");
|
||
|
|
fprintf(stderr, "madvise Batching Statistics\n");
|
||
|
|
fprintf(stderr, "========================================\n");
|
||
|
|
fprintf(stderr, "Total blocks added: %lu\n", g_total_added);
|
||
|
|
fprintf(stderr, "Total blocks flushed: %lu\n", g_total_flushed);
|
||
|
|
fprintf(stderr, "Immediate (unbatched): %lu\n", g_immediate_dontneed);
|
||
|
|
fprintf(stderr, "Flush operations: %lu\n", g_flush_count);
|
||
|
|
|
||
|
|
if (g_flush_count > 0) {
|
||
|
|
double avg_blocks_per_flush = (double)g_total_flushed / (double)g_flush_count;
|
||
|
|
fprintf(stderr, "Avg blocks per flush: %.1f\n", avg_blocks_per_flush);
|
||
|
|
fprintf(stderr, "TLB flush reduction: %.1fx (vs unbatched)\n", avg_blocks_per_flush);
|
||
|
|
}
|
||
|
|
|
||
|
|
fprintf(stderr, "Pending blocks: %d\n", g_batch.count);
|
||
|
|
fprintf(stderr, "Pending bytes: %.1f MB\n", g_batch.total_bytes / (1024.0 * 1024.0));
|
||
|
|
fprintf(stderr, "========================================\n\n");
|
||
|
|
}
|
||
|
|
|
||
|
|
// Query pending bytes
|
||
|
|
size_t hak_batch_get_pending_bytes(void) {
|
||
|
|
return g_batch.total_bytes;
|
||
|
|
}
|
||
|
|
|
||
|
|
// Query pending count
|
||
|
|
int hak_batch_get_pending_count(void) {
|
||
|
|
return g_batch.count;
|
||
|
|
}
|
||
|
|
|
||
|
|
// Enqueue a managed page for MADV_DONTNEED (no munmap)
|
||
|
|
int hak_batch_add_page(void* page, size_t size) {
|
||
|
|
if (!g_initialized) hak_batch_init();
|
||
|
|
|
||
|
|
pthread_mutex_lock(&g_batch_mu);
|
||
|
|
if (g_pg_batch.count < BATCH_MAX_BLOCKS) {
|
||
|
|
g_pg_batch.pages[g_pg_batch.count] = page;
|
||
|
|
g_pg_batch.sizes[g_pg_batch.count] = size;
|
||
|
|
g_pg_batch.count++;
|
||
|
|
}
|
||
|
|
pthread_mutex_unlock(&g_batch_mu);
|
||
|
|
|
||
|
|
if (g_bg_enabled) {
|
||
|
|
atomic_store(&g_flush_requested, 1);
|
||
|
|
pthread_cond_signal(&g_batch_cv);
|
||
|
|
return 0;
|
||
|
|
} else {
|
||
|
|
// Synchronous fallback
|
||
|
|
hkm_sys_madvise_dontneed(page, size);
|
||
|
|
return 1;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
// Background worker entry point
|
||
|
|
static void* hak_batch_worker(void* arg) {
|
||
|
|
(void)arg;
|
||
|
|
while (!atomic_load(&g_stop_worker)) {
|
||
|
|
pthread_mutex_lock(&g_batch_mu);
|
||
|
|
// Wait up to 10ms or until signaled
|
||
|
|
struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts);
|
||
|
|
ts.tv_nsec += 10 * 1000 * 1000; // +10ms
|
||
|
|
if (ts.tv_nsec >= 1000000000) { ts.tv_sec += 1; ts.tv_nsec -= 1000000000; }
|
||
|
|
pthread_cond_timedwait(&g_batch_cv, &g_batch_mu, &ts);
|
||
|
|
int should_flush = (g_batch.count > 0) || (g_pg_batch.count > 0) || atomic_exchange(&g_flush_requested, 0);
|
||
|
|
pthread_mutex_unlock(&g_batch_mu);
|
||
|
|
if (should_flush) {
|
||
|
|
// Snapshot & flush page batch first (MADV_DONTNEED only)
|
||
|
|
PageBatch snap_pg;
|
||
|
|
pthread_mutex_lock(&g_batch_mu);
|
||
|
|
snap_pg = g_pg_batch; g_pg_batch.count = 0;
|
||
|
|
pthread_mutex_unlock(&g_batch_mu);
|
||
|
|
for (int i=0;i<snap_pg.count;i++) {
|
||
|
|
hkm_sys_madvise_dontneed(snap_pg.pages[i], snap_pg.sizes[i]);
|
||
|
|
}
|
||
|
|
// Then flush regular batch
|
||
|
|
hak_batch_flush();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (g_pg_batch.count > 0) {
|
||
|
|
PageBatch snap_pg;
|
||
|
|
pthread_mutex_lock(&g_batch_mu);
|
||
|
|
snap_pg = g_pg_batch; g_pg_batch.count = 0;
|
||
|
|
pthread_mutex_unlock(&g_batch_mu);
|
||
|
|
for (int i=0;i<snap_pg.count;i++) {
|
||
|
|
hkm_sys_madvise_dontneed(snap_pg.pages[i], snap_pg.sizes[i]);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (g_batch.count > 0) hak_batch_flush();
|
||
|
|
return NULL;
|
||
|
|
}
|