F#でプロセス間通信

MailboxProcessorというクラスを使ってやると、スレッド間で簡単にメッセージのやり取りをすることができます。
でもこれがプロセス間となると一工夫が必要。F#側には用意されていないので、BCLの力を借りることになります。
しかし.NETに用意されたプロセス間通信のためのクラスは、F#のMailboxProcessorとはかなり毛色が異なりますし、使い方も少々面倒。というわけで、これらの仕組みをラップしてF#色に染め直してみました。
MailboxProcessorに比べると機能は少ないですが、ほとんど同じようにして利用することができます。


ちなみに、.NET(CLR)にはアプリケーションドメインという概念があるため、「プロセス間通信」というよりは、「アプリケーション間通信」と言ってやった方が適切かもしれません。


結構長いのでソースは一番最後に載せます。まずはこのクラスの利用方法から。

利用例その1

サーバアプリケーション:

open System
open Imuziok.Actor

let Sample1 () =
    let server = MailServer<string>.Start (fun box -> async {
        let! msg = box.Receive ()
        Console.WriteLine msg
        box.Post "さようなら"
        Console.WriteLine "サーバおしまい"
    }, name = "ServerName", max = 1)
    
    Console.ReadLine () |> ignore // 待機
    server.Close ()

[<STAThreadAttribute>]
do
    Sample1 ()


クライアントアプリケーション:

open System
open Imuziok.Actor

let Sample1 () =
    let client = MailClient<string>.Start (fun box -> async {
        box.Post "おはようございます"
        let! msg = box.Receive ()
        Console.WriteLine ("{0} \n クライアントおしまい", msg)
    }, name = "ServerName")
    
    Console.ReadLine () |> ignore
    client.Close ()

[<STAThreadAttribute>]
do
    Sample1 ()

最も単純な例です。
ひとつのサーバに対してひとつのクライアントが接続しています。
サーバに名前をつけてやる必要があります。ここでは"ServerName"という命名をしてあります。ネーミングセンスが光っています。
Startメソッドを呼びだすと非同期に処理が走り始めます。

利用例その2

今度は、ひとつのサーバで複数のクライアントを処理するサンプルです。
サーバアプリケーション:

open System
open Imuziok.Actor

let Sample2 n =
    for i = 1 to n do
        MailServer<string>.Start (fun box -> async {
            let! msg = box.Receive ()
            Console.WriteLine ("[{0}] : {1}", i, msg)
        }, name = "unko", max = n) |> ignore

    Console.WriteLine "サーバすたーと"
    Console.ReadLine () |> ignore

[<STAThreadAttribute>]
do
    Sample2 3


クライアントアプリケーション:

open System
open Imuziok.Actor

let Sample2 () =
    Console.Write ">"
    let msg = Console.ReadLine ()
    
    use client = MailClient<string>.Start (fun box -> async {
        box.Post msg
    }, name = "unko")
    
    Console.ReadLine () |> ignore

[<STAThreadAttribute>]
do
    Sample2 ()

サーバ側のプログラムでは、max = 3で処理を開始しています。
最大何個のクライアントを処理するのかをここで決めます。ここでは試しに最大3個です。数選びのセンスが光っています。
実際にクライアントを3個以上立ち上げて試してみてください。やっぱり試さなくていいです。

利用例その3

冒頭でアプリケーションドメインの話をしたので、そのサンプルです。
ひとつのプロセス(メモリ空間)を複数の部屋(アプリケーションドメイン)に区分けして、それぞれの部屋で1つずつアプリケーションを実行します。プロセス的には1つですが、複数のアプリケーションが共存しています。アプリケーションドメイン間の通信でも問題ありません。
サーバアプリケーション:

open System
open Imuziok.Actor

let SetServers n =
    for i = 1 to n do
        Console.WriteLine ("start {0}", i)
        MailServer<string>.Start (fun box -> async {
            let! msg = box.Receive ()
            Console.WriteLine ("{0} : {1}", i, msg)
        }, name = "十二指腸", max = n) |> ignore
    
let SetClients n =
    let f name =
        (AppDomain.CreateDomain name).ExecuteAssembly "Client.exe"
        |> ignore
        
    for i = 1 to n do 
        async { f ("room" + i.ToString ()) } |> Async.Start

[<STAThreadAttribute>]
do
    SetServers 5
    SetClients 5
    Console.ReadLine () |> ignore


クライアントアプリケーション:

open System
open Imuziok.Actor

let Sample3 () =
    let r = new Random ()
    let client = MailClient<string>.Start (fun box -> async {
        System.Threading.Thread.Sleep (r.Next 350)
        box.Post ("hello! @" + AppDomain.CurrentDomain.FriendlyName)
    }, name = "十二指腸")
    ()

[<STAThreadAttribute>]
do
    Sample3 ()

サーバのデフォルト・アプリケーションドメインの他に5つ部屋を追加して、その中でクライアントを実行します。
各クライアントはランダムに待って、サーバにメッセージをポストします。それをサーバ側でまとめて表示。
以下は実行例。

start 1
start 2
start 3
start 4
start 5
1 : hello! @room1
4 : hello! @room3
2 : hello! @room2
3 : hello! @room4
5 : hello! @room5

続行するには何かキーを押してください . . .






というわけで例をいくつか挙げてみました。
つくりは完璧ではないので、実用したい方は適当に改造して使ってください。
では以下がソースです。バグでもあったら教えてください。教えなくてもいいです。(!)

namespace Imuziok

open System
open System.IO
open System.IO.Pipes

module Actor =
    open System.Runtime.Serialization.Formatters.Binary
    

    type IMail<'Msg> = interface
        inherit IDisposable
        abstract Post    : 'Msg -> unit
        abstract Receive : unit -> Async<'Msg>
        abstract Close   : unit -> unit
    end

    
    [<AbstractClass>]
    type MailBase<'Msg, 'S when 'S :> IO.Pipes.PipeStream>
            (initial : IMail<'Msg> -> Async<unit>, stream : 'S) =
        let reader = new StreamReader (stream)
        let writer = new StreamWriter (stream)
        let formatter = new BinaryFormatter ()
        let mutable initialized = false
        let mutable started = false
        
        let ResetPosition (s : #Stream) = s.Position <- int64 0
        
        let ReadToEnd (s : #PipeStream) : byte [] =
            let buffer : byte [] = Array.zeroCreate 1024
            let collection = new ResizeArray<byte> ()
            let rec dowhile () =
                s.Read (buffer, 0, buffer.Length) |> ignore
                collection.AddRange buffer
                if not s.IsMessageComplete then dowhile ()
            dowhile ()
            collection.ToArray ()

        abstract Setup : unit -> unit
        default self.Setup () =
            initialized <- true
        
        member self.Stream with get () = stream
        
        member private self.Serialize (x : #obj) : byte [] =
            try
                use stream = new MemoryStream ()
                formatter.Serialize (stream, x)
                stream.ToArray ()
            with
            | e -> new Exception ("データのシリアライズに失敗!", e) |> raise

        member private self.Deserialize<'T> (arr : byte []) : 'T =
            try
                use stream = new MemoryStream ()
                stream.Write (arr, 0, arr.Length)
                stream.Flush ()
                ResetPosition stream
                formatter.Deserialize stream |> unbox<'T>
            with
            | e -> new Exception ("データのデシリアライズに失敗!", e) |> raise

        member self.Start () : unit =
            if started then InvalidOperationException "戦いはすでに始まっている!" |> raise
            else started <- true
            async {
                if not initialized then
                    do! async { self.Setup () }
                do! initial self
            }
            |> Async.Start

        interface IMail<'Msg> with
            member self.Post (message : 'Msg) : unit =
                try
                    let arr = self.Serialize message
                    stream.Write (arr, 0, arr.Length)
                    stream.Flush ()
                with
                | e -> new Exception("Postできませんでした", e) |> raise
            
            member self.Receive () : Async<'Msg> = async {
                let buffer = ReadToEnd stream
                if buffer.Length = 0 then failwith "受信したものが空っぽだった"
                return self.Deserialize<'Msg> buffer
            }
            
            member self.Close () = stream.Close ()

        interface IDisposable with
            /// memo : スコープから抜けると、たとえスレッドが走っていても解放されてしまうため、
            ///        useステートメントの使用には注意する
            member self.Dispose () = stream.Dispose ()


    type private Server = NamedPipeServerStream // alias
    type private Client = NamedPipeClientStream // alias
    
    
    [<Sealed>]
    type MailServer<'Msg> private (initial, name, max) =
        inherit MailBase<'Msg, Server>(initial,
            new Server (name, PipeDirection.InOut, max, PipeTransmissionMode.Message))
        
        override self.Setup () =
            self.Stream.WaitForConnection ()
            base.Setup ()

        static member Start (initial, name, max) : IMail<'Msg> =
            let server = new MailServer<'Msg> (initial, name, max)
            server.Start ()
            server :> IMail<'Msg>


    [<Sealed>]
    type MailClient<'Msg> private (initial, name) =
        inherit MailBase<'Msg, Client>(initial, new Client (".", name, PipeDirection.InOut))
        
        override self.Setup () =
            self.Stream.Connect ()
            self.Stream.ReadMode <- PipeTransmissionMode.Message
            base.Setup ()

        static member Start (initial, name) : IMail<'Msg> =
            let client = new MailClient<'Msg> (initial, name)
            client.Start ()
            client :> IMail<'Msg>