diff --git a/core/pool_tls_registry.c b/core/pool_tls_registry.c
index e2fd0a2d..125de942 100644
--- a/core/pool_tls_registry.c
+++ b/core/pool_tls_registry.c
@@ -2,22 +2,26 @@
 #include <stdlib.h>
 #include <stdint.h>
 #include <pthread.h>
+#include <stdatomic.h>
 
 typedef struct RegEntry {
     void* base; void* end; pid_t tid; int class_idx;
-    struct RegEntry* next;
+    _Atomic(struct RegEntry*) next;                   // Atomic for lock-free reads
 } RegEntry;
 
 #define REG_BUCKETS 1024
-static RegEntry* g_buckets[REG_BUCKETS];
-static pthread_mutex_t g_locks[REG_BUCKETS];
+static _Atomic(RegEntry*) g_buckets[REG_BUCKETS];     // Atomic buckets for lock-free reads
+static pthread_mutex_t g_locks[REG_BUCKETS];          // Only for registration/unregistration
 static pthread_once_t g_init_once = PTHREAD_ONCE_INIT;
 
 static void reg_init(void){
-    for (int i=0;i<REG_BUCKETS;i++) pthread_mutex_init(&g_locks[i], NULL);
+    for (int i=0;i<REG_BUCKETS;i++) atomic_init(&g_buckets[i], NULL);
+    for (int i=0;i<REG_BUCKETS;i++) pthread_mutex_init(&g_locks[i], NULL);
 }
 
 void pool_reg_register(void* base, size_t size, pid_t tid, int class_idx){
     pthread_once(&g_init_once, reg_init);
     void* end = (char*)base + size;
     uint64_t h = hash_ptr(base) & (REG_BUCKETS-1);
     pthread_mutex_lock(&g_locks[h]);
     RegEntry* e = (RegEntry*)malloc(sizeof(RegEntry));
-    e->base = base; e->end = end; e->tid = tid; e->class_idx = class_idx; e->next = g_buckets[h];
-    g_buckets[h] = e;
+    e->base = base; e->end = end; e->tid = tid; e->class_idx = class_idx;
+    RegEntry* old_head = atomic_load_explicit(&g_buckets[h], memory_order_relaxed);
+    atomic_store_explicit(&e->next, old_head, memory_order_relaxed);
+    atomic_store_explicit(&g_buckets[h], e, memory_order_release);
     pthread_mutex_unlock(&g_locks[h]);
 }
@@ -39,13 +45,25 @@ void pool_reg_unregister(void* base, size_t size, pid_t tid){
     pthread_once(&g_init_once, reg_init);
     uint64_t h = hash_ptr(base) & (REG_BUCKETS-1);
     pthread_mutex_lock(&g_locks[h]);
-    RegEntry** pp = &g_buckets[h];
-    while (*pp){
-        RegEntry* e = *pp;
+
+    // Need to carefully update atomic pointers
+    _Atomic(RegEntry*)* pp = &g_buckets[h];
+    RegEntry* e = atomic_load_explicit(pp, memory_order_relaxed);
+    RegEntry* prev = NULL;
+
+    while (e){
         if (e->base == base && e->tid == tid){
-            *pp = e->next; free(e); break;
+            RegEntry* next = atomic_load_explicit(&e->next, memory_order_relaxed);
+            if (prev == NULL) {
+                atomic_store_explicit(&g_buckets[h], next, memory_order_release);
+            } else {
+                atomic_store_explicit(&prev->next, next, memory_order_release);
+            }
+            free(e);
+            break;
         }
-        pp = &e->next;
+        prev = e;
+        e = atomic_load_explicit(&e->next, memory_order_relaxed);
     }
     pthread_mutex_unlock(&g_locks[h]);
 }
@@ -53,16 +71,21 @@ void pool_reg_unregister(void* base, size_t size, pid_t tid){
 
 int pool_reg_lookup(void* ptr, pid_t* tid_out, int* class_idx_out){
     pthread_once(&g_init_once, reg_init);
     uint64_t h = hash_ptr(ptr) & (REG_BUCKETS-1);
-    pthread_mutex_lock(&g_locks[h]);
-    for (RegEntry* e = g_buckets[h]; e; e=e->next){
-        if (ptr >= e->base && ptr < e->end){
+
+    // Lock-free lookup! No mutex needed for reads
+    RegEntry* e = atomic_load_explicit(&g_buckets[h], memory_order_acquire);
+    while (e) {
+        // Load entry fields (they're stable after registration)
+        void* base = e->base;
+        void* end = e->end;
+
+        if (ptr >= base && ptr < end){
             if (tid_out) *tid_out = e->tid;
             if (class_idx_out) *class_idx_out = e->class_idx;
-            pthread_mutex_unlock(&g_locks[h]);
             return 1;
         }
+        e = atomic_load_explicit(&e->next, memory_order_acquire);
     }
-    pthread_mutex_unlock(&g_locks[h]);
     return 0;
 }
diff --git a/core/pool_tls_remote.c b/core/pool_tls_remote.c
index f4a18e59..dff5a5f8 100644
--- a/core/pool_tls_remote.c
+++ b/core/pool_tls_remote.c
@@ -3,19 +3,25 @@
 #include <pthread.h>
 #include <unistd.h>
 #include <sys/syscall.h>
+#include <stdatomic.h>
 #include "box/tiny_next_ptr_box.h"  // Box API: preserve header by using class-aware next offset
 
 #define REMOTE_BUCKETS 256
 
+// Lock-free MPSC queue per class
+typedef struct RemoteQueue {
+    _Atomic(void*) head;      // Atomic head for lock-free push (LIFO)
+    _Atomic uint32_t count;   // Approximate count
+} RemoteQueue;
 typedef struct RemoteRec {
     int tid;
-    void* head[7];
-    int count[7];
+    RemoteQueue queues[7];    // One queue per class (lock-free)
     struct RemoteRec* next;
 } RemoteRec;
 
 static RemoteRec* g_buckets[REMOTE_BUCKETS];
-static pthread_mutex_t g_locks[REMOTE_BUCKETS];
+static pthread_mutex_t g_locks[REMOTE_BUCKETS];       // Only for RemoteRec creation
 static pthread_once_t g_once = PTHREAD_ONCE_INIT;
 
 static void rq_init(void){
@@ -28,18 +34,35 @@
 int pool_remote_push(int class_idx, void* ptr, int owner_tid){
     if (class_idx < 0 || class_idx > 6 || ptr == NULL) return 0;
     pthread_once(&g_once, rq_init);
     unsigned b = hb(owner_tid);
-    pthread_mutex_lock(&g_locks[b]);
+
+    // Find or create RemoteRec (only this part needs mutex)
     RemoteRec* r = g_buckets[b];
     while (r && r->tid != owner_tid) r = r->next;
     if (!r){
-        r = (RemoteRec*)calloc(1, sizeof(RemoteRec));
-        r->tid = owner_tid; r->next = g_buckets[b]; g_buckets[b] = r;
+        pthread_mutex_lock(&g_locks[b]);
+        // Double-check after acquiring lock
+        r = g_buckets[b];
+        while (r && r->tid != owner_tid) r = r->next;
+        if (!r){
+            r = (RemoteRec*)calloc(1, sizeof(RemoteRec));
+            r->tid = owner_tid;
+            r->next = g_buckets[b];
+            g_buckets[b] = r;
+        }
+        pthread_mutex_unlock(&g_locks[b]);
     }
-    // Use Box next-pointer API to avoid clobbering header (classes 1-6 store next at base+1)
-    tiny_next_write(class_idx, ptr, r->head[class_idx]);
-    r->head[class_idx] = ptr;
-    r->count[class_idx]++;
-    pthread_mutex_unlock(&g_locks[b]);
+
+    // Lock-free push using CAS (this is the hot path!)
+    RemoteQueue* q = &r->queues[class_idx];
+    void* old_head = atomic_load_explicit(&q->head, memory_order_relaxed);
+    do {
+        // Link new node to current head using Box API (preserves header)
+        tiny_next_write(class_idx, ptr, old_head);
+    } while (!atomic_compare_exchange_weak_explicit(
+        &q->head, &old_head, ptr,
+        memory_order_release, memory_order_relaxed));
+
+    atomic_fetch_add_explicit(&q->count, 1, memory_order_relaxed);
     return 1;
 }
@@ -49,26 +72,57 @@ int pool_remote_pop_chain(int class_idx, int max_take, void** out_chain){
     pthread_once(&g_once, rq_init);
     int mytid = (int)syscall(SYS_gettid);
     unsigned b = hb(mytid);
-    pthread_mutex_lock(&g_locks[b]);
+
+    // Find my RemoteRec (no lock needed for reading)
     RemoteRec* r = g_buckets[b];
     while (r && r->tid != mytid) r = r->next;
-    int drained = 0;
-    if (r){
-        // Pop up to max_take nodes and return chain
-        void* head = r->head[class_idx];
-        int batch = 0; if (max_take <= 0) max_take = 32;
-        void* chain = NULL; void* tail = NULL;
-        while (head && batch < max_take){
-            void* nxt = tiny_next_read(class_idx, head);
-            if (!chain){ chain = head; tail = head; }
-            else { tiny_next_write(class_idx, tail, head); tail = head; }
-            head = nxt; batch++;
+    if (!r) return 0; // No remote queue for this thread
+
+    // Lock-free pop using atomic exchange
+    RemoteQueue* q = &r->queues[class_idx];
+    void* head = atomic_exchange_explicit(&q->head, NULL, memory_order_acquire);
+    if (!head) return 0; // Queue was empty
+
+    // Count nodes and take up to max_take (traverse LIFO chain)
+    if (max_take <= 0) max_take = 32;
+    void* chain = NULL;
+    void* tail = NULL;
+    int batch = 0;
+
+    // Pop up to max_take from the stolen chain
+    while (head && batch < max_take){
+        void* nxt = tiny_next_read(class_idx, head);
+
+        // Build output chain (nodes stay in LIFO pop order)
+        if (!chain){
+            chain = head;
+            tail = head;
+        } else {
+            tiny_next_write(class_idx, tail, head);
+            tail = head;
         }
-        r->head[class_idx] = head;
-        r->count[class_idx] -= batch;
-        drained = batch;
-        *out_chain = chain;
+
+        head = nxt;
+        batch++;
     }
-    pthread_mutex_unlock(&g_locks[b]);
-    return drained;
+
+    // If we didn't take all nodes, push remainder back (lock-free)
+    if (head){
+        void* old_head = atomic_load_explicit(&q->head, memory_order_relaxed);
+        do {
+            // Find tail of remainder chain
+            void* remainder_tail = head;
+            while (tiny_next_read(class_idx, remainder_tail)) {
+                remainder_tail = tiny_next_read(class_idx, remainder_tail);
+            }
+            // Link remainder to current head
+            tiny_next_write(class_idx, remainder_tail, old_head);
+        } while (!atomic_compare_exchange_weak_explicit(
+            &q->head, &old_head, head,
+            memory_order_release, memory_order_relaxed));
+    }
+
+    atomic_fetch_sub_explicit(&q->count, batch, memory_order_relaxed);
+    *out_chain = chain;
+    return batch;
 }
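For reference, here is a minimal standalone sketch of the publication pattern the registry side of this patch relies on: writers still serialize on the bucket mutex, but each new entry is published with a release store, and readers walk the bucket with acquire loads and never touch the lock. Everything below (Ent, g_bucket, publish, lookup) is an illustrative stand-in, not the allocator's API.

```c
// Sketch only: one bucket, C11 atomics + pthreads (compile with -std=c11 -pthread).
#include <stdatomic.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct Ent {
    void* base;
    void* end;
    _Atomic(struct Ent*) next;
} Ent;

static _Atomic(Ent*) g_bucket;                          // static zero-init == empty list
static pthread_mutex_t g_bucket_lock = PTHREAD_MUTEX_INITIALIZER;

// Writer: the mutex serializes concurrent inserts; the release store publishes the
// fully initialized entry to lock-free readers.
static void publish(void* base, void* end){
    Ent* e = malloc(sizeof *e);
    e->base = base;
    e->end  = end;                                      // fields written before publication
    pthread_mutex_lock(&g_bucket_lock);
    atomic_store_explicit(&e->next,
        atomic_load_explicit(&g_bucket, memory_order_relaxed), memory_order_relaxed);
    atomic_store_explicit(&g_bucket, e, memory_order_release);   // publish new head
    pthread_mutex_unlock(&g_bucket_lock);
}

// Reader: lock-free traversal; the acquire loads pair with the release store above,
// so e->base / e->end are visible once the entry is reachable.
static int lookup(void* p){
    for (Ent* e = atomic_load_explicit(&g_bucket, memory_order_acquire); e;
         e = atomic_load_explicit(&e->next, memory_order_acquire)){
        if (p >= e->base && p < e->end) return 1;
    }
    return 0;
}

int main(void){
    static char block[64];
    publish(block, block + sizeof block);
    return lookup(block + 16) ? 0 : 1;                  // exit 0 when the range is found
}
```

This mirrors pool_reg_register's release store under g_locks[h] pairing with pool_reg_lookup's acquire loads; the mutex is only there to order writers against each other.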
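And a compact sketch of the MPSC free-list discipline that pool_remote_push / pool_remote_pop_chain now follow: any producer pushes with a release CAS loop, and the single owning consumer drains the whole chain with one acquire exchange. Node, MpscStack, mpsc_push, and mpsc_drain are hypothetical names; the real code threads the next pointer through tiny_next_write / tiny_next_read instead of a plain struct field.

```c
// Sketch only: C11 atomics (compile with -std=c11 -pthread).
#include <stdatomic.h>
#include <stdio.h>

typedef struct Node {
    struct Node* next;      // written before publication, read only after draining
    int value;
} Node;

typedef struct {
    _Atomic(Node*) head;    // LIFO head; NULL when empty
} MpscStack;

// Producer side (any thread): lock-free push with a release CAS.
static void mpsc_push(MpscStack* s, Node* n){
    Node* old = atomic_load_explicit(&s->head, memory_order_relaxed);
    do {
        n->next = old;                                  // link before publishing
    } while (!atomic_compare_exchange_weak_explicit(
                 &s->head, &old, n,
                 memory_order_release, memory_order_relaxed));
}

// Consumer side (owner thread only): steal the entire chain in one step.
static Node* mpsc_drain(MpscStack* s){
    return atomic_exchange_explicit(&s->head, NULL, memory_order_acquire);
}

int main(void){
    MpscStack s;
    atomic_init(&s.head, NULL);
    Node a = { NULL, 1 }, b = { NULL, 2 };
    mpsc_push(&s, &a);
    mpsc_push(&s, &b);
    for (Node* n = mpsc_drain(&s); n; n = n->next)
        printf("%d\n", n->value);                       // prints 2 then 1 (LIFO order)
    return 0;
}
```

The exchange-based drain is what lets pool_remote_pop_chain skip the bucket mutex entirely; the remainder push-back in the patch is the producer-side CAS loop again, seeded with the unconsumed tail of the stolen chain.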