// hakmem_batch.c - madvise Batching Implementation
//
// Batch MADV_DONTNEED calls to reduce TLB flush overhead

// Enable GNU extensions for madvise
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include "hakmem_batch.h"
#include "hakmem_sys.h"    // Phase 6.11.1: Syscall wrappers with timing
#include "hakmem_whale.h"  // Phase 6.11.1: Whale fast-path cache

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <time.h>
#include <pthread.h>
#include <stdatomic.h>

// Release-silent logging
#include "hakmem_internal.h"

// For madvise (Linux)
#ifdef __linux__
#include <sys/mman.h>
#endif

// Prefer MADV_FREE (faster, less TLB overhead)
// Fallback to MADV_DONTNEED if not available
#ifndef MADV_FREE
#ifdef __linux__
#define MADV_FREE 8              // Linux MADV_FREE
#else
#define MADV_FREE MADV_DONTNEED  // Fallback
#endif
#endif

#ifndef MADV_DONTNEED
#define MADV_DONTNEED 4
#endif

// Global batch state
typedef struct {
    void*  pages[BATCH_MAX_BLOCKS];
    size_t sizes[BATCH_MAX_BLOCKS];
    int    count;
} PageBatch;

static DontneedBatch g_batch;
static PageBatch g_pg_batch;
static int g_initialized = 0;
static int g_bg_enabled = 1;          // env: HAKMEM_BATCH_BG=1 enables background worker

static pthread_mutex_t g_batch_mu;    // protects g_batch and g_pg_batch
static pthread_cond_t g_batch_cv;     // signals worker when flush is requested or items present
static pthread_t g_batch_thread;
static atomic_int g_flush_requested;  // producers request flush when threshold reached
static atomic_int g_stop_worker;      // shutdown flag

// Forward decl
static void* hak_batch_worker(void* arg);

// Statistics
static uint64_t g_total_added = 0;
static uint64_t g_total_flushed = 0;
static uint64_t g_flush_count = 0;
static uint64_t g_immediate_dontneed = 0;  // Blocks too small to batch

// Initialize batch system
void hak_batch_init(void) {
    if (g_initialized) return;

    memset(&g_batch, 0, sizeof(g_batch));
    memset(&g_pg_batch, 0, sizeof(g_pg_batch));
    g_total_added = 0;
    g_total_flushed = 0;
    g_flush_count = 0;
    g_immediate_dontneed = 0;

    // Background worker toggle
    const char* e_bg = getenv("HAKMEM_BATCH_BG");
    if (e_bg) g_bg_enabled = (atoi(e_bg) != 0);

    // Sync primitives
    pthread_mutex_init(&g_batch_mu, NULL);
    pthread_cond_init(&g_batch_cv, NULL);
    atomic_store(&g_flush_requested, 0);
    atomic_store(&g_stop_worker, 0);

    // Spawn worker if enabled
    if (g_bg_enabled) {
        int rc = pthread_create(&g_batch_thread, NULL, hak_batch_worker, NULL);
        if (rc != 0) {
            g_bg_enabled = 0;  // fallback to sync flush
        }
    }

    g_initialized = 1;

#if !HAKMEM_BUILD_RELEASE
    fprintf(stderr, "[Batch] Initialized (threshold=%d MB, min_size=%d KB, bg=%s)\n",
            BATCH_THRESHOLD / (1024 * 1024), BATCH_MIN_SIZE / 1024,
            g_bg_enabled ? "on" : "off");
#endif
}
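/*
 * Illustrative call pattern (a sketch only; the real call sites live in the
 * allocator's free/decommit paths, so the integration shown here is an
 * assumption rather than part of this file):
 *
 *   hak_batch_init();                     // once at startup (also runs lazily from the add paths)
 *   hak_batch_add(block, block_size);     // defer madvise + munmap for a large block
 *   hak_batch_add_page(page, page_size);  // DONTNEED-only path for pages that stay mapped
 *   hak_batch_shutdown();                 // drain pending work and print stats at exit
 *
 * Setting HAKMEM_BATCH_BG=0 in the environment disables the background worker,
 * so flushes run synchronously on the calling thread.
 */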
// Flush all batched blocks
void hak_batch_flush(void) {
    if (!g_initialized) return;
    if (g_batch.count == 0) return;  // Nothing to flush

#ifdef __linux__
    // Execute madvise for all blocks in batch
    // Phase 6.3: Use MADV_FREE (preferred) or MADV_DONTNEED (fallback)
    // MADV_FREE is faster (less TLB overhead) and works better with quick reuse

    // Take a snapshot under lock to minimize producer stall
    DontneedBatch snap;
    pthread_mutex_lock(&g_batch_mu);
    if (g_batch.count == 0) {
        pthread_mutex_unlock(&g_batch_mu);
        return;
    }
    snap = g_batch;  // shallow copy (pointers/sizes)
    g_batch.count = 0;
    g_batch.total_bytes = 0;
    pthread_mutex_unlock(&g_batch_mu);

    for (int i = 0; i < snap.count; i++) {
        void* ptr = snap.blocks[i];
        size_t size = snap.sizes[i];

        // Step 1: MADV_FREE to release physical pages (fast, low TLB cost)
        int ret = madvise(ptr, size, MADV_FREE);
        if (ret != 0) {
            // Fallback to MADV_DONTNEED if MADV_FREE not supported
            ret = madvise(ptr, size, MADV_DONTNEED);
            if (ret != 0) {
                fprintf(stderr, "[Batch] Warning: madvise failed for block %p (size %zu)\n",
                        ptr, size);
            }
        }

        // Step 2: Deferred munmap (batched to reduce TLB flush overhead)
        // This is the key optimization: batched munmap reduces TLB flushes from N to 1
        // Phase 6.11.1: Try whale cache first
        if (hkm_whale_put(ptr, size) != 0) {
            hkm_sys_munmap(ptr, size);
        }

        g_total_flushed++;
    }

    g_flush_count++;

#if !HAKMEM_BUILD_RELEASE
    fprintf(stderr, "[Batch] Flushed %d blocks (%.1f MB) - flush #%lu\n",
            snap.count, snap.total_bytes / (1024.0 * 1024.0), g_flush_count);
#endif
#endif  // __linux__
}

// Add block to batch
int hak_batch_add(void* ptr, size_t size) {
    if (!g_initialized) hak_batch_init();

    g_total_added++;

    // Small blocks: immediate madvise (don't batch)
    if (size < BATCH_MIN_SIZE) {
#ifdef __linux__
        hkm_sys_madvise_dontneed(ptr, size);  // Phase 6.11.1: Use syscall wrapper
#endif
        g_immediate_dontneed++;
        return 0;  // Not flushed (immediate)
    }

    // Add to batch (under lock)
    pthread_mutex_lock(&g_batch_mu);
    while (g_batch.count >= BATCH_MAX_BLOCKS) {
        // Batch is full: flush synchronously so already-queued blocks are not
        // dropped, then retry the insert into the emptied batch
        pthread_mutex_unlock(&g_batch_mu);
        hak_batch_flush();
        pthread_mutex_lock(&g_batch_mu);
    }
    g_batch.blocks[g_batch.count] = ptr;
    g_batch.sizes[g_batch.count] = size;
    g_batch.count++;
    g_batch.total_bytes += size;
    int reached = (g_batch.total_bytes >= BATCH_THRESHOLD);
    pthread_mutex_unlock(&g_batch_mu);

    // If threshold reached, request background flush (or flush inline if bg disabled)
    if (reached) {
        if (g_bg_enabled) {
            atomic_store(&g_flush_requested, 1);
            pthread_cond_signal(&g_batch_cv);
        } else {
            hak_batch_flush();
        }
        return 1;
    }

    return 0;  // Buffered
}

// Shutdown batch system
void hak_batch_shutdown(void) {
    if (!g_initialized) return;

    // Signal worker to stop (it drains both batches before exiting)
    if (g_bg_enabled) {
        atomic_store(&g_stop_worker, 1);
        pthread_cond_signal(&g_batch_cv);
        pthread_join(g_batch_thread, NULL);
    } else {
        // Flush any remaining blocks synchronously
        if (g_batch.count > 0) {
            fprintf(stderr, "[Batch] Final flush of %d remaining blocks...\n", g_batch.count);
            hak_batch_flush();
        }
    }

    hak_batch_print_stats();
    g_initialized = 0;
}

// Print statistics
void hak_batch_print_stats(void) {
    if (!g_initialized) return;

#if !HAKMEM_BUILD_RELEASE
    fprintf(stderr, "\n========================================\n");
    fprintf(stderr, "madvise Batching Statistics\n");
    fprintf(stderr, "========================================\n");
    fprintf(stderr, "Total blocks added:    %lu\n", g_total_added);
    fprintf(stderr, "Total blocks flushed:  %lu\n", g_total_flushed);
    fprintf(stderr, "Immediate (unbatched): %lu\n", g_immediate_dontneed);
    fprintf(stderr, "Flush operations:      %lu\n", g_flush_count);
    if (g_flush_count > 0) {
        double avg_blocks_per_flush = (double)g_total_flushed / (double)g_flush_count;
        fprintf(stderr, "Avg blocks per flush:  %.1f\n", avg_blocks_per_flush);
        fprintf(stderr, "TLB flush reduction:   %.1fx (vs unbatched)\n", avg_blocks_per_flush);
    }
    fprintf(stderr, "Pending blocks:        %d\n", g_batch.count);
    fprintf(stderr, "Pending bytes:         %.1f MB\n", g_batch.total_bytes / (1024.0 * 1024.0));
    fprintf(stderr, "========================================\n\n");
#endif
}

// Query pending bytes
size_t hak_batch_get_pending_bytes(void) {
    return g_batch.total_bytes;
}

// Query pending count
int hak_batch_get_pending_count(void) {
    return g_batch.count;
}
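/*
 * Note on the two queues (descriptive comment only; behavior is defined by the
 * code above and below):
 *  - g_batch holds large blocks whose pages are released with MADV_FREE /
 *    MADV_DONTNEED and whose mappings are then unmapped (or handed to the
 *    whale cache) at flush time; callers give up the mapping.
 *  - g_pg_batch holds pages the allocator keeps mapped; they only receive
 *    MADV_DONTNEED, so the virtual range stays valid for reuse.
 */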
// Enqueue a managed page for MADV_DONTNEED (no munmap)
int hak_batch_add_page(void* page, size_t size) {
    if (!g_initialized) hak_batch_init();

    if (!g_bg_enabled) {
        // Synchronous fallback: advise immediately, nothing is queued
        hkm_sys_madvise_dontneed(page, size);
        return 1;
    }

    int queued = 0;
    pthread_mutex_lock(&g_batch_mu);
    if (g_pg_batch.count < BATCH_MAX_BLOCKS) {
        g_pg_batch.pages[g_pg_batch.count] = page;
        g_pg_batch.sizes[g_pg_batch.count] = size;
        g_pg_batch.count++;
        queued = 1;
    }
    pthread_mutex_unlock(&g_batch_mu);

    if (!queued) {
        // Page batch is full: advise immediately rather than dropping the request
        hkm_sys_madvise_dontneed(page, size);
        return 1;
    }

    // Wake the worker to drain the page batch
    atomic_store(&g_flush_requested, 1);
    pthread_cond_signal(&g_batch_cv);
    return 0;
}

// Background worker entry point
static void* hak_batch_worker(void* arg) {
    (void)arg;

    while (!atomic_load(&g_stop_worker)) {
        pthread_mutex_lock(&g_batch_mu);

        // Wait up to 10ms or until signaled
        struct timespec ts;
        clock_gettime(CLOCK_REALTIME, &ts);
        ts.tv_nsec += 10 * 1000 * 1000;  // +10ms
        if (ts.tv_nsec >= 1000000000) {
            ts.tv_sec += 1;
            ts.tv_nsec -= 1000000000;
        }
        pthread_cond_timedwait(&g_batch_cv, &g_batch_mu, &ts);

        int should_flush = (g_batch.count > 0) || (g_pg_batch.count > 0) ||
                           atomic_exchange(&g_flush_requested, 0);
        pthread_mutex_unlock(&g_batch_mu);

        if (should_flush) {
            // Snapshot & flush page batch first (MADV_DONTNEED only)
            PageBatch snap_pg;
            pthread_mutex_lock(&g_batch_mu);
            snap_pg = g_pg_batch;
            g_pg_batch.count = 0;
            pthread_mutex_unlock(&g_batch_mu);
            for (int i = 0; i < snap_pg.count; i++) {
                hkm_sys_madvise_dontneed(snap_pg.pages[i], snap_pg.sizes[i]);
            }

            // Then flush the block batch (madvise + deferred munmap)
            hak_batch_flush();
        }
    }

    // Drain any remaining work before exiting
    if (g_pg_batch.count > 0) {
        PageBatch snap_pg;
        pthread_mutex_lock(&g_batch_mu);
        snap_pg = g_pg_batch;
        g_pg_batch.count = 0;
        pthread_mutex_unlock(&g_batch_mu);
        for (int i = 0; i < snap_pg.count; i++) {
            hkm_sys_madvise_dontneed(snap_pg.pages[i], snap_pg.sizes[i]);
        }
    }
    if (g_batch.count > 0) hak_batch_flush();

    return NULL;
}