""" PHI instruction lowering Critical for SSA form - handles value merging from different control flow paths """ import llvmlite.ir as ir from phi_wiring import phi_at_block_head from typing import Dict, List, Tuple, Optional from utils.values import safe_vmap_write def lower_phi( builder: ir.IRBuilder, dst_vid: int, incoming: List[Tuple[int, int]], # [(value_id, block_id), ...] vmap: Dict[int, ir.Value], bb_map: Dict[int, ir.Block], current_block: ir.Block, resolver=None, # Resolver instance (optional) block_end_values: Optional[Dict[int, Dict[int, ir.Value]]] = None, preds_map: Optional[Dict[int, List[int]]] = None ) -> None: """ Lower MIR PHI instruction Args: builder: Current LLVM IR builder dst_vid: Destination value ID incoming: List of (value_id, block_id) pairs vmap: Value map bb_map: Block map current_block: Current basic block resolver: Optional resolver for advanced type handling """ if not incoming: # No incoming edges - use zero vmap[dst_vid] = ir.Constant(ir.IntType(64), 0) return # Use i64 for PHI to carry handles across blocks (strings/boxes), # avoiding pointer PHIs that complicate dominance and boxing. phi_type = ir.IntType(64) # Build map from provided incoming incoming_map: Dict[int, int] = {} for val_id, block_id in incoming: incoming_map[block_id] = val_id # Resolve actual predecessor set cur_bid = None try: cur_bid = int(str(current_block.name).replace('bb','')) except Exception: pass actual_preds = [] if preds_map is not None and cur_bid is not None: actual_preds = [p for p in preds_map.get(cur_bid, []) if p != cur_bid] else: # Fallback: use blocks in incoming list actual_preds = [b for _, b in incoming] # Collect incoming values incoming_pairs: List[Tuple[ir.Block, ir.Value]] = [] used_default_zero = False for block_id in actual_preds: block = bb_map.get(block_id) vid = incoming_map.get(block_id) if block is None: continue # Prefer resolver-driven localization per predecessor block to satisfy dominance if vid is not None and resolver is not None and bb_map is not None: try: pred_block_obj = bb_map.get(block_id) if pred_block_obj is not None and hasattr(resolver, 'resolve_i64'): val = resolver.resolve_i64(vid, pred_block_obj, preds_map or {}, block_end_values or {}, vmap, bb_map) else: val = None except Exception: val = None if val is None: # Missing incoming for this predecessor → default 0 val = ir.Constant(phi_type, 0) used_default_zero = True else: # Snapshot fallback if block_end_values is not None: snap = block_end_values.get(block_id, {}) val = snap.get(vid) if vid is not None else None else: val = vmap.get(vid) if vid is not None else None if not val: # Missing incoming for this predecessor → default 0 val = ir.Constant(phi_type, 0) used_default_zero = True # Coerce pointer to i64 at predecessor end if hasattr(val, 'type') and val.type != phi_type: pb = ir.IRBuilder(block) try: term = block.terminator if term is not None: pb.position_before(term) else: pb.position_at_end(block) except Exception: pb.position_at_end(block) if isinstance(phi_type, ir.IntType) and val.type.is_pointer: i8p = ir.IntType(8).as_pointer() try: if hasattr(val.type, 'pointee') and isinstance(val.type.pointee, ir.ArrayType): c0 = ir.Constant(ir.IntType(32), 0) val = pb.gep(val, [c0, c0], name=f"phi_gep_{vid}") except Exception: pass boxer = None for f in builder.module.functions: if f.name == 'nyash.box.from_i8_string': boxer = f break if boxer is None: boxer = ir.Function(builder.module, ir.FunctionType(ir.IntType(64), [i8p]), name='nyash.box.from_i8_string') val = pb.call(boxer, [val], name=f"phi_ptr2h_{vid}") incoming_pairs.append((block, val)) # If nothing collected, use zero constant and bail out if not incoming_pairs: vmap[dst_vid] = ir.Constant(phi_type, 0) return # Create PHI instruction at the block head and add incoming phi = phi_at_block_head(current_block, phi_type, name=f"phi_{dst_vid}") for block, val in incoming_pairs: phi.add_incoming(val, block) # Store PHI result vmap[dst_vid] = phi # Register PHI definition in def_blocks (critical for resolver dominance tracking) if resolver is not None and hasattr(resolver, 'def_blocks') and cur_bid is not None: resolver.def_blocks.setdefault(dst_vid, set()).add(cur_bid) import os if os.environ.get('NYASH_LLVM_PHI_DEBUG') == '1': try: from trace import phi as trace_phi_debug trace_phi_debug(f"[PHI_DEBUG] Registered dst_vid={dst_vid} in def_blocks for block={cur_bid}") except Exception: pass # Strict mode: fail fast on synthesized zeros (indicates incomplete incoming or dominance issue) import os if used_default_zero and os.environ.get('NYASH_LLVM_PHI_STRICT') == '1': raise RuntimeError(f"[LLVM_PY] PHI dst={dst_vid} used synthesized zero; check preds/incoming") try: from trace import phi as trace_phi try: blkname = str(current_block.name) except Exception: blkname = '' trace_phi(f"[PHI] {blkname} v{dst_vid} incoming={len(incoming_pairs)} zero={1 if used_default_zero else 0}") except Exception: pass # Propagate string-ness: if any incoming value-id is tagged string-ish, mark dst as string-ish. try: if resolver is not None and hasattr(resolver, 'is_stringish') and hasattr(resolver, 'mark_string'): for val_id, _b in incoming: try: if resolver.is_stringish(val_id): resolver.mark_string(dst_vid) break except Exception: pass except Exception: pass def defer_phi_wiring( dst_vid: int, incoming: List[Tuple[int, int]], phi_deferrals: List[Tuple[int, List[Tuple[int, int]]]] ) -> None: """ Defer PHI wiring for sealed block approach Args: dst_vid: Destination value ID incoming: Incoming edges phi_deferrals: List to store deferred PHIs """ phi_deferrals.append((dst_vid, incoming))