// File: hakmem/core/box/pool_mf2_adoption.inc.h
// (130 lines, 6.2 KiB, C — captured from repository file viewer)

// Consumer-Driven Adoption: Try to adopt a page from ANY thread's pending queue
// Returns true if a page was successfully adopted and activated
// Called from alloc_slow when allocating thread needs memory
// Consumer-Driven Adoption: try to adopt a page from ANY thread's pending queue.
//
// Called from alloc_slow when the allocating thread needs memory. Scans a
// bounded, round-robin-rotated window of other threads' pending queues, and
// adopts (takes ownership of) the first page that yields a usable freelist
// after draining its remote frees.
//
// Params:
//   me        - the calling thread's MF2_ThreadPages (new owner on success).
//   class_idx - size-class index selecting which per-class queue to scan.
// Returns:
//   true  if a page was adopted, drained, and made the active page for `me`.
//   false if no adoptable page was found (all queues empty/leased/active owners).
static bool mf2_try_adopt_pending(MF2_ThreadPages* me, int class_idx) {
    if (!me) return false;

    // IMMEDIATE FIX #1: Early return if no adoptable pages (O(1) gating).
    // Avoids scanning empty queues (major performance win!).
    int adoptable = atomic_load_explicit(&g_adoptable_count[class_idx], memory_order_relaxed);
    if (adoptable == 0) return false; // All queues empty, no scan needed

    // Snapshot the global thread registry size; acquire pairs with the
    // release publish performed when a thread registers its MF2_ThreadPages.
    int num_tp = atomic_load_explicit(&g_num_thread_pages, memory_order_acquire);
    if (num_tp == 0) return false;

    // IMMEDIATE FIX #2: Limit scan to MAX_QUEUES threads (configurable via
    // HAKMEM_MF2_MAX_QUEUES). Prevents excessive scanning overhead
    // (2-8 threads is usually enough).
    int scan_limit = (num_tp < g_mf2_max_queues) ? num_tp : g_mf2_max_queues;

    // Round-robin starting index so concurrent adopters spread across queues
    // instead of all hammering thread 0's queue.
    static _Atomic uint64_t adopt_counter = 0;
    uint64_t start_idx = atomic_fetch_add_explicit(&adopt_counter, 1, memory_order_relaxed);

    for (int i = 0; i < scan_limit; i++) {
        // Explicit unsigned arithmetic + narrowing cast (num_tp > 0 here, so
        // the result always fits in int). Previously an implicit conversion.
        int tp_idx = (int)((start_idx + (uint64_t)i) % (uint64_t)num_tp);
        MF2_ThreadPages* other_tp = (MF2_ThreadPages*)atomic_load_explicit(
            (atomic_uintptr_t*)&g_all_thread_pages[tp_idx], memory_order_acquire);
        if (!other_tp) continue; // Slot not (yet) registered

        // Route P: Idle Detection - only adopt from idle owners.
        // Skip owners that allocated within the idle threshold (configurable
        // via env var) to avoid stealing pages from actively allocating threads.
        uint64_t now_tsc = mf2_rdtsc();
        uint64_t owner_last_alloc = atomic_load_explicit(&other_tp->last_alloc_tsc, memory_order_relaxed);
        uint64_t idle_threshold_cycles = (uint64_t)g_mf2_idle_threshold_us * MF2_TSC_CYCLES_PER_US;
        if ((now_tsc - owner_last_alloc) < idle_threshold_cycles) {
            continue; // Owner still active, skip adoption
        }

        // IMMEDIATE FIX #3: Claim exclusive access (prevents multi-consumer
        // CAS thrashing). Only one adopter scans each queue at a time.
        // Every path below MUST clear this flag before leaving the iteration.
        if (atomic_flag_test_and_set_explicit(&other_tp->pending_claim[class_idx], memory_order_acquire)) {
            continue; // Another thread is already scanning this queue, skip
        }

        // Try to dequeue a pending page from this thread's queue.
        MidPage* page = mf2_dequeue_pending(other_tp, class_idx);
        if (!page) {
            // Queue empty: release claim and try the next thread.
            atomic_flag_clear_explicit(&other_tp->pending_claim[class_idx], memory_order_release);
            continue;
        }

        // Page is out of the queue; clear its pending marker.
        atomic_store_explicit(&page->in_remote_pending, false, memory_order_release);

        // Lease check: has enough time passed since the last ownership
        // transfer? (configurable via HAKMEM_MF2_LEASE_MS; 0 = disabled).
        // Prevents pages ping-ponging between owners.
        uint64_t now = mf2_rdtsc();
        uint64_t last_transfer = page->last_transfer_time;
        if (g_mf2_lease_ms > 0 && last_transfer != 0) {
            // Convert ms to TSC cycles via the per-us constant.
            uint64_t lease_cycles = (uint64_t)g_mf2_lease_ms * (MF2_TSC_CYCLES_PER_US * 1000ULL);
            if ((now - last_transfer) < lease_cycles) {
                // Lease still active: return page to the OWNER's full list
                // (don't thrash ownership).
                // NOTE(review): other_tp->full_pages is a plain (non-atomic)
                // list; pending_claim excludes other adopters but not the
                // owner thread itself. Confirm the owner cannot touch its
                // full_pages concurrently here, else this push is a race.
                page->next_page = other_tp->full_pages[class_idx];
                other_tp->full_pages[class_idx] = page;
                atomic_flag_clear_explicit(&other_tp->pending_claim[class_idx], memory_order_release);
                continue; // Try next thread
            }
        }

        // Transfer ownership to the calling thread.
        // Note: pthread_t may not be atomic-compatible on all platforms, so a
        // plain write is used (ownership transfer is rare).
        // TODO: If thrashing is observed, add atomic CAS with serialization.
        // (Removed an unused `old_owner` local that only triggered
        // -Wunused-variable; the previous owner id was never consumed.)
        page->owner_tid = pthread_self();
        page->owner_tp = me;
        page->last_transfer_time = now;

        // DEBUG: capture pre-drain state for the sampled log below.
        static _Atomic int adopt_samples = 0;
        int sample_idx = atomic_fetch_add_explicit(&adopt_samples, 1, memory_order_relaxed);
        unsigned int pre_remote = atomic_load_explicit(&page->remote_count, memory_order_relaxed);
        unsigned int pre_free = page->free_count;
        PoolBlock* pre_freelist = page->freelist;

        // Drain remote frees accumulated while the page sat in the queue.
        int drained = mf2_drain_remote_frees(page);

        // DEBUG: log drain result for the first 10 adoptions only.
        if (sample_idx < 10) {
            MF2_DEBUG_LOG("ADOPT_DRAIN %d: class=%d, remote_cnt=%u, drained=%d, pre_free=%u, post_free=%u, pre_freelist=%p, post_freelist=%p",
                          sample_idx, class_idx, pre_remote, drained,
                          pre_free, page->free_count, pre_freelist, page->freelist);
        }

        // Make the adopted page ACTIVE immediately (not partial!): adoption
        // must satisfy the caller's mf2_alloc_fast() right away. The partial
        // list is only for draining one's own pending queue.
        if (page->freelist) {
            atomic_fetch_add(&g_mf2_page_reuse_count, 1);
            atomic_fetch_add(&g_mf2_pending_drained, 1);
            atomic_fetch_add(&g_mf2_drain_success, 1);
            // Activate (moves any previous active page to full_pages).
            mf2_make_page_active(me, class_idx, page);
            // Release claim before returning SUCCESS.
            atomic_flag_clear_explicit(&other_tp->pending_claim[class_idx], memory_order_release);
            return true; // SUCCESS! Page adopted and activated
        }

        // No freelist even after draining: keep the page on MY full list
        // (ownership already transferred to me above).
        page->next_page = me->full_pages[class_idx];
        me->full_pages[class_idx] = page;

        // Release claim and keep searching for a better page.
        atomic_flag_clear_explicit(&other_tp->pending_claim[class_idx], memory_order_release);
    }
    return false; // No adoptable pages found
}