/*
 * pool_tls_remote.c — cross-thread ("remote") free lists for a TLS pool
 * allocator.
 *
 * When a block is freed by a thread other than its owner, the freeing thread
 * pushes it onto the owner's per-class lock-free LIFO stack
 * (pool_remote_push).  The owner later drains its stacks in batches
 * (pool_remote_pop_chain).  Per-owner records live in a hash table of
 * REMOTE_BUCKETS chains; a per-bucket mutex guards record *creation* only —
 * the push/drain hot paths are lock-free.
 */
#include "pool_tls_remote.h"

#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/syscall.h>

#include "box/tiny_next_ptr_box.h" /* Box API: class-aware next-ptr offset,
                                      preserves the block header */

#define REMOTE_BUCKETS 256 /* power of two: hb() masks with REMOTE_BUCKETS-1 */
#define REMOTE_CLASSES 7   /* valid class_idx range is [0, REMOTE_CLASSES) */

/* Lock-free MPSC queue for one size class: a LIFO stack of freed blocks. */
typedef struct RemoteQueue {
    _Atomic(void*)   head;  /* stack top; pushed with CAS, drained with xchg */
    _Atomic uint32_t count; /* approximate population (relaxed updates only) */
} RemoteQueue;

/* Per-owner-thread record: one queue per class, chained within its bucket. */
typedef struct RemoteRec {
    int tid;                            /* owner thread id (SYS_gettid) */
    RemoteQueue queues[REMOTE_CLASSES]; /* one lock-free queue per class */
    struct RemoteRec* next;             /* next record in the same bucket */
} RemoteRec;

static RemoteRec*      g_buckets[REMOTE_BUCKETS];
static pthread_mutex_t g_locks[REMOTE_BUCKETS]; /* RemoteRec creation only */
static pthread_once_t  g_once = PTHREAD_ONCE_INIT;

/* One-time initialization of the per-bucket creation locks. */
static void rq_init(void){
    for (int i = 0; i < REMOTE_BUCKETS; i++)
        pthread_mutex_init(&g_locks[i], NULL);
}

/* Hash a thread id to a bucket index (REMOTE_BUCKETS is a power of two). */
static unsigned hb(int tid){
    return (unsigned)tid & (REMOTE_BUCKETS - 1u);
}

/*
 * Find the record for `tid` in bucket `b`, or NULL if none exists.
 * NOTE(review): callers traverse without the bucket lock.  This relies on
 * records never being freed and on insertion publishing a fully-initialized
 * node before linking it; formally it is still a data race on g_buckets[b] —
 * consider making the bucket heads _Atomic.  TODO confirm against the rest
 * of the allocator's threading model.
 */
static RemoteRec* find_rec(unsigned b, int tid){
    RemoteRec* r = g_buckets[b];
    while (r && r->tid != tid)
        r = r->next;
    return r;
}

/*
 * Push one freed block onto owner_tid's remote queue for class_idx.
 *
 * Returns 1 on success, 0 on invalid arguments or allocation failure.
 * Lock-free on the hot path; takes the bucket mutex only the first time a
 * given owner thread is seen (record creation, double-checked).
 */
int pool_remote_push(int owner_tid, int class_idx, void* ptr){
    if (class_idx < 0 || class_idx >= REMOTE_CLASSES || ptr == NULL) return 0;
    pthread_once(&g_once, rq_init);
    unsigned b = hb(owner_tid);

    /* Find or create the owner's record (creation is the only locked path). */
    RemoteRec* r = find_rec(b, owner_tid);
    if (!r){
        pthread_mutex_lock(&g_locks[b]);
        r = find_rec(b, owner_tid); /* double-check after acquiring the lock */
        if (!r){
            r = (RemoteRec*)calloc(1, sizeof(RemoteRec));
            if (!r){
                /* BUGFIX: original dereferenced a failed calloc(). */
                pthread_mutex_unlock(&g_locks[b]);
                return 0;
            }
            r->tid = owner_tid;
            r->next = g_buckets[b];
            g_buckets[b] = r;
        }
        pthread_mutex_unlock(&g_locks[b]);
    }

    /* Lock-free LIFO push (the hot path).  The next-link must be rewritten
     * on every retry because a failed CAS refreshes old_head. */
    RemoteQueue* q = &r->queues[class_idx];
    void* old_head = atomic_load_explicit(&q->head, memory_order_relaxed);
    do {
        /* Link the new node to the current head via the Box API so the
         * block header is preserved. */
        tiny_next_write(class_idx, ptr, old_head);
    } while (!atomic_compare_exchange_weak_explicit(
                 &q->head, &old_head, ptr,
                 memory_order_release, memory_order_relaxed));
    atomic_fetch_add_explicit(&q->count, 1, memory_order_relaxed);
    return 1;
}

/*
 * Drain up to max_take blocks queued for the calling thread and class_idx.
 *
 * On success, *out_chain receives a next-linked, NULL-terminated chain (in
 * the LIFO order of the stolen stack) and the number of blocks taken is
 * returned.  Returns 0 when nothing is queued or the arguments are invalid.
 * max_take <= 0 selects a default batch of 32.
 */
int pool_remote_pop_chain(int class_idx, int max_take, void** out_chain){
    if (class_idx < 0 || class_idx >= REMOTE_CLASSES || out_chain == NULL)
        return 0;
    pthread_once(&g_once, rq_init);

    int mytid = (int)syscall(SYS_gettid);
    unsigned b = hb(mytid);
    RemoteRec* r = find_rec(b, mytid); /* lookup needs no lock */
    if (!r) return 0;                  /* no remote queue for this thread */

    /* Steal the whole stack in one atomic exchange. */
    RemoteQueue* q = &r->queues[class_idx];
    void* head = atomic_exchange_explicit(&q->head, NULL, memory_order_acquire);
    if (!head) return 0; /* queue was empty */

    if (max_take <= 0) max_take = 32;

    /* Relink up to max_take nodes from the stolen chain into the output
     * chain (appended in traversal order). */
    void* chain = NULL;
    void* tail  = NULL;
    int   batch = 0;
    while (head && batch < max_take){
        void* nxt = tiny_next_read(class_idx, head);
        if (!chain){
            chain = head;
        } else {
            tiny_next_write(class_idx, tail, head);
        }
        tail = head;
        head = nxt;
        batch++;
    }
    /* BUGFIX: terminate the taken chain.  The original left the last taken
     * node linked into the remainder, so a caller walking the chain to NULL
     * would re-enter blocks that are pushed back onto the queue below. */
    if (tail)
        tiny_next_write(class_idx, tail, NULL);

    /* Push any untaken remainder back (lock-free).  The remainder's tail is
     * invariant across CAS retries, so find it once, outside the loop
     * (the original re-walked the remainder on every retry). */
    if (head){
        void* rem_tail = head;
        void* nxt;
        while ((nxt = tiny_next_read(class_idx, rem_tail)) != NULL)
            rem_tail = nxt;
        void* old_head = atomic_load_explicit(&q->head, memory_order_relaxed);
        do {
            tiny_next_write(class_idx, rem_tail, old_head);
        } while (!atomic_compare_exchange_weak_explicit(
                     &q->head, &old_head, head,
                     memory_order_release, memory_order_relaxed));
    }

    atomic_fetch_sub_explicit(&q->count, (uint32_t)batch, memory_order_relaxed);
    *out_chain = chain;
    return batch;
}