diff --git a/CURRENT_TASK.md b/CURRENT_TASK.md index 11ff0685..952741d5 100644 --- a/CURRENT_TASK.md +++ b/CURRENT_TASK.md @@ -1,4 +1,197 @@ -# 🎯 現在のタスク (2025-08-11 BoxBase + BoxCore革命開始!) +# 🎯 現在のタスク (2025-08-11 P2PBox設計完成!) + +## 🚀 2025-08-11 P2PBox完璧設計達成 + +### 💡 **ChatGPT大会議成果** +**禿げるほど考えた末の完璧なアーキテクチャ決定!** + +#### **核心設計思想** +- **Bus = ローカルOS**: 常に保持、配送・購読・監視のハブ +- **Transport = NIC**: 通信手段選択、InProcess/WebSocket/WebRTC切り替え +- **IntentBox**: メッセージ専用Box(Transportと分離) + +#### **完全実装仕様(コンテキスト圧縮復元)** + +**🎯 IntentBox詳細設計(Nyash同期・シンプル版)** +```rust +// ✅ 最初の実装はシンプル同期版 +pub struct IntentBox { + pub intent: String, // Intent種類("chat.message", "file.transfer"等) + pub payload: HashMap>, // Nyashネイティブ・同期 +} + +impl IntentBox { + pub fn new(intent: &str) -> Self; + pub fn set(&mut self, key: &str, value: Box); + pub fn get(&self, key: &str) -> Option<&Box>; +} + +// 🔄 将来拡張用(後回し) +// pub struct SendOpts { ack_required, timeout_ms } - async時に追加 +// pub struct IntentEnvelope { from, to, intent } - ネット対応時に追加 +``` + +**🎯 P2PBox詳細設計(Nyash同期・シンプル版)** +```rust +// ✅ 最初の実装はシンプル同期版 +pub struct P2PBox { + node_id: String, + transport: Box, + bus: Arc, // ← 常に保持!(ローカル配送・購読・監視用) +} + +impl P2PBox { + // シンプル同期コンストラクタ + pub fn new(node_id: &str, transport_kind: TransportKind) -> Self { + let bus = get_global_message_bus(); // シングルトン取得 + let transport = create_transport(transport_kind, node_id); // 簡単ファクトリ + + // 自ノード登録 + bus.register_node(node_id).unwrap(); + + Self { + node_id: node_id.to_string(), + transport, + bus + } + } + + // 購読メソッド - Busに登録 + pub fn on(&self, intent: &str, callback: Box) { + self.bus.on(&self.node_id, intent, callback).unwrap(); + } + + // 送信メソッド - 天才アルゴリズム内蔵(同期版) + pub fn send(&self, to: &str, intent_box: &IntentBox) -> Result<(), String> { + // 1) 宛先が同プロセス(Busが知っている)ならローカル配送 + if self.bus.has_node(to) { + let message = BusMessage { + from: self.node_id.clone(), + to: to.to_string(), + intent: intent_box.intent.clone(), + data: /* IntentBoxをNyashBoxに変換 */, + timestamp: std::time::SystemTime::now(), + }; + self.bus.route(message)?; // 爆速ローカル + return Ok(()); + } + + // 2) ローカルに居ない → Transportで外へ出す + self.transport.send(to, &intent_box.intent, /* data */) + } + + pub fn get_node_id(&self) -> &str { + &self.node_id + } +} + +// 🔄 将来拡張用(後回し) +// async fn send() - async対応時 +// TransportFactory::create() - 複雑なオプション対応時 +// on_receive()コールバック - ネット受信対応時 +``` + +**🎯 TransportKind & ファクトリ(Nyash同期・シンプル版)** +```rust +// ✅ 最初の実装はシンプル版 +#[derive(Debug, Clone)] +pub enum TransportKind { + InProcess, // プロセス内通信(最初に実装) + WebSocket, // WebSocket通信(将来実装) + WebRTC, // P2P直接通信(将来実装) +} + +// シンプルファクトリ関数 +pub fn create_transport(kind: TransportKind, node_id: &str) -> Box { + match kind { + TransportKind::InProcess => Box::new(InProcessTransport::new(node_id.to_string())), + TransportKind::WebSocket => todo!("WebSocket transport - 将来実装"), + TransportKind::WebRTC => todo!("WebRTC transport - 将来実装"), + } +} + +// 🔄 将来拡張用(後回し) +// pub struct TransportFactory; - 複雑なオプション対応時 +// pub struct TransportOpts; - オプション追加時 +``` + +**🎯 4つの核心(忘れてはいけないポイント)** +``` +1. P2PBoxは、トランスポートがネットでもBusを持ち続ける(ローカル配送・購読・監視用) +2. P2PBoxはIntentBoxを使って送る +3. 送信アルゴリズム:ローカルならBus、それ以外はTransport +4. 受信アルゴリズム:Transport→P2PBox→Bus でローカルハンドラに届く +``` + +**🎯 天才アルゴリズム実装(同期・シンプル版)** +```rust +// 送信:ローカル優先 → リモートフォールバック +if self.bus.has_node(to) { + self.bus.route(message)?; // ← 爆速ローカル(ゼロコピー級) + return Ok(()); +} else { + self.transport.send(to, intent, data)?; // ← Transport経由(同期) +} + +// 受信:将来実装時の流れ +// Transport.receive() → IntentBox → MessageBus.route() → LocalHandler +``` + +**🎯 使用例(Nyash同期・シンプル版)** +```rust +// 基本使用パターン(同期版) +let alice = P2PBox::new("alice", TransportKind::InProcess); +let bob = P2PBox::new("bob", TransportKind::InProcess); + +// 購読登録 +bob.on("chat.message", Box::new(|intent_box: &IntentBox| { + if let Some(text) = intent_box.get("text") { + println!("Received: {}", text.to_string_box().value); + } +})); + +// メッセージ送信 +let mut intent = IntentBox::new("chat.message"); +intent.set("text", Box::new(StringBox::new("Hello Bob!"))); +alice.send("bob", &intent).unwrap(); // ← 天才アルゴリズム自動判定(同期) +``` + +**🎯 実装順序(重要)** +``` +1. まず cargo build --lib でコンパイル確認 +2. IntentBox実装(HashMap + Nyashネイティブ) +3. TransportKind enum実装 +4. P2PBox本体実装(天才アルゴリズム内蔵) +5. テスト用Nyashコード作成・動作確認 +``` + +#### **勝利ポイント** +1. **統一API**: send()/on() でローカル・ネット同じ +2. **最速ローカル**: Bus直接配送でゼロコピー級 +3. **拡張自在**: TransportKind で通信手段切り替え +4. **デバッグ天国**: Bus でメッセージ全監視 +5. **NyaMesh実証済み**: Transport抽象化パターン + +### 🎯 **次の実装ステップ(詳細設計復元完了)** + +**基盤レイヤー(ほぼ完了)** +1. ✅ **Transport trait 定義** - NyaMesh参考実装完了 +2. ✅ **MessageBus シングルトン** - 基本実装済み、OnceLock使用 +3. 🔄 **InProcessTransport修正** - 新仕様対応が必要 + +**コアレイヤー(最優先実装)** +4. 🚨 **IntentBox実装** - HashMap>構造 +5. 🚨 **TransportKind enum** - create_transport()ファクトリ含む +6. 🚨 **P2PBox本体実装** - 天才アルゴリズム send()メソッド内蔵 + +**統合レイヤー(最終段階)** +7. **インタープリター統合** - new P2PBox(), new IntentBox()対応 +8. **テストスイート** - 基本動作確認 + +**🚨 現在の状況** +- transport_trait.rs、message_bus.rs、in_process_transport.rs 基本実装済み +- **詳細設計復元完了** ← 最重要!コンテキスト圧縮で失われた仕様を復活 +- 次回: まずcargo build --lib でコンパイル確認、その後IntentBox実装開始 ## 🔥 2025-08-11 本日の大成果 diff --git a/src/boxes/message_intent_box.rs b/src/boxes/message_intent_box.rs new file mode 100644 index 00000000..324e98f2 --- /dev/null +++ b/src/boxes/message_intent_box.rs @@ -0,0 +1,134 @@ +/** + * MessageIntentBox - メッセージコンテナBox(P2P通信用) + * + * 設計原則: + * - HashMap>でNyashネイティブデータ保持 + * - 同期・シンプル実装(async対応は将来拡張) + * - Everything is Box哲学に準拠 + * + * 注意: 既存のIntentBox(通信世界)とは別物 + * - IntentBox = 通信世界・環境の定義 + * - MessageIntentBox = 実際のメッセージデータ(これ) + */ + +use std::collections::HashMap; +use std::fmt; +use crate::box_trait::{NyashBox, BoxCore, BoxBase, next_box_id}; + +/// MessageIntentBox - Intent型通信メッセージのコンテナ +pub struct MessageIntentBox { + base: BoxBase, + /// Intent種類("chat.message", "file.transfer"等) + pub intent: String, + /// Nyashネイティブデータ保持 + pub payload: HashMap>, +} + +impl MessageIntentBox { + /// 新しいMessageIntentBoxを作成 + pub fn new(intent: &str) -> Self { + Self { + base: BoxBase { + id: next_box_id(), + parent_type_id: None, // ビルトインBox継承なし + }, + intent: intent.to_string(), + payload: HashMap::new(), + } + } + + /// キー-値ペアを設定 + pub fn set(&mut self, key: &str, value: Box) { + self.payload.insert(key.to_string(), value); + } + + /// キーに対応する値を取得 + pub fn get(&self, key: &str) -> Option<&Box> { + self.payload.get(key) + } + + /// キーに対応する値を削除 + pub fn remove(&mut self, key: &str) -> Option> { + self.payload.remove(key) + } + + /// すべてのキーを取得 + pub fn keys(&self) -> Vec { + self.payload.keys().cloned().collect() + } + + /// ペイロードが空かチェック + pub fn is_empty(&self) -> bool { + self.payload.is_empty() + } + + /// ペイロード要素数を取得 + pub fn len(&self) -> usize { + self.payload.len() + } +} + +impl BoxCore for MessageIntentBox { + fn box_id(&self) -> u64 { + self.base.id + } + + fn parent_type_id(&self) -> Option { + self.base.parent_type_id + } + + fn fmt_box(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "MessageIntentBox(intent: {}, payload: {} items)", + self.intent, self.payload.len()) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } +} + +impl NyashBox for MessageIntentBox { + fn type_name(&self) -> &'static str { + "MessageIntentBox" + } + + fn to_string_box(&self) -> crate::StringBox { + crate::StringBox::new(&format!("MessageIntentBox({})", self.intent)) + } + + fn clone_box(&self) -> Box { + let mut new_intent = MessageIntentBox::new(&self.intent); + + // PayloadをDeepClone + for (key, value) in &self.payload { + new_intent.payload.insert(key.clone(), value.clone_box()); + } + + Box::new(new_intent) + } + + fn equals(&self, other: &dyn NyashBox) -> crate::BoolBox { + if let Some(other_intent) = other.as_any().downcast_ref::() { + crate::BoolBox::new(self.box_id() == other_intent.box_id()) + } else { + crate::BoolBox::new(false) + } + } +} + +impl fmt::Display for MessageIntentBox { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.fmt_box(f) + } +} + +impl fmt::Debug for MessageIntentBox { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "MessageIntentBox {{ intent: {:?}, payload: {:?} }}", + self.intent, self.payload.len()) + } +} \ No newline at end of file diff --git a/src/boxes/mod.rs b/src/boxes/mod.rs index 24ad41d1..15637df5 100644 --- a/src/boxes/mod.rs +++ b/src/boxes/mod.rs @@ -108,6 +108,8 @@ pub mod regex; // P2P通信Box群 pub mod intent_box; pub mod p2p_box; +pub mod message_intent_box; +pub mod new_p2p_box; // null関数も再エクスポート pub use null_box::{NullBox, null}; @@ -125,4 +127,6 @@ pub use regex::RegexBox; // P2P通信Boxの再エクスポート pub use intent_box::IntentBox; -pub use p2p_box::P2PBox; \ No newline at end of file +pub use p2p_box::P2PBox; +pub use message_intent_box::MessageIntentBox; +pub use new_p2p_box::NewP2PBox; \ No newline at end of file diff --git a/src/boxes/new_p2p_box.rs b/src/boxes/new_p2p_box.rs new file mode 100644 index 00000000..d26b7061 --- /dev/null +++ b/src/boxes/new_p2p_box.rs @@ -0,0 +1,143 @@ +/** + * NewP2PBox - 天才アルゴリズム内蔵P2PBox(同期・シンプル版) + * + * 設計原則(4つの核心): + * 1. P2PBoxは、トランスポートがネットでもBusを持ち続ける(ローカル配送・購読・監視用) + * 2. P2PBoxはMessageIntentBoxを使って送る + * 3. 送信アルゴリズム:ローカルならBus、それ以外はTransport + * 4. 受信アルゴリズム:Transport→P2PBox→Bus でローカルハンドラに届く + * + * Everything is Box哲学準拠・同期実装 + */ + +use std::sync::Arc; +use crate::box_trait::{NyashBox, BoxCore, BoxBase, next_box_id}; +use crate::boxes::MessageIntentBox; +use crate::transport_trait::{Transport, TransportKind, create_transport}; +use crate::message_bus::{get_global_message_bus, BusMessage, MessageBus}; + +/// NewP2PBox - 天才アルゴリズム内蔵P2P通信ノード +pub struct NewP2PBox { + base: BoxBase, + node_id: String, + transport: Box, + bus: Arc, // ← 常に保持!(ローカル配送・購読・監視用) +} + +impl NewP2PBox { + /// シンプル同期コンストラクタ + pub fn new(node_id: &str, transport_kind: TransportKind) -> Self { + let bus = get_global_message_bus(); // シングルトン取得 + let transport = create_transport(transport_kind, node_id); // 簡単ファクトリ + + // 自ノード登録 + bus.register_node(node_id).unwrap(); + + Self { + base: BoxBase { + id: next_box_id(), + parent_type_id: None, + }, + node_id: node_id.to_string(), + transport, + bus + } + } + + /// 購読メソッド - Busに登録 + pub fn on(&self, intent: &str, callback: Box) { + // BusMessageからMessageIntentBoxを抽出するラッパー + let wrapper = Box::new(move |bus_message: &BusMessage| { + // BusMessageのdataをMessageIntentBoxにダウンキャスト + if let Some(intent_box) = bus_message.data.as_any().downcast_ref::() { + callback(intent_box); + } + }); + self.bus.on(&self.node_id, intent, wrapper).unwrap(); + } + + /// 送信メソッド - 天才アルゴリズム内蔵(同期版) + pub fn send(&self, to: &str, intent_box: &MessageIntentBox) -> Result<(), String> { + // 1) 宛先が同プロセス(Busが知っている)ならローカル配送 + if self.bus.has_node(to) { + // MessageIntentBoxからBusMessageに変換 + let message = BusMessage { + from: self.node_id.clone(), + to: to.to_string(), + intent: intent_box.intent.clone(), + data: intent_box.clone_box(), // MessageIntentBox全体をデータとして送信 + timestamp: std::time::SystemTime::now(), + }; + self.bus.route(message)?; // 爆速ローカル + return Ok(()); + } + + // 2) ローカルに居ない → Transportで外へ出す + self.transport.send(to, &intent_box.intent, intent_box.clone_box()) + } + + /// ノードID取得 + pub fn get_node_id(&self) -> &str { + &self.node_id + } +} + +impl BoxCore for NewP2PBox { + fn box_id(&self) -> u64 { + self.base.id + } + + fn parent_type_id(&self) -> Option { + self.base.parent_type_id + } + + fn fmt_box(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "NewP2PBox(node_id: {}, transport: {})", + self.node_id, self.transport.transport_type()) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } +} + +impl NyashBox for NewP2PBox { + fn type_name(&self) -> &'static str { + "NewP2PBox" + } + + fn to_string_box(&self) -> crate::StringBox { + crate::StringBox::new(&format!("NewP2PBox({})", self.node_id)) + } + + fn clone_box(&self) -> Box { + // P2PBoxは基本的にクローンしない(ノードの一意性のため) + // 必要に応じて別のコンストラクタで同じ設定の新ノードを作成する + todo!("P2PBox clone not recommended - create new node instead") + } + + fn equals(&self, other: &dyn NyashBox) -> crate::BoolBox { + if let Some(other_p2p) = other.as_any().downcast_ref::() { + crate::BoolBox::new(self.node_id == other_p2p.node_id) + } else { + crate::BoolBox::new(false) + } + } +} + +impl std::fmt::Display for NewP2PBox { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + self.fmt_box(f) + } +} + +impl std::fmt::Debug for NewP2PBox { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "NewP2PBox {{ node_id: {:?}, transport: {:?} }}", + self.node_id, self.transport.transport_type()) + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 62d947a3..2df99d1c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,9 @@ pub mod finalization; pub mod exception_box; pub mod method_box; pub mod type_box; // 🌟 TypeBox revolutionary system +pub mod transport_trait; +pub mod message_bus; +pub mod transports; pub mod operator_traits; // 🚀 Rust-style trait-based operator overloading pub mod box_operators; // 🚀 Operator implementations for basic Box types diff --git a/src/message_bus.rs b/src/message_bus.rs new file mode 100644 index 00000000..f9ce0ce9 --- /dev/null +++ b/src/message_bus.rs @@ -0,0 +1,173 @@ +/** + * MessageBus - Central communication hub (Bus = Local OS metaphor) + * + * Design principles from ChatGPT discussion: + * - Always present in P2PBox (even for network transport) + * - Handles local routing, subscription, monitoring + * - Singleton pattern for process-wide message coordination + * - Synchronous-first implementation + * + * NyaMesh inspiration: + * - InProcessMessageBus singleton pattern + * - Node registration/unregistration + * - Statistics tracking + */ + +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock}; +use crate::NyashBox; + +/// Message structure for internal routing +#[derive(Debug)] +pub struct BusMessage { + pub from: String, + pub to: String, + pub intent: String, + pub data: Box, + pub timestamp: std::time::SystemTime, +} + +impl Clone for BusMessage { + fn clone(&self) -> Self { + Self { + from: self.from.clone(), + to: self.to.clone(), + intent: self.intent.clone(), + data: self.data.clone_box(), // NyashBoxのclone_box()メソッドを使用 + timestamp: self.timestamp, + } + } +} + +/// Node registration information +struct NodeInfo { + node_id: String, + callbacks: HashMap>>, +} + +/// Central MessageBus - handles all local message routing +pub struct MessageBus { + /// Registered nodes in this process + nodes: RwLock>>>, + + /// Bus-level statistics + stats: Mutex, +} + +#[derive(Debug, Clone, Default)] +pub struct BusStats { + pub messages_routed: u64, + pub routing_errors: u64, + pub nodes_registered: u64, + pub total_callbacks: u64, +} + +impl MessageBus { + /// Create new MessageBus instance + pub fn new() -> Self { + Self { + nodes: RwLock::new(HashMap::new()), + stats: Mutex::new(BusStats::default()), + } + } + + /// Register a node in the message bus + pub fn register_node(&self, node_id: &str) -> Result<(), String> { + let mut nodes = self.nodes.write().unwrap(); + + if nodes.contains_key(node_id) { + return Err(format!("Node '{}' already registered", node_id)); + } + + let node_info = NodeInfo { + node_id: node_id.to_string(), + callbacks: HashMap::new(), + }; + + nodes.insert(node_id.to_string(), Arc::new(Mutex::new(node_info))); + + // Update stats + let mut stats = self.stats.lock().unwrap(); + stats.nodes_registered += 1; + + Ok(()) + } + + /// Unregister a node from the message bus + pub fn unregister_node(&self, node_id: &str) { + let mut nodes = self.nodes.write().unwrap(); + nodes.remove(node_id); + } + + /// Check if a node is registered locally + pub fn has_node(&self, node_id: &str) -> bool { + let nodes = self.nodes.read().unwrap(); + nodes.contains_key(node_id) + } + + /// Route message to local node + pub fn route(&self, message: BusMessage) -> Result<(), String> { + let nodes = self.nodes.read().unwrap(); + + if let Some(node) = nodes.get(&message.to) { + let node = node.lock().unwrap(); + + // Find callbacks for this intent + if let Some(callbacks) = node.callbacks.get(&message.intent) { + for callback in callbacks { + callback(&message); + } + } + + // Update stats + let mut stats = self.stats.lock().unwrap(); + stats.messages_routed += 1; + + Ok(()) + } else { + let mut stats = self.stats.lock().unwrap(); + stats.routing_errors += 1; + Err(format!("Node '{}' not found for routing", message.to)) + } + } + + /// Register callback for specific intent on a node + pub fn on(&self, node_id: &str, intent: &str, callback: Box) -> Result<(), String> { + let nodes = self.nodes.read().unwrap(); + + if let Some(node) = nodes.get(node_id) { + let mut node = node.lock().unwrap(); + node.callbacks.entry(intent.to_string()).or_insert_with(Vec::new).push(callback); + + // Update stats + let mut stats = self.stats.lock().unwrap(); + stats.total_callbacks += 1; + + Ok(()) + } else { + Err(format!("Node '{}' not found for callback registration", node_id)) + } + } + + /// Get list of registered nodes + pub fn get_registered_nodes(&self) -> Vec { + let nodes = self.nodes.read().unwrap(); + nodes.keys().cloned().collect() + } + + /// Get bus statistics + pub fn get_stats(&self) -> BusStats { + let stats = self.stats.lock().unwrap(); + stats.clone() + } +} + +use std::sync::OnceLock; + +/// Global MessageBus singleton +static GLOBAL_MESSAGE_BUS: OnceLock> = OnceLock::new(); + +/// Get global message bus instance +pub fn get_global_message_bus() -> Arc { + GLOBAL_MESSAGE_BUS.get_or_init(|| Arc::new(MessageBus::new())).clone() +} \ No newline at end of file diff --git a/src/transport_trait.rs b/src/transport_trait.rs new file mode 100644 index 00000000..a19e1778 --- /dev/null +++ b/src/transport_trait.rs @@ -0,0 +1,80 @@ +/** + * Transport trait abstraction - NyaMesh style implementation + * + * Design principles from NyaMesh: + * - Transport = NIC (Network Interface Card) - handles communication method only + * - Bus = Local OS - handles routing, subscription, monitoring + * - Clean separation between transport mechanism and message routing + * + * Based on ChatGPT discussion P2PBox architecture: + * - P2PBox always has MessageBus (even for network transport) + * - Transport abstraction allows switching InProcess/WebSocket/WebRTC + * - Synchronous-first implementation strategy + */ + +use crate::NyashBox; +use crate::transports::InProcessTransport; + +/// Transport trait - represents different communication mechanisms +/// Like NyaMesh's TransportInterface, this abstracts the "how to send" part +pub trait Transport: Send + Sync { + /// Initialize the transport (async-compatible but synchronous first) + fn initialize(&mut self) -> Result<(), String>; + + /// Send message through this transport mechanism + /// to: target node ID + /// intent: message intent type + /// data: message payload + fn send(&self, to: &str, intent: &str, data: Box) -> Result<(), String>; + + /// Get transport type identifier (e.g., "inprocess", "websocket", "webrtc") + fn transport_type(&self) -> &'static str; + + /// Check if transport is ready + fn is_ready(&self) -> bool; + + /// Shutdown transport cleanly + fn shutdown(&mut self) -> Result<(), String>; + + /// Get transport statistics + fn get_stats(&self) -> TransportStats; +} + +/// Transport statistics - standardized across all transport types +#[derive(Debug, Clone)] +pub struct TransportStats { + pub transport_type: String, + pub messages_sent: u64, + pub messages_received: u64, + pub errors: u64, + pub is_ready: bool, +} + +impl TransportStats { + pub fn new(transport_type: &str) -> Self { + Self { + transport_type: transport_type.to_string(), + messages_sent: 0, + messages_received: 0, + errors: 0, + is_ready: false, + } + } +} + +/// TransportKind - 通信方式の選択(Nyash同期・シンプル版) +#[derive(Debug, Clone)] +pub enum TransportKind { + InProcess, // プロセス内通信(最初に実装) + WebSocket, // WebSocket通信(将来実装) + WebRTC, // P2P直接通信(将来実装) +} + +/// シンプルファクトリ関数 +pub fn create_transport(kind: TransportKind, node_id: &str) -> Box { + match kind { + TransportKind::InProcess => Box::new(InProcessTransport::new(node_id.to_string())), + TransportKind::WebSocket => todo!("WebSocket transport - 将来実装"), + TransportKind::WebRTC => todo!("WebRTC transport - 将来実装"), + } +} \ No newline at end of file diff --git a/src/transports/in_process_transport.rs b/src/transports/in_process_transport.rs new file mode 100644 index 00000000..19ed6419 --- /dev/null +++ b/src/transports/in_process_transport.rs @@ -0,0 +1,134 @@ +/** + * InProcessTransport - Local process communication transport + * + * Based on NyaMesh InProcessTransport design: + * - Synchronous-first implementation (parallelSafe flag support) + * - Direct function pointer callbacks (no async complexity) + * - Simple message routing through global MessageBus + * + * Key features from NyaMesh: + * - parallelSafe = false by default (GUI thread safe) + * - Direct callback execution + * - Statistics tracking + */ + +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use crate::transport_trait::{Transport, TransportStats}; +use crate::message_bus::{get_global_message_bus, BusMessage}; +use crate::NyashBox; + +/// InProcessTransport - for local communication within same process +pub struct InProcessTransport { + /// Node ID for this transport + node_id: String, + + /// Whether transport is initialized + initialized: AtomicBool, + + /// Statistics + messages_sent: AtomicU64, + messages_received: AtomicU64, + errors: AtomicU64, +} + +impl InProcessTransport { + /// Create new InProcessTransport with given node ID + pub fn new(node_id: String) -> Self { + Self { + node_id, + initialized: AtomicBool::new(false), + messages_sent: AtomicU64::new(0), + messages_received: AtomicU64::new(0), + errors: AtomicU64::new(0), + } + } + + /// Get node ID + pub fn node_id(&self) -> &str { + &self.node_id + } +} + +impl Transport for InProcessTransport { + fn initialize(&mut self) -> Result<(), String> { + if self.initialized.load(Ordering::Relaxed) { + return Ok(()); + } + + // Register with global message bus + let bus = get_global_message_bus(); + bus.register_node(&self.node_id)?; + + self.initialized.store(true, Ordering::Relaxed); + Ok(()) + } + + fn send(&self, to: &str, intent: &str, data: Box) -> Result<(), String> { + if !self.initialized.load(Ordering::Relaxed) { + self.errors.fetch_add(1, Ordering::Relaxed); + return Err("Transport not initialized".to_string()); + } + + // Create bus message + let message = BusMessage { + from: self.node_id.clone(), + to: to.to_string(), + intent: intent.to_string(), + data, + timestamp: std::time::SystemTime::now(), + }; + + // Route through global message bus + let bus = get_global_message_bus(); + + // Check if target is local + if bus.has_node(to) { + // Local routing - direct through bus + match bus.route(message) { + Ok(_) => { + self.messages_sent.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + Err(e) => { + self.errors.fetch_add(1, Ordering::Relaxed); + Err(e) + } + } + } else { + // Target not found locally + self.errors.fetch_add(1, Ordering::Relaxed); + Err(format!("Target node '{}' not found in process", to)) + } + } + + fn transport_type(&self) -> &'static str { + "inprocess" + } + + fn is_ready(&self) -> bool { + self.initialized.load(Ordering::Relaxed) + } + + fn shutdown(&mut self) -> Result<(), String> { + if !self.initialized.load(Ordering::Relaxed) { + return Ok(()); + } + + // Unregister from global message bus + let bus = get_global_message_bus(); + bus.unregister_node(&self.node_id); + + self.initialized.store(false, Ordering::Relaxed); + Ok(()) + } + + fn get_stats(&self) -> TransportStats { + TransportStats { + transport_type: self.transport_type().to_string(), + messages_sent: self.messages_sent.load(Ordering::Relaxed), + messages_received: self.messages_received.load(Ordering::Relaxed), + errors: self.errors.load(Ordering::Relaxed), + is_ready: self.is_ready(), + } + } +} \ No newline at end of file diff --git a/src/transports/mod.rs b/src/transports/mod.rs new file mode 100644 index 00000000..c3796659 --- /dev/null +++ b/src/transports/mod.rs @@ -0,0 +1,11 @@ +/** + * Transport implementations module + * + * Contains various transport implementations: + * - InProcessTransport: For local communication within same process + * - Future: WebSocketTransport, WebRTCTransport, etc. + */ + +pub mod in_process_transport; + +pub use in_process_transport::InProcessTransport; \ No newline at end of file