F#でプロセス間通信
MailboxProcessorというクラスを使ってやると、スレッド間で簡単にメッセージのやり取りをすることができます。
でもこれがプロセス間となると一工夫が必要。F#側には用意されていないので、BCLの力を借りることになります。
しかし.NETに用意されたプロセス間通信のためのクラスは、F#のMailboxProcessorとはかなり毛色が異なりますし、使い方も少々面倒。というわけで、これらの仕組みをラップしてF#色に染め直してみました。
MailboxProcessorに比べると機能は少ないですが、ほとんど同じようにして利用することができます。
ちなみに、.NET(CLR)にはアプリケーションドメインという概念があるため、「プロセス間通信」というよりは、「アプリケーション間通信」と言ってやった方が適切かもしれません。
結構長いのでソースは一番最後に載せます。まずはこのクラスの利用方法から。
利用例その1
サーバアプリケーション:
open System open Imuziok.Actor let Sample1 () = let server = MailServer<string>.Start (fun box -> async { let! msg = box.Receive () Console.WriteLine msg box.Post "さようなら" Console.WriteLine "サーバおしまい" }, name = "ServerName", max = 1) Console.ReadLine () |> ignore // 待機 server.Close () [<STAThreadAttribute>] do Sample1 ()
クライアントアプリケーション:
open System open Imuziok.Actor let Sample1 () = let client = MailClient<string>.Start (fun box -> async { box.Post "おはようございます" let! msg = box.Receive () Console.WriteLine ("{0} \n クライアントおしまい", msg) }, name = "ServerName") Console.ReadLine () |> ignore client.Close () [<STAThreadAttribute>] do Sample1 ()
最も単純な例です。
ひとつのサーバに対してひとつのクライアントが接続しています。
サーバに名前をつけてやる必要があります。ここでは"ServerName"という命名をしてあります。ネーミングセンスが光っています。
Startメソッドを呼びだすと非同期に処理が走り始めます。
利用例その2
今度は、ひとつのサーバで複数のクライアントを処理するサンプルです。
サーバアプリケーション:
open System open Imuziok.Actor let Sample2 n = for i = 1 to n do MailServer<string>.Start (fun box -> async { let! msg = box.Receive () Console.WriteLine ("[{0}] : {1}", i, msg) }, name = "unko", max = n) |> ignore Console.WriteLine "サーバすたーと" Console.ReadLine () |> ignore [<STAThreadAttribute>] do Sample2 3
クライアントアプリケーション:
open System open Imuziok.Actor let Sample2 () = Console.Write ">" let msg = Console.ReadLine () use client = MailClient<string>.Start (fun box -> async { box.Post msg }, name = "unko") Console.ReadLine () |> ignore [<STAThreadAttribute>] do Sample2 ()
サーバ側のプログラムでは、max = 3で処理を開始しています。
最大何個のクライアントを処理するのかをここで決めます。ここでは試しに最大3個です。数選びのセンスが光っています。
実際にクライアントを3個以上立ち上げて試してみてください。やっぱり試さなくていいです。
利用例その3
冒頭でアプリケーションドメインの話をしたので、そのサンプルです。
ひとつのプロセス(メモリ空間)を複数の部屋(アプリケーションドメイン)に区分けして、それぞれの部屋で1つずつアプリケーションを実行します。プロセス的には1つですが、複数のアプリケーションが共存しています。アプリケーションドメイン間の通信でも問題ありません。
サーバアプリケーション:
open System open Imuziok.Actor let SetServers n = for i = 1 to n do Console.WriteLine ("start {0}", i) MailServer<string>.Start (fun box -> async { let! msg = box.Receive () Console.WriteLine ("{0} : {1}", i, msg) }, name = "十二指腸", max = n) |> ignore let SetClients n = let f name = (AppDomain.CreateDomain name).ExecuteAssembly "Client.exe" |> ignore for i = 1 to n do async { f ("room" + i.ToString ()) } |> Async.Start [<STAThreadAttribute>] do SetServers 5 SetClients 5 Console.ReadLine () |> ignore
クライアントアプリケーション:
open System open Imuziok.Actor let Sample3 () = let r = new Random () let client = MailClient<string>.Start (fun box -> async { System.Threading.Thread.Sleep (r.Next 350) box.Post ("hello! @" + AppDomain.CurrentDomain.FriendlyName) }, name = "十二指腸") () [<STAThreadAttribute>] do Sample3 ()
サーバのデフォルト・アプリケーションドメインの他に5つ部屋を追加して、その中でクライアントを実行します。
各クライアントはランダムに待って、サーバにメッセージをポストします。それをサーバ側でまとめて表示。
以下は実行例。
start 1
start 2
start 3
start 4
start 5
1 : hello! @room1
4 : hello! @room3
2 : hello! @room2
3 : hello! @room4
5 : hello! @room5続行するには何かキーを押してください . . .
というわけで例をいくつか挙げてみました。
つくりは完璧ではないので、実用したい方は適当に改造して使ってください。
では以下がソースです。バグでもあったら教えてください。教えなくてもいいです。(!)
namespace Imuziok open System open System.IO open System.IO.Pipes module Actor = open System.Runtime.Serialization.Formatters.Binary type IMail<'Msg> = interface inherit IDisposable abstract Post : 'Msg -> unit abstract Receive : unit -> Async<'Msg> abstract Close : unit -> unit end [<AbstractClass>] type MailBase<'Msg, 'S when 'S :> IO.Pipes.PipeStream> (initial : IMail<'Msg> -> Async<unit>, stream : 'S) = let reader = new StreamReader (stream) let writer = new StreamWriter (stream) let formatter = new BinaryFormatter () let mutable initialized = false let mutable started = false let ResetPosition (s : #Stream) = s.Position <- int64 0 let ReadToEnd (s : #PipeStream) : byte [] = let buffer : byte [] = Array.zeroCreate 1024 let collection = new ResizeArray<byte> () let rec dowhile () = s.Read (buffer, 0, buffer.Length) |> ignore collection.AddRange buffer if not s.IsMessageComplete then dowhile () dowhile () collection.ToArray () abstract Setup : unit -> unit default self.Setup () = initialized <- true member self.Stream with get () = stream member private self.Serialize (x : #obj) : byte [] = try use stream = new MemoryStream () formatter.Serialize (stream, x) stream.ToArray () with | e -> new Exception ("データのシリアライズに失敗!", e) |> raise member private self.Deserialize<'T> (arr : byte []) : 'T = try use stream = new MemoryStream () stream.Write (arr, 0, arr.Length) stream.Flush () ResetPosition stream formatter.Deserialize stream |> unbox<'T> with | e -> new Exception ("データのデシリアライズに失敗!", e) |> raise member self.Start () : unit = if started then InvalidOperationException "戦いはすでに始まっている!" |> raise else started <- true async { if not initialized then do! async { self.Setup () } do! initial self } |> Async.Start interface IMail<'Msg> with member self.Post (message : 'Msg) : unit = try let arr = self.Serialize message stream.Write (arr, 0, arr.Length) stream.Flush () with | e -> new Exception("Postできませんでした", e) |> raise member self.Receive () : Async<'Msg> = async { let buffer = ReadToEnd stream if buffer.Length = 0 then failwith "受信したものが空っぽだった" return self.Deserialize<'Msg> buffer } member self.Close () = stream.Close () interface IDisposable with /// memo : スコープから抜けると、たとえスレッドが走っていても解放されてしまうため、 /// useステートメントの使用には注意する member self.Dispose () = stream.Dispose () type private Server = NamedPipeServerStream // alias type private Client = NamedPipeClientStream // alias [<Sealed>] type MailServer<'Msg> private (initial, name, max) = inherit MailBase<'Msg, Server>(initial, new Server (name, PipeDirection.InOut, max, PipeTransmissionMode.Message)) override self.Setup () = self.Stream.WaitForConnection () base.Setup () static member Start (initial, name, max) : IMail<'Msg> = let server = new MailServer<'Msg> (initial, name, max) server.Start () server :> IMail<'Msg> [<Sealed>] type MailClient<'Msg> private (initial, name) = inherit MailBase<'Msg, Client>(initial, new Client (".", name, PipeDirection.InOut)) override self.Setup () = self.Stream.Connect () self.Stream.ReadMode <- PipeTransmissionMode.Message base.Setup () static member Start (initial, name) : IMail<'Msg> = let client = new MailClient<'Msg> (initial, name) client.Start () client :> IMail<'Msg>