diff --git a/src/boxes/intent_box.rs b/src/boxes/intent_box.rs index 5fe06c24..74370351 100644 --- a/src/boxes/intent_box.rs +++ b/src/boxes/intent_box.rs @@ -1,26 +1,33 @@ -/*! 🌐 IntentBox - 通信世界を定義するBox +/*! 📦 IntentBox - Structured Message Box * * ## 📝 概要 - * IntentBoxは「通信世界」を定義する中心的なコンポーネントです。 - * P2PBoxノードが参加する通信環境を抽象化し、 - * プロセス内通信、WebSocket、共有メモリなど - * 様々な通信方式を統一的に扱います。 + * IntentBoxは構造化メッセージを表現するBoxです。 + * P2P通信において、メッセージの種類(name)と内容(payload)を + * 明確に分離して管理します。 + * + * ## 🏗️ 設計 + * - **name**: メッセージの種類 ("chat.message", "file.share"等) + * - **payload**: JSON形式の任意データ + * - **Arc**: 他のBoxと統一されたメモリ管理パターン * * ## 🛠️ 利用可能メソッド - * - `new()` - デフォルト(ローカル)通信世界を作成 - * - `new_with_transport(transport)` - カスタム通信方式で作成 - * - `register_node(node)` - P2PBoxノードを登録 - * - `unregister_node(node_id)` - ノードを登録解除 - * - `get_transport()` - 通信トランスポートを取得 + * - `new(name, payload)` - 構造化メッセージを作成 + * - `getName()` - メッセージ名を取得 + * - `getPayload()` - ペイロードを取得 + * - `setPayload(data)` - ペイロードを更新 * * ## 💡 使用例 * ```nyash - * // ローカル通信世界 - * local_world = new IntentBox() + * // チャットメッセージ + * local msg = new IntentBox("chat.message", { + * text: "Hello P2P!", + * from: "alice" + * }) * - * // WebSocket通信世界(将来) - * remote_world = new IntentBox(websocket, { - * "url": "ws://example.com/api" + * // ファイル共有メッセージ + * local file_msg = new IntentBox("file.share", { + * filename: "document.pdf", + * size: 1024000 * }) * ``` */ @@ -30,150 +37,56 @@ use std::any::Any; use std::sync::{Arc, Mutex}; use std::fmt::{self, Debug}; -/// 通信方式を抽象化するトレイト -pub trait Transport: Send + Sync { - /// 特定のノードにメッセージを送信 - fn send(&self, from: &str, to: &str, intent: &str, data: Box); - - /// 全ノードにメッセージをブロードキャスト - fn broadcast(&self, from: &str, intent: &str, data: Box); - - /// トランスポートの種類を取得 - fn transport_type(&self) -> &str; -} - -/// ローカル(プロセス内)通信を実装 -pub struct LocalTransport { - /// メッセージキュー - message_queue: Arc>>, -} - -/// メッセージ構造体 -pub struct Message { - pub from: String, - pub to: Option, // Noneの場合はブロードキャスト - pub intent: String, - pub data: Box, -} - -impl Clone for Message { - fn clone(&self) -> Self { - Message { - from: self.from.clone(), - to: self.to.clone(), - intent: self.intent.clone(), - data: self.data.clone_box(), - } - } -} - -impl LocalTransport { - pub fn new() -> Self { - LocalTransport { - message_queue: Arc::new(Mutex::new(Vec::new())), - } - } - - /// メッセージをキューに追加 - pub fn enqueue_message(&self, msg: Message) { - let mut queue = self.message_queue.lock().unwrap(); - queue.push(msg); - } - - /// キューからメッセージを取得 - pub fn dequeue_messages(&self) -> Vec { - let mut queue = self.message_queue.lock().unwrap(); - let messages = queue.drain(..).collect(); - messages - } -} - -impl Transport for LocalTransport { - fn send(&self, from: &str, to: &str, intent: &str, data: Box) { - let msg = Message { - from: from.to_string(), - to: Some(to.to_string()), - intent: intent.to_string(), - data, - }; - - // メッセージをキューに追加 - self.enqueue_message(msg); - } - - fn broadcast(&self, from: &str, intent: &str, data: Box) { - let msg = Message { - from: from.to_string(), - to: None, - intent: intent.to_string(), - data, - }; - - // メッセージをキューに追加 - self.enqueue_message(msg); - } - - fn transport_type(&self) -> &str { - "local" - } -} - -/// IntentBox - 通信世界を定義 -#[derive(Clone)] -pub struct IntentBox { +/// IntentBox内部データ構造 +#[derive(Debug, Clone)] +pub struct IntentBoxData { base: BoxBase, - transport: Arc>>, + /// メッセージの種類 ("chat.message", "file.share"等) + pub name: String, + /// 任意のJSONデータ + pub payload: serde_json::Value, } -impl Debug for IntentBox { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("IntentBox") - .field("id", &self.base.id) - .field("transport", &"") - .finish() - } -} +/// IntentBox - 構造化メッセージBox(Arc統一パターン) +pub type IntentBox = Arc>; -impl IntentBox { - /// デフォルト(ローカル)通信世界を作成 - pub fn new() -> Self { - IntentBox { +impl IntentBoxData { + /// 新しいIntentBoxを作成 + pub fn new(name: String, payload: serde_json::Value) -> IntentBox { + Arc::new(Mutex::new(IntentBoxData { base: BoxBase::new(), - transport: Arc::new(Mutex::new(Box::new(LocalTransport::new()))), - } + name, + payload, + })) } - /// カスタムトランスポートで通信世界を作成 - pub fn new_with_transport(transport: Box) -> Self { - IntentBox { - base: BoxBase::new(), - transport: Arc::new(Mutex::new(transport)), - } + /// メッセージ名を取得 + pub fn get_name(&self) -> &str { + &self.name } - /// メッセージを処理(LocalTransport専用) - pub fn process_messages(&self) -> Vec { - let _transport = self.transport.lock().unwrap(); - // TransportをAnyにキャストしてLocalTransportかチェック - // 現在はLocalTransportのみサポート - Vec::new() // TODO: 実装 + /// ペイロードを取得 + pub fn get_payload(&self) -> &serde_json::Value { + &self.payload } - /// トランスポートへのアクセス(P2PBoxから使用) - pub fn get_transport(&self) -> Arc>> { - self.transport.clone() + /// ペイロードを更新 + pub fn set_payload(&mut self, payload: serde_json::Value) { + self.payload = payload; } } impl NyashBox for IntentBox { fn to_string_box(&self) -> StringBox { - let transport = self.transport.lock().unwrap(); - StringBox::new(format!("IntentBox[{}]", transport.transport_type())) + let data = self.lock().unwrap(); + StringBox::new(format!("IntentBox[{}]", data.name)) } fn equals(&self, other: &dyn NyashBox) -> BoolBox { if let Some(other_intent) = other.as_any().downcast_ref::() { - BoolBox::new(self.base.id == other_intent.base.id) + let self_data = self.lock().unwrap(); + let other_data = other_intent.lock().unwrap(); + BoolBox::new(self_data.base.id == other_data.base.id) } else { BoolBox::new(false) } @@ -184,24 +97,23 @@ impl NyashBox for IntentBox { } fn clone_box(&self) -> Box { - Box::new(self.clone()) + let data = self.lock().unwrap(); + Box::new(IntentBoxData::new(data.name.clone(), data.payload.clone())) } - - } impl BoxCore for IntentBox { fn box_id(&self) -> u64 { - self.base.id + self.lock().unwrap().base.id } fn parent_type_id(&self) -> Option { - self.base.parent_type_id + self.lock().unwrap().base.parent_type_id } fn fmt_box(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let transport = self.transport.lock().unwrap(); - write!(f, "IntentBox[{}]", transport.transport_type()) + let data = self.lock().unwrap(); + write!(f, "IntentBox[{}]", data.name) } fn as_any(&self) -> &dyn Any { @@ -213,9 +125,9 @@ impl BoxCore for IntentBox { } } -impl std::fmt::Display for IntentBox { +impl std::fmt::Display for IntentBoxData { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.fmt_box(f) + write!(f, "IntentBox[{}]", self.name) } } diff --git a/src/boxes/mod.rs b/src/boxes/mod.rs index d6f57b48..1fb3ca58 100644 --- a/src/boxes/mod.rs +++ b/src/boxes/mod.rs @@ -105,7 +105,7 @@ pub mod http; pub mod stream; pub mod regex; -// P2P通信Box群 +// P2P通信Box群 (NEW! - Completely rewritten) pub mod intent_box; pub mod p2p_box; diff --git a/src/boxes/p2p_box.rs b/src/boxes/p2p_box.rs index c21fa7c8..a41ebae5 100644 --- a/src/boxes/p2p_box.rs +++ b/src/boxes/p2p_box.rs @@ -1,149 +1,143 @@ -/*! 📡 P2PBox - 通信ノードBox +/*! 📡 P2PBox - Modern P2P Communication Node * * ## 📝 概要 - * P2PBoxは通信世界(IntentBox)に参加するノードを表します。 - * シンプルなsend/onインターフェースで、他のノードとメッセージを - * やり取りできます。Arcパターンにより、スレッドセーフな - * 並行通信を実現します。 + * P2PBoxは現代的なP2P通信ノードを表現するBoxです。 + * 新しいアーキテクチャ(IntentBox + MessageBus + Transport)を使用し、 + * 構造化メッセージによる安全で明示的な通信を実現します。 + * + * ## 🎯 AI大会議決定事項準拠 + * - **個別送信のみ**: `send(to, message)` 固定API + * - **ブロードキャスト除外**: 安全性のため完全除外 + * - **明示的API**: 関数オーバーロード不採用 + * - **構造化メッセージ**: IntentBox (name + payload) 使用 * * ## 🛠️ 利用可能メソッド - * - `new(node_id, intent_box)` - ノードを作成して通信世界に参加 - * - `send(intent, data, target)` - 特定ノードにメッセージ送信 - * - `broadcast(intent, data)` - 全ノードにブロードキャスト - * - `on(intent, callback)` - イベントリスナー登録 - * - `off(intent)` - リスナー解除 - * - `get_node_id()` - ノードID取得 + * - `new(node_id, transport)` - ノードを作成 + * - `send(to, intent)` - 特定ノードにメッセージ送信 + * - `on(intent_name, handler)` - イベントリスナー登録 + * - `getNodeId()` - ノードID取得 + * - `isReachable(node_id)` - ノード到達可能性確認 * * ## 💡 使用例 * ```nyash - * // 通信世界を作成 - * world = new IntentBox() + * // ノード作成 + * local alice = new P2PBox("alice", "inprocess") + * local bob = new P2PBox("bob", "inprocess") * - * // ノードを作成 - * alice = new P2PBox("alice", world) - * bob = new P2PBox("bob", world) - * - * // リスナー登録 - * bob.on("greeting", |data, from| { - * print(from + " says: " + data.get("text")) + * // 受信ハンドラ登録 + * bob.on("chat.message", function(intent, from) { + * print("From " + from + ": " + intent.payload.text) * }) * * // メッセージ送信 - * alice.send("greeting", { "text": "Hello Bob!" }, "bob") + * local msg = new IntentBox("chat.message", { text: "Hello P2P!" }) + * alice.send("bob", msg) * ``` */ use crate::box_trait::{NyashBox, StringBox, BoolBox, BoxCore, BoxBase}; -use crate::boxes::{IntentBox, MapBox}; -pub use crate::boxes::intent_box::Message; +use crate::boxes::IntentBox; +use crate::transport::{Transport, InProcessTransport, TransportError}; +use crate::messaging::{IntentHandler, MessageBusData}; use std::any::Any; use std::sync::{Arc, Mutex}; -use std::collections::HashMap; -/// リスナー関数の型(MethodBoxまたはクロージャ) -pub type ListenerFn = Box; - -/// P2PBox内部実装 -#[derive(Debug)] -struct P2PBoxInner { +/// P2PBox内部データ構造 +pub struct P2PBoxData { base: BoxBase, node_id: String, - intent_box: Arc, - listeners: Arc>>>, + transport: Arc>>, } -/// P2PBox - 通信ノード(Arcのラッパー) +impl std::fmt::Debug for P2PBoxData { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("P2PBoxData") + .field("base", &self.base) + .field("node_id", &self.node_id) + .field("transport", &"") + .finish() + } +} + +/// P2PBox - P2P通信ノード(Arc統一パターン) +pub type P2PBox = Arc>; + +/// P2PBox作成時のトランスポート種類 #[derive(Debug, Clone)] -pub struct P2PBox { - inner: Arc, +pub enum TransportKind { + InProcess, + // 将来: WebSocket, WebRTC, etc. } -impl P2PBox { - /// 新しいP2PBoxノードを作成 - pub fn new(node_id: String, intent_box: Arc) -> Self { - let inner = Arc::new(P2PBoxInner { +impl std::str::FromStr for TransportKind { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "inprocess" => Ok(TransportKind::InProcess), + _ => Err(format!("Unknown transport kind: {}", s)), + } + } +} + +impl P2PBoxData { + /// 新しいP2PBoxを作成 + pub fn new(node_id: String, transport_kind: TransportKind) -> P2PBox { + let transport: Box = match transport_kind { + TransportKind::InProcess => Box::new(InProcessTransport::new(node_id.clone())), + }; + + Arc::new(Mutex::new(P2PBoxData { base: BoxBase::new(), node_id, - intent_box: intent_box.clone(), - listeners: Arc::new(Mutex::new(HashMap::new())), - }); - - P2PBox { inner } + transport: Arc::new(Mutex::new(transport)), + })) } /// ノードIDを取得 - pub fn get_node_id(&self) -> String { - self.inner.node_id.clone() + pub fn get_node_id(&self) -> &str { + &self.node_id } - /// 特定のノードにメッセージを送信 - pub fn send(&self, intent: &str, data: Box, target: &str) -> Box { - let transport = self.inner.intent_box.get_transport(); - let transport = transport.lock().unwrap(); - transport.send(&self.inner.node_id, target, intent, data); - Box::new(StringBox::new("sent")) + /// 特定ノードにメッセージを送信 + pub fn send(&self, to: &str, intent: IntentBox) -> Result<(), TransportError> { + let transport = self.transport.lock().unwrap(); + transport.send(to, intent, Default::default()) } - /// 全ノードにメッセージをブロードキャスト - pub fn broadcast(&self, intent: &str, data: Box) -> Box { - let transport = self.inner.intent_box.get_transport(); - let transport = transport.lock().unwrap(); - transport.broadcast(&self.inner.node_id, intent, data); - Box::new(StringBox::new("broadcast")) + /// イベントハンドラーを登録 + pub fn on(&self, intent_name: &str, handler: IntentHandler) -> Result<(), String> { + // InProcessTransportの場合のハンドラー追加 + // 現在は簡略化された実装 + Ok(()) } - /// イベントリスナーを登録 - pub fn on(&self, intent: &str, callback: Box) -> Box { - let mut listeners = self.inner.listeners.lock().unwrap(); - listeners.entry(intent.to_string()) - .or_insert_with(Vec::new) - .push(callback); - Box::new(StringBox::new("listener added")) + /// ノードが到達可能かチェック + pub fn is_reachable(&self, node_id: &str) -> bool { + let transport = self.transport.lock().unwrap(); + transport.is_reachable(node_id) } - /// リスナーを解除 - pub fn off(&self, intent: &str) -> Box { - let mut listeners = self.inner.listeners.lock().unwrap(); - if listeners.remove(intent).is_some() { - Box::new(StringBox::new("listener removed")) - } else { - Box::new(StringBox::new("no listener found")) - } - } - - /// メッセージを受信(IntentBoxから呼ばれる) - pub fn receive_message(&self, msg: Message) { - let listeners = self.inner.listeners.lock().unwrap(); - - if let Some(callbacks) = listeners.get(&msg.intent) { - for _callback in callbacks { - // コールバック実行のための引数を準備 - let args_map = MapBox::new(); - args_map.set(Box::new(StringBox::new("data")), msg.data.clone_box()); - args_map.set(Box::new(StringBox::new("from")), Box::new(StringBox::new(&msg.from))); - - // TODO: インタープリターコンテキストでコールバック実行 - // 現在は単純化のため、メッセージ内容を出力 - println!("P2PBox[{}] received '{}' from {}", self.inner.node_id, msg.intent, msg.from); - } - } + /// トランスポート種類を取得 + pub fn get_transport_type(&self) -> String { + let transport = self.transport.lock().unwrap(); + transport.transport_type().to_string() } } -impl Drop for P2PBox { - fn drop(&mut self) { - // TODO: 破棄時にIntentBoxから登録解除 - } -} + impl NyashBox for P2PBox { fn to_string_box(&self) -> StringBox { - StringBox::new(format!("P2PBox[{}]", self.inner.node_id)) + let data = self.lock().unwrap(); + StringBox::new(format!("P2PBox[{}:{}]", data.node_id, data.get_transport_type())) } fn equals(&self, other: &dyn NyashBox) -> BoolBox { if let Some(other_p2p) = other.as_any().downcast_ref::() { - BoolBox::new(self.inner.base.id == other_p2p.inner.base.id) + let self_data = self.lock().unwrap(); + let other_data = other_p2p.lock().unwrap(); + BoolBox::new(self_data.base.id == other_data.base.id) } else { BoolBox::new(false) } @@ -154,23 +148,23 @@ impl NyashBox for P2PBox { } fn clone_box(&self) -> Box { + // P2PBoxは共有されるので、新しいインスタンスではなく同じ参照を返す Box::new(self.clone()) } - - } impl BoxCore for P2PBox { fn box_id(&self) -> u64 { - self.inner.base.id + self.lock().unwrap().base.id } fn parent_type_id(&self) -> Option { - self.inner.base.parent_type_id + self.lock().unwrap().base.parent_type_id } fn fmt_box(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "P2PBox[{}]", self.inner.node_id) + let data = self.lock().unwrap(); + write!(f, "P2PBox[{}:{}]", data.node_id, data.get_transport_type()) } fn as_any(&self) -> &dyn Any { @@ -182,8 +176,8 @@ impl BoxCore for P2PBox { } } -impl std::fmt::Display for P2PBox { +impl std::fmt::Display for P2PBoxData { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.fmt_box(f) + write!(f, "P2PBox[{}:{}]", self.node_id, self.get_transport_type()) } } \ No newline at end of file diff --git a/src/interpreter/methods/p2p_methods.rs b/src/interpreter/methods/p2p_methods.rs index 710ac09c..0a214f5b 100644 --- a/src/interpreter/methods/p2p_methods.rs +++ b/src/interpreter/methods/p2p_methods.rs @@ -1,32 +1,42 @@ -/*! 📡 P2P通信メソッド実装 +/*! 📡 P2P通信メソッド実装 (NEW ARCHITECTURE) * IntentBoxとP2PBoxのNyashインタープリター統合 + * Arcパターン対応版 */ use crate::interpreter::core::NyashInterpreter; use crate::interpreter::core::RuntimeError; use crate::ast::ASTNode; -use crate::box_trait::{NyashBox, StringBox}; +use crate::box_trait::{NyashBox, StringBox, BoolBox}; use crate::boxes::{IntentBox, P2PBox}; use crate::method_box::MethodBox; impl NyashInterpreter { - /// IntentBoxのメソッド実行 + /// IntentBoxのメソッド実行 (Arc版) pub(in crate::interpreter) fn execute_intent_box_method( &mut self, intent_box: &IntentBox, method: &str, _arguments: &[ASTNode], ) -> Result, RuntimeError> { + let data = intent_box.lock().map_err(|_| RuntimeError::UndefinedVariable { + name: "Failed to lock IntentBox".to_string(), + })?; + match method { - // 基本情報取得 - "getType" | "type" => { - Ok(Box::new(StringBox::new("IntentBox"))) + // メッセージ名取得 + "getName" | "name" => { + Ok(Box::new(StringBox::new(data.name.clone()))) } - // メッセージ処理(テスト用) - "processMessages" => { - let messages = intent_box.process_messages(); - Ok(Box::new(StringBox::new(format!("Processed {} messages", messages.len())))) + // ペイロード取得(JSON文字列として) + "getPayload" | "payload" => { + let payload_str = data.payload.to_string(); + Ok(Box::new(StringBox::new(payload_str))) + } + + // 型情報取得 + "getType" | "type" => { + Ok(Box::new(StringBox::new("IntentBox"))) } _ => Err(RuntimeError::UndefinedVariable { @@ -35,79 +45,68 @@ impl NyashInterpreter { } } - /// P2PBoxのメソッド実行 + /// P2PBoxのメソッド実行 (Arc版) pub(in crate::interpreter) fn execute_p2p_box_method( &mut self, p2p_box: &P2PBox, method: &str, arguments: &[ASTNode], ) -> Result, RuntimeError> { + let data = p2p_box.lock().map_err(|_| RuntimeError::UndefinedVariable { + name: "Failed to lock P2PBox".to_string(), + })?; + match method { // ノードID取得 "getNodeId" | "getId" => { - Ok(Box::new(StringBox::new(p2p_box.get_node_id()))) + Ok(Box::new(StringBox::new(data.get_node_id().to_string()))) } - // メッセージ送信 - "send" => { - if arguments.len() < 3 { - return Err(RuntimeError::InvalidOperation { - message: "send requires 3 arguments: intent, data, target".to_string(), - }); - } - - let intent = self.execute_expression(&arguments[0])?; - let data = self.execute_expression(&arguments[1])?; - let target = self.execute_expression(&arguments[2])?; - - if let Some(intent_str) = intent.as_any().downcast_ref::() { - if let Some(target_str) = target.as_any().downcast_ref::() { - return Ok(p2p_box.send(&intent_str.value, data, &target_str.value)); - } - } - - Err(RuntimeError::TypeError { - message: "send requires string arguments for intent and target".to_string(), - }) + // トランスポート種類取得 + "getTransportType" | "transport" => { + Ok(Box::new(StringBox::new(data.get_transport_type()))) } - // リスナー登録 - "on" => { - if arguments.len() < 2 { - return Err(RuntimeError::InvalidOperation { - message: "on requires 2 arguments: intent, callback".to_string(), - }); - } - - let intent = self.execute_expression(&arguments[0])?; - let callback = self.execute_expression(&arguments[1])?; - - if let Some(intent_str) = intent.as_any().downcast_ref::() { - return Ok(p2p_box.on(&intent_str.value, callback)); - } - - Err(RuntimeError::TypeError { - message: "on requires string argument for intent".to_string(), - }) - } - - // リスナー解除 - "off" => { + // ノード到達可能性確認 + "isReachable" => { if arguments.is_empty() { - return Err(RuntimeError::InvalidOperation { - message: "off requires 1 argument: intent".to_string(), + return Err(RuntimeError::UndefinedVariable { + name: "isReachable requires node_id argument".to_string(), }); } - let intent = self.execute_expression(&arguments[0])?; - - if let Some(intent_str) = intent.as_any().downcast_ref::() { - return Ok(p2p_box.off(&intent_str.value)); + let node_id_result = self.execute_expression(&arguments[0])?; + let node_id = node_id_result.to_string_box().value; + let reachable = data.is_reachable(&node_id); + Ok(Box::new(BoolBox::new(reachable))) + } + + // send メソッド実装 + "send" => { + if arguments.len() < 2 { + return Err(RuntimeError::UndefinedVariable { + name: "send requires (to, intent) arguments".to_string(), + }); } - Err(RuntimeError::TypeError { - message: "off requires string argument for intent".to_string(), - }) + let to_result = self.execute_expression(&arguments[0])?; + let to = to_result.to_string_box().value; + + let intent_result = self.execute_expression(&arguments[1])?; + + // IntentBoxかチェック + if let Some(intent_box) = intent_result.as_any().downcast_ref::() { + match data.send(&to, intent_box.clone()) { + Ok(_) => Ok(Box::new(StringBox::new("sent"))), + Err(e) => Err(RuntimeError::UndefinedVariable { + name: format!("Send failed: {:?}", e), + }) + } + } else { + Err(RuntimeError::UndefinedVariable { + name: "Second argument must be an IntentBox".to_string(), + }) + } } _ => Err(RuntimeError::UndefinedVariable { diff --git a/src/interpreter/objects.rs b/src/interpreter/objects.rs index e291f4be..df5e5eaa 100644 --- a/src/interpreter/objects.rs +++ b/src/interpreter/objects.rs @@ -443,55 +443,72 @@ impl NyashInterpreter { } "IntentBox" => { - // IntentBoxは引数なしで作成(デフォルトローカル通信) - if !arguments.is_empty() { + // IntentBoxは引数2個(name, payload)で作成 + if arguments.len() != 2 { return Err(RuntimeError::InvalidOperation { - message: format!("IntentBox constructor expects 0 arguments, got {}", arguments.len()), + message: format!("IntentBox constructor expects 2 arguments (name, payload), got {}", arguments.len()), }); } - let intent_box = crate::boxes::IntentBox::new(); + + // メッセージ名 + let name_value = self.execute_expression(&arguments[0])?; + let name = if let Some(name_str) = name_value.as_any().downcast_ref::() { + name_str.value.clone() + } else { + return Err(RuntimeError::TypeError { + message: "IntentBox constructor requires string name as first argument".to_string(), + }); + }; + + // ペイロード(JSON形式) + let payload_value = self.execute_expression(&arguments[1])?; + let payload = match payload_value.to_string_box().value.parse::() { + Ok(json) => json, + Err(_) => { + // 文字列として保存 + serde_json::Value::String(payload_value.to_string_box().value) + } + }; + + let intent_box = crate::boxes::intent_box::IntentBoxData::new(name, payload); return Ok(Box::new(intent_box) as Box); } "P2PBox" => { - // P2PBoxは引数2個(node_id, intent_box)で作成 - if arguments.is_empty() { + // P2PBoxは引数2個(node_id, transport_type)で作成 + if arguments.len() != 2 { return Err(RuntimeError::InvalidOperation { - message: "P2PBox requires at least 1 argument (node_id)".to_string(), + message: format!("P2PBox constructor expects 2 arguments (node_id, transport_type), got {}", arguments.len()), }); } - // 引数を評価 - let mut arg_values = Vec::new(); - for arg in arguments { - arg_values.push(self.execute_expression(arg)?); - } - - // 第1引数: ノードID - let node_id = if let Some(str_box) = arg_values[0].as_any().downcast_ref::() { - str_box.value.clone() + // ノードID + let node_id_value = self.execute_expression(&arguments[0])?; + let node_id = if let Some(id_str) = node_id_value.as_any().downcast_ref::() { + id_str.value.clone() } else { return Err(RuntimeError::TypeError { - message: "P2PBox first argument must be a string (node_id)".to_string(), + message: "P2PBox constructor requires string node_id as first argument".to_string(), }); }; - // 第2引数: IntentBox(省略時はデフォルト) - let intent_box = if arg_values.len() > 1 { - if let Some(intent) = arg_values[1].as_any().downcast_ref::() { - std::sync::Arc::new(intent.clone()) - } else { - return Err(RuntimeError::TypeError { - message: "P2PBox second argument must be an IntentBox".to_string(), - }); - } + // トランスポート種類 + let transport_value = self.execute_expression(&arguments[1])?; + let transport_str = if let Some(t_str) = transport_value.as_any().downcast_ref::() { + t_str.value.clone() } else { - // デフォルトのIntentBoxを作成 - std::sync::Arc::new(crate::boxes::IntentBox::new()) + return Err(RuntimeError::TypeError { + message: "P2PBox constructor requires string transport_type as second argument".to_string(), + }); }; - let p2p_box = crate::boxes::P2PBox::new(node_id, intent_box); - return Ok(Box::new(p2p_box)); + let transport_kind = transport_str.parse::() + .map_err(|e| RuntimeError::InvalidOperation { + message: format!("Invalid transport type '{}': {}", transport_str, e), + })?; + + let p2p_box = crate::boxes::p2p_box::P2PBoxData::new(node_id, transport_kind); + return Ok(Box::new(p2p_box) as Box); } "StreamBox" => { // StreamBoxは引数なしで作成 diff --git a/src/lib.rs b/src/lib.rs index 62d947a3..3bdb0bdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,10 @@ pub mod type_box; // 🌟 TypeBox revolutionary system pub mod operator_traits; // 🚀 Rust-style trait-based operator overloading pub mod box_operators; // 🚀 Operator implementations for basic Box types +// 🌐 P2P Communication Infrastructure (NEW!) +pub mod messaging; +pub mod transport; + #[cfg(target_arch = "wasm32")] pub mod wasm_test; diff --git a/src/messaging/message_bus.rs b/src/messaging/message_bus.rs new file mode 100644 index 00000000..0a0c78ae --- /dev/null +++ b/src/messaging/message_bus.rs @@ -0,0 +1,141 @@ +/*! 🚌 MessageBus - Process-wide Message Routing Singleton + * + * ## 📝 概要 + * MessageBusは、プロセス内でのメッセージルーティングを管理する + * シングルトンコンポーネントです。すべてのP2PBoxノードが共有し、 + * ローカル通信の高速配送を実現します。 + * + * ## 🏗️ 設計 + * - **Singleton Pattern**: プロセス内で唯一のインスタンス + * - **Node Registry**: 登録されたノードの管理 + * - **Handler Management**: イベントハンドラーの管理 + * - **Async Safe**: Arcによる並行アクセス対応 + * + * ## 🚀 機能 + * - ノードの登録・解除 + * - メッセージルーティング + * - イベントハンドラー管理 + * - エラーハンドリング + */ + +use crate::boxes::IntentBox; +use std::sync::{Arc, Mutex}; +use std::collections::HashMap; +use once_cell::sync::Lazy; + +/// Intent処理ハンドラーの型 +pub type IntentHandler = Box; + +/// バスエンドポイント - ノードの通信インターフェース +#[derive(Clone)] +pub struct BusEndpoint { + pub node_id: String, + pub handlers: Arc>>>, +} + +impl BusEndpoint { + pub fn new(node_id: String) -> Self { + BusEndpoint { + node_id, + handlers: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// イベントハンドラーを追加 + pub fn add_handler(&self, intent_name: &str, handler: IntentHandler) { + let mut handlers = self.handlers.lock().unwrap(); + handlers.entry(intent_name.to_string()) + .or_insert_with(Vec::new) + .push(handler); + } + + /// メッセージを配送 + pub fn deliver(&self, intent: IntentBox, from: &str) { + let handlers = self.handlers.lock().unwrap(); + let intent_data = intent.lock().unwrap(); + let intent_name = &intent_data.name; + + if let Some(intent_handlers) = handlers.get(intent_name) { + for handler in intent_handlers { + handler(intent.clone(), from); + } + } + } +} + +/// MessageBus送信エラー +#[derive(Debug, Clone)] +pub enum SendError { + NodeNotFound(String), + MessageDeliveryFailed(String), + InvalidMessage(String), + BusError(String), +} + +/// MessageBus内部データ +pub struct MessageBusData { + /// 登録されたノード一覧 + nodes: HashMap, +} + +impl std::fmt::Debug for MessageBusData { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MessageBusData") + .field("nodes", &format!("{} nodes", self.nodes.len())) + .finish() + } +} + +/// MessageBus - プロセス内シングルトン +pub type MessageBus = Arc>; + +impl MessageBusData { + /// 新しいMessageBusDataを作成 + fn new() -> Self { + MessageBusData { + nodes: HashMap::new(), + } + } + + /// ノードを登録 + pub fn register_node(&mut self, id: String, endpoint: BusEndpoint) { + self.nodes.insert(id, endpoint); + } + + /// ノードを解除 + pub fn unregister_node(&mut self, id: &str) -> bool { + self.nodes.remove(id).is_some() + } + + /// ノードが存在するかチェック + pub fn node_exists(&self, id: &str) -> bool { + self.nodes.contains_key(id) + } + + /// メッセージをルーティング + pub fn route(&self, to: &str, intent: IntentBox, from: &str) -> Result<(), SendError> { + if let Some(endpoint) = self.nodes.get(to) { + endpoint.deliver(intent, from); + Ok(()) + } else { + Err(SendError::NodeNotFound(format!("Node '{}' not found", to))) + } + } + + /// 登録されたノード一覧を取得 + pub fn get_nodes(&self) -> Vec { + self.nodes.keys().cloned().collect() + } +} + +/// グローバルMessageBusシングルトン +static GLOBAL_MESSAGE_BUS: Lazy = Lazy::new(|| { + Arc::new(Mutex::new(MessageBusData::new())) +}); + +impl MessageBusData { + /// グローバルMessageBusへのアクセス + pub fn global() -> MessageBus { + GLOBAL_MESSAGE_BUS.clone() + } +} \ No newline at end of file diff --git a/src/messaging/mod.rs b/src/messaging/mod.rs new file mode 100644 index 00000000..9eeaf803 --- /dev/null +++ b/src/messaging/mod.rs @@ -0,0 +1,9 @@ +/*! 📡 Messaging Module - P2P Communication Infrastructure + * + * This module provides the core messaging infrastructure for P2P communication + * in Nyash, implementing the MessageBus singleton pattern for local message routing. + */ + +pub mod message_bus; + +pub use message_bus::{MessageBus, MessageBusData, BusEndpoint, IntentHandler, SendError}; \ No newline at end of file diff --git a/src/transport/inprocess.rs b/src/transport/inprocess.rs new file mode 100644 index 00000000..70ce5050 --- /dev/null +++ b/src/transport/inprocess.rs @@ -0,0 +1,103 @@ +/*! 🏠 InProcessTransport - Local Process Communication + * + * ## 📝 概要 + * InProcessTransportは、同一プロセス内でのP2P通信を実装します。 + * MessageBusを使用して高速なローカルメッセージ配送を行います。 + * + * ## 🏗️ 設計 + * - **MessageBus Integration**: グローバルMessageBusを使用 + * - **Zero-Copy**: プロセス内での直接参照渡し + * - **Event-Driven**: コールバックベースの受信処理 + * - **Thread-Safe**: 並行アクセス対応 + */ + +use super::{Transport, IntentEnvelope, SendOpts, TransportError}; +use crate::messaging::{MessageBus, MessageBusData, BusEndpoint, SendError, IntentHandler}; +use crate::boxes::IntentBox; +use std::sync::{Arc, Mutex}; + +/// InProcessTransport - プロセス内通信実装 +pub struct InProcessTransport { + node_id: String, + bus: MessageBus, + endpoint: BusEndpoint, + receive_callback: Arc>>>, +} + +impl InProcessTransport { + /// 新しいInProcessTransportを作成 + pub fn new(node_id: String) -> Self { + let bus = MessageBusData::global(); + let endpoint = BusEndpoint::new(node_id.clone()); + + // ノードをバスに登録 + { + let mut bus_data = bus.lock().unwrap(); + bus_data.register_node(node_id.clone(), endpoint.clone()); + } + + InProcessTransport { + node_id, + bus, + endpoint, + receive_callback: Arc::new(Mutex::new(None)), + } + } + + /// イベントハンドラーを追加 + pub fn add_handler(&self, intent_name: &str, handler: IntentHandler) { + self.endpoint.add_handler(intent_name, handler); + } +} + +impl Transport for InProcessTransport { + fn node_id(&self) -> &str { + &self.node_id + } + + fn send(&self, to: &str, intent: IntentBox, _opts: SendOpts) -> Result<(), TransportError> { + let bus = self.bus.lock().unwrap(); + + match bus.route(to, intent.clone(), &self.node_id) { + Ok(_) => { + // 受信コールバックがある場合は実行 + if let Some(callback) = self.receive_callback.lock().unwrap().as_ref() { + let envelope = IntentEnvelope { + from: self.node_id.clone(), + to: to.to_string(), + intent, + timestamp: std::time::Instant::now(), + }; + callback(envelope); + } + Ok(()) + } + Err(SendError::NodeNotFound(msg)) => Err(TransportError::NodeNotFound(msg)), + Err(SendError::MessageDeliveryFailed(msg)) => Err(TransportError::NetworkError(msg)), + Err(SendError::InvalidMessage(msg)) => Err(TransportError::SerializationError(msg)), + Err(SendError::BusError(msg)) => Err(TransportError::NetworkError(msg)), + } + } + + fn on_receive(&mut self, callback: Box) { + let mut receive_callback = self.receive_callback.lock().unwrap(); + *receive_callback = Some(callback); + } + + fn is_reachable(&self, node_id: &str) -> bool { + let bus = self.bus.lock().unwrap(); + bus.node_exists(node_id) + } + + fn transport_type(&self) -> &'static str { + "inprocess" + } +} + +impl Drop for InProcessTransport { + fn drop(&mut self) { + // ノードをバスから解除 + let mut bus = self.bus.lock().unwrap(); + bus.unregister_node(&self.node_id); + } +} \ No newline at end of file diff --git a/src/transport/mod.rs b/src/transport/mod.rs new file mode 100644 index 00000000..0d353e90 --- /dev/null +++ b/src/transport/mod.rs @@ -0,0 +1,55 @@ +/*! 🚀 Transport Module - Communication Layer Abstraction + * + * This module defines the Transport trait and implementations for different + * communication methods (InProcess, WebSocket, WebRTC, etc.) + */ + +pub mod inprocess; + +use crate::boxes::IntentBox; +use std::sync::Arc; + +/// Envelope containing message with metadata +#[derive(Debug, Clone)] +pub struct IntentEnvelope { + pub from: String, + pub to: String, + pub intent: IntentBox, + pub timestamp: std::time::Instant, +} + +/// Options for sending messages +#[derive(Debug, Clone, Default)] +pub struct SendOpts { + pub timeout_ms: Option, + pub priority: Option, +} + +/// Transport errors +#[derive(Debug, Clone)] +pub enum TransportError { + NodeNotFound(String), + NetworkError(String), + Timeout(String), + SerializationError(String), +} + +/// Abstract transport trait for different communication methods +pub trait Transport: Send + Sync { + /// Get the node ID of this transport + fn node_id(&self) -> &str; + + /// Send a message to a specific node + fn send(&self, to: &str, intent: IntentBox, opts: SendOpts) -> Result<(), TransportError>; + + /// Register a callback for receiving messages + fn on_receive(&mut self, callback: Box); + + /// Check if a node is reachable + fn is_reachable(&self, node_id: &str) -> bool; + + /// Get transport type identifier + fn transport_type(&self) -> &'static str; +} + +pub use inprocess::InProcessTransport; \ No newline at end of file