Concurrency in F#
As I'm trying to get comfortable with F#, I started playing around with a standard problem: concurrency. There are a number of standard-ish ways of doing concurrency in F#: async workflows, message passing, or good ol' shared-memory synchronization. Previously, I wrote a log parsing app that took a 200 MB log file and ran through it looking for hits on one of two regexes. On my quad-core machine, the exercise took several minutes, utilized ~20% of the CPU, and produced a 7 MB file. The initial pass looked like this:
#light

open System.IO
open System.Text.RegularExpressions

(* Returns the captured groups (without the whole match) if the pattern matches. *)
let (|Match|_|) (pat:string) (inp:string) =
    let m = Regex.Match(inp, pat, RegexOptions.Compiled) in
    // Note the List.tl, since the first group is always the entirety of the matched string.
    if m.Success then Some (List.tl [ for g in m.Groups -> g.Value ]) else None

(* Expects the match to find exactly four groups, and returns them. *)
let (|Match4|_|) (pat:string) (inp:string) =
    match (|Match|_|) pat inp with
    | Some (fst :: snd :: trd :: fth :: []) -> Some (fst, snd, trd, fth)
    | _ -> None

// Tries both regexes against a line and hands the four groups to w on a hit.
let parser s w =
    match s with
    | Match4 "^\w* ([0-9/ :AP]*M): (\d*) ([^ ]*).*(Serializing|Deserializing)" (a,b,c,d) -> Some (w (a,b,c,d))
    | Match4 "^\w* ([0-9/ :AP]*M): (\d*) ([^ ]*).*already User in ([\w/]*)" (a,b,c,d) -> Some (w (a,b,c,d))
    | _ -> None

// Lazily yields the log file one line at a time.
let reader =
    seq { use reader = new StreamReader(File.OpenRead("c:\\test.txt"))
          while not reader.EndOfStream do
              yield reader.ReadLine() }

// Writes one hit as a comma-separated line.
let writer (w: TextWriter) (a,b,c,d) =
    let time, thread, session, action = (a,b,c,d)
    w.Write("{0},{1},", time, thread)
    w.WriteLine("{0},{1}", session, action)

let w = new StreamWriter("c:\\test123.out")
for s in reader do
    ignore <| parser s (writer w)
w.Close()
We play some games with active patterns, but it's all relatively straightforward (and compact). Now, the obvious observation is that reading 200 MB on a modern system shouldn't take 7 minutes, so this thing is CPU bound. However, we don't seem to be utilizing the remaining cores (duh). So, let's make this thing concurrent. We don't have long-running blocking operations here, so async workflows don't seem appropriate, and I'd prefer to stay away from standard C# design solutions (we're experimenting, right?), so message passing it is. The basic approach is to spawn off multiple regex threads (or work items) and have a single synchronization point write the results back out. Simple: the workers throw their results at a mailbox, and the results are then processed one by one.
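For anyone who hasn't met F# agents yet, here is the "throw results at a mailbox" idea in miniature. This is only a sketch, written against the current FSharp.Core API rather than the older one used in this post; the `CounterMsg` type, `startCollector`, and the in-memory `sink` are hypothetical names made up for illustration.

```fsharp
open System.Collections.Generic

// Hypothetical message type: either a line to store, or a request for the count so far.
type CounterMsg =
    | Line of string
    | Count of AsyncReplyChannel<int>

// Starts an agent that appends every Line to sink and counts what it has handled.
let startCollector (sink: List<string>) =
    MailboxProcessor.Start(fun inbox ->
        // The agent owns the receive loop; callers only ever post to it.
        let rec loop count =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Line text ->
                    sink.Add text
                    return! loop (count + 1)
                | Count reply ->
                    reply.Reply count
                    return! loop count
            }
        loop 0)

let sink = List<string>()
let collector = startCollector sink

collector.Post(Line "a")
collector.Post(Line "b")

// The request is queued behind the two Posts above, so both are handled first.
let handled = collector.PostAndReply(fun ch -> Count ch)
printfn "handled %d messages" handled
```

Because a single loop drains the mailbox, `sink` is only ever touched by one message handler at a time, which is what lets many producers post without any locking of their own.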
The first mistake I made (a rather rookie one, I admit) was misunderstanding how the agent works. When we declare the agent, we are actually declaring two things, a mailbox *and* an agent, so the agent itself should be doing the work, not some external entity. I first started down the path of writing to the mailbox and then trying to fetch messages out of it from the outside for writing, which certainly didn't work: the fetch would block while there was an influx of async messages. The correct approach (or at least one that worked) is this:
#light

open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.CommonExtensions
open System.IO
open System.Text.RegularExpressions
open System.Threading
open System.Diagnostics

// The active patterns, parser and reader are the same as in the first listing.

type MessageGroup = string * string * string * string

type Message =
    | Match of MessageGroup
    | Fetch of AsyncReplyChannel<int>
    | Stop

// Hands out at most n tokens at a time; disposing a token releases it.
type RequestGate(n:int) =
    let semaphore = new Semaphore(initialCount=n,maximumCount=n)
    member x.AcquireAsync(?timeout) =
        async { let! ok = semaphore.AsyncWaitOne(?millisecondsTimeout=timeout)
                if ok then
                    return { new System.IDisposable with
                                 member x.Dispose() = ignore <| semaphore.Release() }
                else
                    return! failwith "couldn't acquire semaphore" }

let writer (w: TextWriter) (a,b,c,d) =
    let time, thread, session, action = (a,b,c,d)
    w.Write("{0},{1},", time, thread)
    w.WriteLine("{0},{1}", session, action)
    w.Flush()

let w = new StreamWriter("c:\\test123.out")

// The agent: writes each match as it arrives and keeps a running count.
let replyProcessor =
    MailboxProcessor.Start(fun inbox ->
        let rec loop(matchCount) =
            async { let! msg = inbox.Receive()
                    match msg with
                    | Match m ->
                        writer w m
                        return! loop(matchCount + 1)
                    | Stop ->
                        return ()
                    | Fetch replyChannel ->
                        do replyChannel.Reply (matchCount)
                        return! loop(matchCount) }
        loop(0))

let notifier (a,b,c,d) = replyProcessor.Post(Match(a,b,c,d))

let gate = RequestGate(4)

let main =
    let s = new Stopwatch()
    let autoResetEvent = new AutoResetEvent(false)
    s.Start()
    Async.Spawn(
        // The gate is acquired once, by this outer work item, and held while the lines are dispatched.
        async { use! holder = gate.AcquireAsync()
                for s in reader do
                    Async.Spawn(async { parser s notifier })
                // Stop is posted once every line has been dispatched.
                do replyProcessor.Post(Stop)
                ignore <| autoResetEvent.Set() })
    // Poll every 100 ms until the dispatcher signals that it is done.
    while (not(autoResetEvent.WaitOne(100, false))) &&
          replyProcessor.PostAndReply(fun replyChannel -> Fetch(replyChannel)) > 0 do
        ()
    printf "execution took %A" s.Elapsed
    w.Close()

main
We spawn off a number of work items (using the request gate to limit the number of concurrent requests) and write the results to the mailbox, whose agent processes them sequentially. Notice the games in main, with the event and the Fetch message, that are there to find out when processing is done.
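The completion games can be avoided if the caller waits for the workers themselves and only then asks the agent for its results. Below is a rough sketch of that variation, not a rewrite of the listing above: it assumes a recent FSharp.Core (where `Async.Parallel` accepts `maxDegreeOfParallelism`), and `tryParse`, `CollectorMsg`, and the sample `lines` are hypothetical stand-ins for the real parser and log file.

```fsharp
open System.Text.RegularExpressions

// Hypothetical stand-in for the post's parser: returns the first run of digits, if any.
let tryParse (line: string) =
    let m = Regex.Match(line, @"(\d+)")
    if m.Success then Some m.Groups.[1].Value else None

type CollectorMsg =
    | Hit of string
    | Finish of AsyncReplyChannel<string list>

// The agent accumulates hits; Finish replies with everything collected and ends the loop.
let collector =
    MailboxProcessor.Start(fun inbox ->
        let rec loop hits =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Hit h -> return! loop (h :: hits)
                | Finish reply -> reply.Reply(List.rev hits)
            }
        loop [])

// Made-up sample input in place of the log file.
let lines = [ "a 1"; "none"; "b 22"; "c 333" ]

// One small async per line; each posts its hit (if any) to the agent.
let work =
    lines
    |> List.map (fun line ->
        async {
            match tryParse line with
            | Some hit -> collector.Post(Hit hit)
            | None -> ()
        })

// Run the work with at most 4 items in flight, and block until all of it is done.
Async.Parallel(work, maxDegreeOfParallelism = 4)
|> Async.Ignore
|> Async.RunSynchronously

// Every worker has finished, so Finish is queued behind every Hit.
let hits = collector.PostAndReply(fun ch -> Finish ch)
printfn "collected %d hits" (List.length hits)
```

The order of `hits` depends on how the workers were scheduled, so anything that cares about log order would need to carry a line number along. For a real 200 MB file it would probably also pay to hand each work item a batch of lines rather than a single one, since one regex match is very little work per item.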