// pool_mf2_core.inc.h — Box: MF2 Per-Page Sharding Core (64KB pages)

#ifndef POOL_MF2_CORE_INC_H
#define POOL_MF2_CORE_INC_H

// NOTE: This file is included from hakmem_pool.c and relies on its includes.
// It intentionally contains function definitions to keep the link structure intact.

// ===========================================================================
// MF2 Per-Page Sharding: Mimalloc-Inspired Architecture
// ===========================================================================

// Key idea: each 64KB page has an independent freelist (no sharing).
// - O(1) page lookup from a block address: (addr & ~0xFFFF)
// - Owner thread: fast path (no locks, no atomics)
// - Cross-thread free: lock-free remote stack

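// Illustrative sketch (not part of the allocator): how a block pointer maps back
// to its owning page and registry slot, assuming 64KB (2^16) aligned pages:
//
//   uintptr_t p         = (uintptr_t)ptr;
//   void*     page_base = (void*)(p & ~0xFFFFULL);                      // clear low 16 bits
//   size_t    slot      = (p >> MF2_PAGE_SIZE_SHIFT) & MF2_PAGE_REGISTRY_MASK;
//   MidPage*  page      = g_mf2_page_registry.pages[slot];              // validate: page->base == page_base
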
#define MF2_PENDING_QUEUE_BUDGET 4
#define MF2_DEBUG_SAMPLE_COUNT 20
#define MF2_TSC_CYCLES_PER_US 3000
#define MF2_PAGE_SIZE_SHIFT 16
#define MF2_PAGE_ALIGNMENT 65536

#ifdef HAKMEM_DEBUG_MF2
#define MF2_DEBUG_LOG(fmt, ...) fprintf(stderr, "[MF2] " fmt "\n", ##__VA_ARGS__)
#define MF2_ERROR_LOG(fmt, ...) fprintf(stderr, "[MF2 ERROR] " fmt "\n", ##__VA_ARGS__)
#else
#define MF2_DEBUG_LOG(fmt, ...) ((void)0)
#define MF2_ERROR_LOG(fmt, ...) fprintf(stderr, "[MF2 ERROR] " fmt "\n", ##__VA_ARGS__)
#endif

// Forward declarations
static size_t g_class_sizes[POOL_NUM_CLASSES];
static void* mf2_alloc_slow(int class_idx, size_t size, uintptr_t site_id);

typedef struct MidPage {
    void*    base;                      // 64KB-aligned page base address
    uint8_t  class_idx;                 // size class served by this page
    uint8_t  flags;
    uint16_t _pad0;
    pthread_t owner_tid;                // owning thread (fast-path frees)
    struct MF2_ThreadPages* owner_tp;   // owner's per-thread page lists
    uint64_t last_transfer_time;        // TSC timestamp of last ownership transfer
    PoolBlock* freelist;                // owner-local freelist (no atomics)
    uint16_t free_count;                // blocks currently on the local freelist
    uint16_t capacity;                  // total blocks carved from the page
    atomic_uintptr_t remote_head;       // lock-free stack of cross-thread frees
    atomic_uint remote_count;           // approximate count of remote frees
    atomic_int  in_use;                 // live allocations from this page
    atomic_int  pending_dn;
    struct MidPage* next_page;          // intrusive links for active/partial/full lists
    struct MidPage* prev_page;
    _Atomic(_Bool) in_remote_pending;   // already on the owner's pending queue?
    struct MidPage* next_pending;       // pending-queue link
    // Pad toward a 64-byte cache line. Approximate: the sum below ignores
    // compiler-inserted padding between members.
    char _pad[64 - ((sizeof(void*) * 5 + sizeof(PoolBlock*) + sizeof(uint16_t) * 2 +
                     sizeof(uint64_t) + sizeof(atomic_uintptr_t) + sizeof(atomic_uint) +
                     sizeof(atomic_int) * 2 + sizeof(pthread_t) + sizeof(_Atomic(_Bool)) + 4) % 64)];
} MidPage;

#define MF2_PAGE_REGISTRY_BITS 16
#define MF2_PAGE_REGISTRY_SIZE (1 << MF2_PAGE_REGISTRY_BITS)
#define MF2_PAGE_REGISTRY_MASK (MF2_PAGE_REGISTRY_SIZE - 1)

typedef struct {
    MidPage* pages[MF2_PAGE_REGISTRY_SIZE];   // direct-mapped: slot = (base >> 16) & MF2_PAGE_REGISTRY_MASK
    pthread_mutex_t locks[256];               // striped locks guarding registration
    atomic_uint_fast64_t total_pages;
    atomic_uint_fast64_t active_pages;
} MF2_PageRegistry;

typedef struct MF2_ThreadPages {
    MidPage* active_page[POOL_NUM_CLASSES];     // page the fast path allocates from
    MidPage* partial_pages[POOL_NUM_CLASSES];   // pages with free blocks, not currently active
    MidPage* full_pages[POOL_NUM_CLASSES];      // pages with no owner-local free blocks
    atomic_uintptr_t pages_remote_pending[POOL_NUM_CLASSES]; // lock-free stack of pages holding remote frees
    atomic_flag pending_claim[POOL_NUM_CLASSES];// serializes adoption of this thread's pending pages
    uint32_t page_count[POOL_NUM_CLASSES];
    pthread_t my_tid;
    atomic_uint_fast64_t last_alloc_tsc;        // recency stamp used to detect idle owners
} MF2_ThreadPages;

static MF2_PageRegistry g_mf2_page_registry;
static __thread MF2_ThreadPages* t_mf2_pages = NULL;

#define MF2_MAX_THREADS 256

typedef struct {
    int enabled;
    int max_queues;         // max foreign pending queues scanned per adoption attempt
    int lease_ms;           // minimum ownership period before a page may be adopted again
    int idle_threshold_us;  // owner must have been idle at least this long before adoption
} MF2_Config;

typedef struct {
    MF2_ThreadPages* all_thread_pages[MF2_MAX_THREADS];
    _Atomic int num_thread_pages;
    _Atomic int adoptable_count[POOL_NUM_CLASSES]; // non-empty pending queues per class (adoption hint)
    pthread_key_t tls_key;
    pthread_once_t key_once;
} MF2_Registry;

typedef struct {
    atomic_uint_fast64_t alloc_fast_hit;
    atomic_uint_fast64_t alloc_slow_hit;
    atomic_uint_fast64_t page_reuse_count;
    atomic_uint_fast64_t new_page_count;
    atomic_uint_fast64_t free_owner_count;
    atomic_uint_fast64_t free_remote_count;
    atomic_uint_fast64_t drain_count;
    atomic_uint_fast64_t drain_blocks;
    atomic_uint_fast64_t drain_attempts;
    atomic_uint_fast64_t drain_success;
    atomic_uint_fast64_t slow_checked_drain;
    atomic_uint_fast64_t slow_found_remote;
    atomic_uint_fast64_t full_scan_checked;
    atomic_uint_fast64_t full_scan_found_remote;
    atomic_uint_fast64_t eager_drain_scanned;
    atomic_uint_fast64_t eager_drain_found;
    atomic_uint_fast64_t pending_enqueued;
    atomic_uint_fast64_t pending_drained;
    atomic_uint_fast64_t pending_requeued;
} MF2_Stats;

static MF2_Config g_mf2_config = { .enabled = 0, .max_queues = 2, .lease_ms = 10, .idle_threshold_us = 150 };
static MF2_Registry g_mf2_registry = { .all_thread_pages = {0}, .num_thread_pages = 0, .adoptable_count = {0}, .tls_key = 0, .key_once = PTHREAD_ONCE_INIT };
static MF2_Stats g_mf2_stats = {0};

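// Worked example of how the time-based knobs combine (assumes MF2_TSC_CYCLES_PER_US
// approximates the real TSC rate, i.e. a ~3 GHz invariant TSC):
//   idle threshold: 150 us * 3000 cycles/us              =    450,000 cycles
//   lease:          10 ms * 1000 us/ms * 3000 cycles/us  = 30,000,000 cycles
// These are the thresholds checked by mf2_try_adopt_pending() below.
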
// Flat accessor macros over the config/registry/stats structs above.
#define g_mf2_enabled (g_mf2_config.enabled)
#define g_mf2_max_queues (g_mf2_config.max_queues)
#define g_mf2_lease_ms (g_mf2_config.lease_ms)
#define g_mf2_idle_threshold_us (g_mf2_config.idle_threshold_us)
#define g_all_thread_pages (g_mf2_registry.all_thread_pages)
#define g_num_thread_pages (g_mf2_registry.num_thread_pages)
#define g_adoptable_count (g_mf2_registry.adoptable_count)
#define g_mf2_tls_key (g_mf2_registry.tls_key)
#define g_mf2_key_once (g_mf2_registry.key_once)
#define g_mf2_alloc_fast_hit (g_mf2_stats.alloc_fast_hit)
#define g_mf2_alloc_slow_hit (g_mf2_stats.alloc_slow_hit)
#define g_mf2_page_reuse_count (g_mf2_stats.page_reuse_count)
#define g_mf2_new_page_count (g_mf2_stats.new_page_count)
#define g_mf2_free_owner_count (g_mf2_stats.free_owner_count)
#define g_mf2_free_remote_count (g_mf2_stats.free_remote_count)
#define g_mf2_drain_count (g_mf2_stats.drain_count)
#define g_mf2_drain_blocks (g_mf2_stats.drain_blocks)
#define g_mf2_drain_attempts (g_mf2_stats.drain_attempts)
#define g_mf2_drain_success (g_mf2_stats.drain_success)
#define g_mf2_slow_checked_drain (g_mf2_stats.slow_checked_drain)
#define g_mf2_slow_found_remote (g_mf2_stats.slow_found_remote)
#define g_mf2_full_scan_checked (g_mf2_stats.full_scan_checked)
#define g_mf2_full_scan_found_remote (g_mf2_stats.full_scan_found_remote)
#define g_mf2_eager_drain_scanned (g_mf2_stats.eager_drain_scanned)
#define g_mf2_eager_drain_found (g_mf2_stats.eager_drain_found)
#define g_mf2_pending_enqueued (g_mf2_stats.pending_enqueued)
#define g_mf2_pending_drained (g_mf2_stats.pending_drained)
#define g_mf2_pending_requeued (g_mf2_stats.pending_requeued)

// Init / TLS helpers
static pthread_once_t mf2_page_registry_init_control = PTHREAD_ONCE_INIT;

static void mf2_page_registry_init_impl(void) {
    memset(&g_mf2_page_registry, 0, sizeof(g_mf2_page_registry));
    for (int i = 0; i < 256; i++) pthread_mutex_init(&g_mf2_page_registry.locks[i], NULL);
    atomic_store(&g_mf2_page_registry.total_pages, 0);
    atomic_store(&g_mf2_page_registry.active_pages, 0);
}

static void mf2_page_registry_init(void) {
    pthread_once(&mf2_page_registry_init_control, mf2_page_registry_init_impl);
}

static void mf2_thread_pages_destructor(void* arg) { (void)arg; }

static void mf2_init_tls_key(void) {
    pthread_key_create(&g_mf2_tls_key, mf2_thread_pages_destructor);
}

static inline uint64_t mf2_rdtsc(void) {
#if defined(__x86_64__) || defined(__i386__)
    uint32_t lo, hi;
    __asm__ __volatile__("rdtsc" : "=a"(lo), "=d"(hi));
    return ((uint64_t)hi << 32) | lo;
#else
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
#endif
}

static MF2_ThreadPages* mf2_thread_pages_get(void) {
    if (t_mf2_pages) return t_mf2_pages;
    pthread_once(&g_mf2_key_once, mf2_init_tls_key);
    MF2_ThreadPages* tp = (MF2_ThreadPages*)hkm_libc_calloc(1, sizeof(MF2_ThreadPages));
    if (!tp) return NULL;
    tp->my_tid = pthread_self();
    for (int c = 0; c < POOL_NUM_CLASSES; c++) {
        tp->active_page[c] = NULL;
        tp->full_pages[c] = NULL;
        atomic_store_explicit(&tp->pages_remote_pending[c], 0, memory_order_relaxed);
        atomic_flag_clear_explicit(&tp->pending_claim[c], memory_order_relaxed);
        tp->page_count[c] = 0;
    }
    atomic_store_explicit(&tp->last_alloc_tsc, mf2_rdtsc(), memory_order_relaxed);
    int idx = atomic_fetch_add_explicit(&g_num_thread_pages, 1, memory_order_acq_rel);
    if (idx < MF2_MAX_THREADS)
        atomic_store_explicit((atomic_uintptr_t*)&g_all_thread_pages[idx], (uintptr_t)tp, memory_order_release);
    pthread_setspecific(g_mf2_tls_key, tp);
    t_mf2_pages = tp;
    return tp;
}

// Registry ops
static inline MidPage* mf2_addr_to_page(void* addr) {
    void* page_base = (void*)((uintptr_t)addr & ~0xFFFFULL);
    size_t idx = ((uintptr_t)page_base >> 16) & (MF2_PAGE_REGISTRY_SIZE - 1);
    MidPage* page = g_mf2_page_registry.pages[idx];
    if (page && page->base == page_base) return page;
    return NULL;
}

static void mf2_register_page(MidPage* page) {
    if (!page) return;
    size_t idx = ((uintptr_t)page->base >> 16) & (MF2_PAGE_REGISTRY_SIZE - 1);
    int lock_idx = idx % 256;
    pthread_mutex_lock(&g_mf2_page_registry.locks[lock_idx]);
    if (g_mf2_page_registry.pages[idx] != NULL) {
        HAKMEM_LOG("[MF2] WARNING: Page registry collision at index %zu\n", idx);
    }
    g_mf2_page_registry.pages[idx] = page;
    atomic_fetch_add_explicit(&g_mf2_page_registry.total_pages, 1, memory_order_relaxed);
    atomic_fetch_add_explicit(&g_mf2_page_registry.active_pages, 1, memory_order_relaxed);
    pthread_mutex_unlock(&g_mf2_page_registry.locks[lock_idx]);
}

// Allocation helpers
static MidPage* mf2_alloc_new_page(int class_idx) {
    if (class_idx < 0 || class_idx >= POOL_NUM_CLASSES) return NULL;
    size_t user_size = g_class_sizes[class_idx];
    if (user_size == 0) return NULL;
    size_t block_size = HEADER_SIZE + user_size;

    // Over-map 2x, then trim the prefix and suffix so the kept region is 64KB-aligned.
    size_t alloc_size = POOL_PAGE_SIZE * 2;
    void* raw = mmap(NULL, alloc_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (raw == MAP_FAILED) return NULL;
    uintptr_t addr = (uintptr_t)raw;
    uintptr_t aligned = (addr + 0xFFFF) & ~0xFFFFULL;
    void* page_base = (void*)aligned;
    size_t prefix_size = aligned - addr;
    if (prefix_size > 0) munmap(raw, prefix_size);
    size_t suffix_offset = prefix_size + POOL_PAGE_SIZE;
    if (suffix_offset < alloc_size) munmap((char*)raw + suffix_offset, alloc_size - suffix_offset);
    if (((uintptr_t)page_base & 0xFFFF) != 0) {
        MF2_ERROR_LOG("ALIGNMENT BUG: Page %p not 64KB aligned!", page_base);
    }
    memset(page_base, 0, POOL_PAGE_SIZE);

    MidPage* page = (MidPage*)hkm_libc_calloc(1, sizeof(MidPage));
    if (!page) { munmap(page_base, POOL_PAGE_SIZE); return NULL; }
    page->base = page_base;
    page->class_idx = (uint8_t)class_idx;
    page->flags = 0;
    page->owner_tid = pthread_self();
    page->owner_tp = mf2_thread_pages_get();
    page->last_transfer_time = 0;

    size_t usable_size = POOL_PAGE_SIZE;
    size_t num_blocks = usable_size / block_size;
    page->capacity = (uint16_t)num_blocks;
    page->free_count = (uint16_t)num_blocks;

    // Thread every block onto the owner-local freelist in address order.
    PoolBlock* freelist_head = NULL;
    PoolBlock* freelist_tail = NULL;
    for (size_t i = 0; i < num_blocks; i++) {
        char* block_addr = (char*)page_base + (i * block_size);
        PoolBlock* block = (PoolBlock*)block_addr;
        block->next = NULL;
        if (!freelist_head) { freelist_head = block; freelist_tail = block; }
        else { freelist_tail->next = block; freelist_tail = block; }
    }
    page->freelist = freelist_head;
    atomic_store(&page->remote_head, (uintptr_t)0);
    atomic_store(&page->remote_count, 0);
    atomic_store(&page->in_use, 0);
    atomic_store(&page->pending_dn, 0);
    page->next_page = NULL;
    page->prev_page = NULL;
    atomic_store_explicit(&page->in_remote_pending, false, memory_order_relaxed);
    page->next_pending = NULL;
    mf2_register_page(page);
    return page;
}

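// Worked example of the block carving above (illustrative numbers only; the real
// HEADER_SIZE and class sizes come from hakmem_pool.c): with a 64KB page, a
// 1024-byte user size, and a 16-byte header, block_size = 1040 and
// capacity = 65536 / 1040 = 63 blocks, leaving 16 bytes of slack at the page end.
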
// Remote-drain / Pending queue
static int mf2_drain_remote_frees(MidPage* page) {
    if (!page) return 0;
    atomic_fetch_add(&g_mf2_drain_attempts, 1);
    unsigned int remote_count = atomic_load_explicit(&page->remote_count, memory_order_seq_cst);
    if (remote_count == 0) return 0;

    // Grab the whole remote stack in one shot, then reset the counter.
    uintptr_t head = atomic_exchange_explicit(&page->remote_head, (uintptr_t)0, memory_order_acq_rel);
    if (!head) {
        atomic_store_explicit(&page->remote_count, 0, memory_order_release);
        return 0;
    }
    atomic_store_explicit(&page->remote_count, 0, memory_order_release);

    // Splice the grabbed chain onto the owner-local freelist.
    int drained = 0;
    PoolBlock* cur = (PoolBlock*)head;
    PoolBlock* tail = NULL;
    while (cur) { drained++; tail = cur; cur = cur->next; }
    if (tail) {
        tail->next = page->freelist;
        page->freelist = (PoolBlock*)head;
        page->free_count += drained;
    }
    atomic_fetch_add(&g_mf2_drain_count, 1);
    atomic_fetch_add(&g_mf2_drain_blocks, drained);

    // If more remote frees raced in after the exchange, the page is not re-enqueued
    // here: a later remote free re-triggers the enqueue via the 0 -> 1 count
    // transition in mf2_free_slow().
    unsigned int post = atomic_load_explicit(&page->remote_count, memory_order_acquire);
    if (post >= 1 && page->owner_tp) { /* re-enqueue deferred, see comment above */ }
    return drained;
}

static void mf2_enqueue_pending(MF2_ThreadPages* owner_tp, MidPage* page) {
    if (!owner_tp || !page) return;
    _Bool was_pending = atomic_exchange_explicit(&page->in_remote_pending, true, memory_order_acq_rel);
    if (was_pending) return;
    atomic_fetch_add(&g_mf2_pending_enqueued, 1);
    uintptr_t old_head;
    do {
        old_head = atomic_load_explicit(&owner_tp->pages_remote_pending[page->class_idx], memory_order_relaxed);
        page->next_pending = (MidPage*)old_head;
    } while (!atomic_compare_exchange_weak_explicit(&owner_tp->pages_remote_pending[page->class_idx],
                                                    &old_head, (uintptr_t)page,
                                                    memory_order_release, memory_order_relaxed));
    if (old_head == 0) atomic_fetch_add_explicit(&g_adoptable_count[page->class_idx], 1, memory_order_relaxed);
}

static MidPage* mf2_dequeue_pending(MF2_ThreadPages* tp, int class_idx) {
    if (!tp) return NULL;
    uintptr_t old_head;
    do {
        old_head = atomic_load_explicit(&tp->pages_remote_pending[class_idx], memory_order_acquire);
        if (old_head == 0) return NULL;
        MidPage* page = (MidPage*)old_head;
        if (atomic_compare_exchange_weak_explicit(&tp->pages_remote_pending[class_idx],
                                                  &old_head, (uintptr_t)page->next_pending,
                                                  memory_order_acq_rel, memory_order_relaxed)) {
            MidPage* next = page->next_pending;
            page->next_pending = NULL;
            if (next == NULL) atomic_fetch_sub_explicit(&g_adoptable_count[class_idx], 1, memory_order_relaxed);
            return page;
        }
    } while (1);
}

// === Helper functions and alloc/free paths (moved from hakmem_pool.c) ===

static inline void mf2_make_page_active(MF2_ThreadPages* tp, int class_idx, MidPage* page) {
    if (!tp || !page) return;
    if (tp->active_page[class_idx]) {
        MidPage* old_active = tp->active_page[class_idx];
        old_active->next_page = tp->full_pages[class_idx];
        tp->full_pages[class_idx] = old_active;
    }
    tp->active_page[class_idx] = page;
    page->next_page = NULL;
}

static inline bool mf2_try_drain_to_partial(MF2_ThreadPages* tp, int class_idx, MidPage* page) {
    if (!tp || !page) return false;
    mf2_drain_remote_frees(page);
    if (page->freelist) {
        atomic_fetch_add(&g_mf2_page_reuse_count, 1);
        page->next_page = tp->partial_pages[class_idx];
        tp->partial_pages[class_idx] = page;
        return true;
    }
    page->next_page = tp->full_pages[class_idx];
    tp->full_pages[class_idx] = page;
    return false;
}

static inline bool mf2_try_drain_and_activate(MF2_ThreadPages* tp, int class_idx, MidPage* page) {
    if (!tp || !page) return false;
    mf2_drain_remote_frees(page);
    if (page->freelist) {
        atomic_fetch_add(&g_mf2_page_reuse_count, 1);
        mf2_make_page_active(tp, class_idx, page);
        return true;
    }
    page->next_page = tp->full_pages[class_idx];
    tp->full_pages[class_idx] = page;
    return false;
}

static bool mf2_try_reuse_own_pending(MF2_ThreadPages* tp, int class_idx) {
    if (!tp) return false;
    for (int budget = 0; budget < MF2_PENDING_QUEUE_BUDGET; budget++) {
        MidPage* pending_page = mf2_dequeue_pending(tp, class_idx);
        if (!pending_page) break;
        atomic_fetch_add(&g_mf2_pending_drained, 1);
        atomic_store_explicit(&pending_page->in_remote_pending, false, memory_order_release);
        if (mf2_try_drain_and_activate(tp, class_idx, pending_page)) return true;
    }
    return false;
}

static bool mf2_try_drain_active_remotes(MF2_ThreadPages* tp, int class_idx) {
    if (!tp) return false;
    MidPage* page = tp->active_page[class_idx];
    if (!page) return false;
    atomic_fetch_add(&g_mf2_slow_checked_drain, 1);
    unsigned int remote_cnt = atomic_load_explicit(&page->remote_count, memory_order_seq_cst);
    if (remote_cnt > 0) {
        atomic_fetch_add(&g_mf2_slow_found_remote, 1);
        int drained = mf2_drain_remote_frees(page);
        if (drained > 0 && page->freelist) {
            atomic_fetch_add(&g_mf2_drain_success, 1);
            return true;
        }
    }
    return false;
}

static MidPage* mf2_alloc_and_activate_new_page(MF2_ThreadPages* tp, int class_idx) {
    if (!tp) return NULL;
    atomic_fetch_add(&g_mf2_new_page_count, 1);

    static _Atomic int new_page_samples = 0;
    int sample_idx = atomic_fetch_add_explicit(&new_page_samples, 1, memory_order_relaxed);
    if (sample_idx < MF2_DEBUG_SAMPLE_COUNT) {
        int total_adoptable = 0;
        for (int i = 0; i < POOL_NUM_CLASSES; i++) {
            total_adoptable += atomic_load_explicit(&g_adoptable_count[i], memory_order_relaxed);
        }
        MF2_DEBUG_LOG("NEW_PAGE %d: class=%d, own_pending=%p, adoptable_total=%d, active=%p, full=%p",
                      sample_idx, class_idx,
                      (void*)atomic_load_explicit(&tp->pages_remote_pending[class_idx], memory_order_relaxed),
                      total_adoptable, tp->active_page[class_idx], tp->full_pages[class_idx]);
    }

    MidPage* page = mf2_alloc_new_page(class_idx);
    if (!page) return NULL;
    if (tp->active_page[class_idx]) {
        MidPage* old_page = tp->active_page[class_idx];
        old_page->next_page = tp->full_pages[class_idx];
        tp->full_pages[class_idx] = old_page;
    }
    tp->active_page[class_idx] = page;
    tp->page_count[class_idx]++;
    return page;
}

static bool mf2_try_adopt_pending(MF2_ThreadPages* me, int class_idx) {
    if (!me) return false;
    int adoptable = atomic_load_explicit(&g_adoptable_count[class_idx], memory_order_relaxed);
    if (adoptable == 0) return false;
    int num_tp = atomic_load_explicit(&g_num_thread_pages, memory_order_acquire);
    if (num_tp == 0) return false;
    int scan_limit = (num_tp < g_mf2_max_queues) ? num_tp : g_mf2_max_queues;
    static _Atomic uint64_t adopt_counter = 0;
    uint64_t start_idx = atomic_fetch_add_explicit(&adopt_counter, 1, memory_order_relaxed);

    for (int i = 0; i < scan_limit; i++) {
        int tp_idx = (start_idx + i) % num_tp;
        MF2_ThreadPages* other_tp =
            (MF2_ThreadPages*)atomic_load_explicit((atomic_uintptr_t*)&g_all_thread_pages[tp_idx], memory_order_acquire);
        if (!other_tp) continue;

        // Only adopt from owners that have been idle long enough.
        uint64_t now_tsc = mf2_rdtsc();
        uint64_t owner_last_alloc = atomic_load_explicit(&other_tp->last_alloc_tsc, memory_order_relaxed);
        uint64_t idle_thr = (uint64_t)g_mf2_idle_threshold_us * MF2_TSC_CYCLES_PER_US;
        if ((now_tsc - owner_last_alloc) < idle_thr) continue;

        if (atomic_flag_test_and_set_explicit(&other_tp->pending_claim[class_idx], memory_order_acquire)) continue;
        MidPage* page = mf2_dequeue_pending(other_tp, class_idx);
        if (!page) {
            atomic_flag_clear_explicit(&other_tp->pending_claim[class_idx], memory_order_release);
            continue;
        }
        atomic_store_explicit(&page->in_remote_pending, false, memory_order_release);

        // Respect the ownership lease: recently transferred pages go back to the owner's full list.
        uint64_t now = mf2_rdtsc();
        uint64_t last_transfer = page->last_transfer_time;
        if (g_mf2_lease_ms > 0 && last_transfer != 0) {
            uint64_t lease_cycles = (uint64_t)g_mf2_lease_ms * (MF2_TSC_CYCLES_PER_US * 1000ULL);
            if ((now - last_transfer) < lease_cycles) {
                page->next_page = other_tp->full_pages[class_idx];
                other_tp->full_pages[class_idx] = page;
                atomic_flag_clear_explicit(&other_tp->pending_claim[class_idx], memory_order_release);
                continue;
            }
        }

        // Take ownership, drain remote frees, and activate the page if anything became usable.
        page->owner_tid = pthread_self();
        page->owner_tp = me;
        page->last_transfer_time = now;
        mf2_drain_remote_frees(page);
        if (page->freelist) {
            atomic_fetch_add(&g_mf2_page_reuse_count, 1);
            atomic_fetch_add(&g_mf2_pending_drained, 1);
            atomic_fetch_add(&g_mf2_drain_success, 1);
            mf2_make_page_active(me, class_idx, page);
            atomic_flag_clear_explicit(&other_tp->pending_claim[class_idx], memory_order_release);
            return true;
        }
        page->next_page = me->full_pages[class_idx];
        me->full_pages[class_idx] = page;
        atomic_flag_clear_explicit(&other_tp->pending_claim[class_idx], memory_order_release);
    }
    return false;
}

static inline void* mf2_alloc_fast(int class_idx, size_t size, uintptr_t site_id) {
    MF2_ThreadPages* tp = mf2_thread_pages_get();
    if (!tp) return NULL;
    MidPage* page = tp->active_page[class_idx];
    if (!page) return mf2_alloc_slow(class_idx, size, site_id);
    if (page->freelist) {
        atomic_fetch_add(&g_mf2_alloc_fast_hit, 1);
        atomic_store_explicit(&tp->last_alloc_tsc, mf2_rdtsc(), memory_order_relaxed);
        PoolBlock* block = page->freelist;
        page->freelist = block->next;
        page->free_count--;
        atomic_fetch_add_explicit(&page->in_use, 1, memory_order_relaxed);
        return (char*)block + HEADER_SIZE;
    }
    return mf2_alloc_slow(class_idx, size, site_id);
}

static void* mf2_alloc_slow(int class_idx, size_t size, uintptr_t site_id) {
    (void)site_id;
    atomic_fetch_add(&g_mf2_alloc_slow_hit, 1);
    MF2_ThreadPages* tp = mf2_thread_pages_get();
    if (!tp) return NULL;
    // Recovery order: own pending pages, then the active page's remote frees,
    // then adoption from an idle thread, then a brand-new page.
    if (mf2_try_reuse_own_pending(tp, class_idx)) return mf2_alloc_fast(class_idx, size, site_id);
    if (mf2_try_drain_active_remotes(tp, class_idx)) return mf2_alloc_fast(class_idx, size, site_id);
    if (mf2_try_adopt_pending(tp, class_idx)) return mf2_alloc_fast(class_idx, size, site_id);
    MidPage* page = mf2_alloc_and_activate_new_page(tp, class_idx);
    if (!page) return NULL;
    return mf2_alloc_fast(class_idx, size, site_id);
}

static inline void mf2_free_fast(MidPage* page, void* ptr) {
    if (!page || !ptr) return;
    atomic_fetch_add(&g_mf2_free_owner_count, 1);
    PoolBlock* block = (PoolBlock*)((char*)ptr - HEADER_SIZE);
    block->next = page->freelist;
    page->freelist = block;
    page->free_count++;
    int old_in_use = atomic_fetch_sub_explicit(&page->in_use, 1, memory_order_release);
    // Last live block returned and the page is fully free: hand it to batched reclaim.
    if (old_in_use == 1 && page->free_count == page->capacity)
        hak_batch_add_page(page->base, POOL_PAGE_SIZE);
}

static void mf2_free_slow(MidPage* page, void* ptr) {
    if (!page || !ptr) return;
    atomic_fetch_add(&g_mf2_free_remote_count, 1);
    PoolBlock* block = (PoolBlock*)((char*)ptr - HEADER_SIZE);

    // Push onto the page's lock-free remote stack.
    uintptr_t old_head;
    do {
        old_head = atomic_load_explicit(&page->remote_head, memory_order_acquire);
        block->next = (PoolBlock*)old_head;
    } while (!atomic_compare_exchange_weak_explicit(&page->remote_head, &old_head, (uintptr_t)block,
                                                    memory_order_release, memory_order_relaxed));

    // On the 0 -> 1 count transition, notify the owner by enqueueing the page.
    unsigned int old_count = atomic_fetch_add_explicit(&page->remote_count, 1, memory_order_seq_cst);
    static int g_enqueue_threshold = 1;
    if (old_count + 1 == (unsigned int)g_enqueue_threshold) {
        if (page->owner_tp) mf2_enqueue_pending(page->owner_tp, page);
    }
    int old_in_use = atomic_fetch_sub_explicit(&page->in_use, 1, memory_order_release);
    if (old_in_use == 1 &&
        page->free_count + atomic_load_explicit(&page->remote_count, memory_order_acquire) >= page->capacity)
        hak_batch_add_page(page->base, POOL_PAGE_SIZE);
}

static void mf2_free(void* ptr) {
    if (!ptr) return;
    MidPage* page = mf2_addr_to_page(ptr);
    if (!page) return;
    MF2_ThreadPages* tp = mf2_thread_pages_get();
    if (tp && pthread_equal(page->owner_tid, tp->my_tid)) mf2_free_fast(page, ptr);
    else mf2_free_slow(page, ptr);
}

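// Illustrative call pattern only — the real entry points live in hakmem_pool.c and
// may differ; pool_class_for_size() below is a hypothetical helper, not part of this file:
//
//   int   cls = pool_class_for_size(sz);          // map request size to a pool class
//   void* p   = mf2_alloc_fast(cls, sz, site_id); // owner-thread allocation
//   ...
//   mf2_free(p);                                  // routes to owner fast path or remote stack
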
#endif // POOL_MF2_CORE_INC_H