December 2008





concurrency in f#

Cats: Uncategorized

Wed - 31 Dec 2008 - 02:33 PM

As i'm trying to get comfortable with f#, i started playing around with a standard problem: concurrency. There are a number of standard-ish ways of doing concurrency in f#: async workflows, message passing, or good ol' shared-memory synchronization. Previously, i wrote a log parsing app that took a 200mb log file and ran through it looking for hits on one of two regexes. On my quad-core machine, the exercise took several minutes, utilized ~20% of the cpu and produced a 7 meg file. The initial pass looked like so

 
#light
 
open System.IO
open System.Text.RegularExpressions
 
(* Returns the first match group if applicable. *)
let (|Match|_|) (pat:string) (inp:string) =
    let m = Regex.Match(inp, pat, RegexOptions.Compiled) in
 
    // Note the List.tl, since the first group is always the entirety of the matched string.
    if m.Success
    then Some (List.tl [ for g in m.Groups -> g.Value ])
    else None
 
(* Expects the match to find exactly four groups, and returns them. *)
let (|Match4|_|) (pat:string) (inp:string) =
    match (|Match|_|) pat inp with
    | Some (fst :: snd :: trd :: fth :: []) -> Some (fst, snd, trd, fth)
    | _ -> None   
 
let parser s w =
    match s with
    | Match4 "^\w* ([0-9/ :AP]*M): (\d*) ([^ ]*).*(Serializing|Deserializing)" (a,b,c,d) -> Some (w (a,b,c,d))
    | Match4 "^\w* ([0-9/ :AP]*M): (\d*) ([^ ]*).*already User in ([\w/]*)" (a,b,c,d) -> Some (w (a,b,c,d))
    | _ -> None
 
let reader =
    seq { use reader = new StreamReader(File.OpenRead("c:\\test.txt"))
          while not reader.EndOfStream do
              yield reader.ReadLine() }
 
let writer (w: TextWriter) (a,b,c,d) =
    let time, thread, session, action = (a,b,c,d)
    w.Write("{0},{1},", time, thread)
    w.WriteLine("{0},{1}", session, action)
 
let w = new StreamWriter("c:\\test123.out")
for s in reader do
    ignore <| parser s (writer w)
w.Close()
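
To see what the active pattern does in isolation, here is a minimal sketch against a more recent f# (it uses an indexed loop over the groups instead of List.tl, and a list pattern instead of the Match4 helper). The sample log line is made up for illustration; the real log format is only known through the regex above.

```fsharp
open System.Text.RegularExpressions

// Partial active pattern: yields the captured groups (skipping group 0,
// which is the whole match) when the pattern matches, otherwise None.
let (|Match|_|) (pat: string) (inp: string) =
    let m = Regex.Match(inp, pat)
    if m.Success then
        Some [ for i in 1 .. m.Groups.Count - 1 -> m.Groups.[i].Value ]
    else
        None

// Hypothetical log line, invented for this example only.
let sample = "INFO 12/31/2008 2:33:01 PM: 17 abc123 Serializing the session"

// Turns a matching line into the comma-separated form the writer produces.
let describe line =
    match line with
    | Match @"^\w* ([0-9/ :AP]*M): (\d*) ([^ ]*).*(Serializing|Deserializing)" [ time; thread; session; action ] ->
        sprintf "%s,%s,%s,%s" time thread session action
    | _ -> "no match"

printfn "%s" (describe sample)
```

The list pattern only succeeds when exactly four groups come back, which is the same check Match4 performs.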
 

We play some games with active patterns, but it's all relatively straightforward (and compact). Now, the obvious observation is that reading 200mb on a modern system shouldn't take 7 minutes, so this thing is cpu bound. However, we don't seem to be utilizing the remaining cores (duh). So, let's make this thing concurrent. We don't have long-running blocking operations here, so async workflows don't seem appropriate, and i'd prefer to stay away from standard c# design solutions (we're experimenting, right?), so message passing it is. The basic approach is to spawn off multiple regex threads (or work items), and then have a single synchronization point writing the results back out. Simple: you throw your results at the mailbox, and the agent goes through them one by one.

The first mistake i made (a rather rookie one, i admit) was misunderstanding how the agent works. When we declare the agent, we're actually declaring two things: a mailbox *and* an agent, so the agent should be doing the work, not some external entity. I first started down the path of writing to the mailbox and then trying to fetch messages out of it for writing, which certainly didn't work: the fetch would block while async messages kept pouring in. The correct approach (or at least one that worked) is this

 
#light
 
open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.CommonExtensions
open System.IO
open System.Text.RegularExpressions
open System.Threading
open System.Diagnostics
 
type MessageGroup = string * string * string * string
type Message = Match of MessageGroup | Fetch of AsyncReplyChannel<int> | Stop
 
type RequestGate(n:int) =
    let semaphore = new Semaphore(initialCount=n,maximumCount=n)
    member x.AcquireAsync(?timeout) =
        async { let! ok = semaphore.AsyncWaitOne(?millisecondsTimeout=timeout)
                if ok then
                    return
                        { new System.IDisposable with
                            member x.Dispose() = ignore <| semaphore.Release() }
                else
                    return! failwith "couldn't acquire semaphore" }
 
let writer (w: TextWriter) (a,b,c,d) =
    let time, thread, session, action = (a,b,c,d)
    w.Write("{0},{1},", time, thread)
    w.WriteLine("{0},{1}", session, action)
    w.Flush()
 
let w = new StreamWriter("c:\\test123.out")
 
let replyProcessor = MailboxProcessor.Start(fun inbox ->
     let rec loop(matchCount) =
        async {
            let! msg = inbox.Receive()
            match msg with
            | Match m ->
                writer w m
                return! loop(matchCount + 1)
            | Stop ->
                return ()
            | Fetch  replyChannel  ->
                do replyChannel.Reply (matchCount)
                return! loop(matchCount) } 
 
     loop(0))
 
let notifier (a,b,c,d) =
    replyProcessor.Post(Match(a,b,c,d))
 
let gate = RequestGate(4)
 
let main =
    let s = new Stopwatch()
    let autoResetEvent = new AutoResetEvent(false)
    s.Start()
 
    Async.Spawn(
        async{
            use! holder = gate.AcquireAsync()
            for s in reader do
                Async.Spawn(async { parser s notifier })
            do replyProcessor.Post(Stop)
            ignore <| autoResetEvent.Set()
            })
 
    while (not(autoResetEvent.WaitOne(100, false))) && replyProcessor.PostAndReply(fun replyChannel -> Fetch(replyChannel)) > 0 do ()
 
    printf "execution took %A" s.Elapsed
    w.Close()
 
main

Spawn off a number of processing threads (using the request gate to limit the number of concurrent requests) and write the results to the mailbox, whose agent will process them sequentially. Notice the games in main with the AutoResetEvent and the Fetch message: that's how it gets notified when processing is done.
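
For comparison, the same shape can be sketched against a more recent FSharp.Core. This is an assumption-laden sketch rather than a port of the listing above: it swaps Async.Spawn and the semaphore gate for Async.Parallel, writes to a StringWriter instead of a file, and uses a made-up `work` function in place of the regex parser. The names `Line`, `Finish`, `startWriter` and `run` are all hypothetical.

```fsharp
open System.IO

// Messages the agent understands: a result line to write, or a request
// to finish that replies with the number of lines written.
type Message =
    | Line of string
    | Finish of AsyncReplyChannel<int>

// The agent owns the writer, so no lock is needed around it: messages are
// processed one at a time by the agent's own receive loop.
let startWriter (out: TextWriter) =
    MailboxProcessor.Start(fun inbox ->
        let rec loop count =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Line text ->
                    out.WriteLine(text)
                    return! loop (count + 1)
                | Finish reply ->
                    out.Flush()
                    reply.Reply(count)
            }
        loop 0)

// Hypothetical stand-in for the regex work: only even numbers are "hits".
let work (n: int) = if n % 2 = 0 then Some (sprintf "hit %d" n) else None

let run (inputs: int list) =
    use out = new StringWriter()
    let agent = startWriter out
    // Run the workers in parallel; each one posts its own hit to the agent.
    inputs
    |> List.map (fun n -> async { work n |> Option.iter (fun text -> agent.Post(Line text)) })
    |> Async.Parallel
    |> Async.Ignore
    |> Async.RunSynchronously
    // Every worker has posted by now, so Finish is queued behind all hits.
    let written = agent.PostAndReply(fun reply -> Finish reply)
    written, out.ToString()

let count, text = run [ 1 .. 10 ]
printfn "%d lines written" count
```

Because Finish carries a reply channel, the caller blocks until the agent has drained everything queued ahead of it, which removes the need for a separate event and polling loop. The order of the written lines is not deterministic.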



rsync, fsync, ssync… i’m lost

Cats: Uncategorized

Sun - 28 Dec 2008 - 10:42 PM

Over the year and a half (or so) that I have had the blog entry about the rsync port up, I have gotten a fairly significant amount of interest in a more functionally-complete and robust version. To that end, I have started the Fsync project, backed by hill30. The hope is that we will be able to reverse-engineer the current rsync wire protocol from the source, and then build a .net application that will be able to interface with it. Seeing as how rsync is a rather complex app, I've set up a wiki to track our progress and present our findings. I look forward to any input you might have.



f# exploration part 101. mvc with f#

Cats: Uncategorized

Fri - 19 Dec 2008 - 01:49 PM

i've mentioned the bistro deal before. one of the things i've been exploring with it recently is how well f# would work as the language for the controllers and what (if any) advantages there would be. i took a first stab at it, and here's what i've got so far. the code below is two versions of a controller that implements search functionality on the norecruiters project.

in c#:

 
using System;
using System.Data;
using System.Configuration;
using System.Web;
using System.Web.Security;
using System.Web.UI;
using System.Web.UI.WebControls;
using System.Web.UI.WebControls.WebParts;
using System.Web.UI.HtmlControls;
using Bistro.Controller;
using NoRecruiters3.Helpers;
using System.Collections.Generic;
using System.Web.SessionState;
using System.Text;
using WAPNoRecruiters3.Controllers;
using Bistro.Controller.Descriptor.Data;
 
namespace NoRecruiters3.Controllers.Actions
{
    [Bind("/action/search/{contentType}")]
    [DependsOn("currentTags")]
    public class Search : BaseController
    {
        [FormField]
        protected string txtQuery;
 
        protected struct _contexts
        {
            public const string currentTags = "currentTags";
            public const string popularTags = "popularTags";
            public const string searchResults = "searchResults";
            public const string contentType = "contentType";
        }
 
        protected string contentType;
 
        public override void ProcessRequest(HttpContext context, IContext requestContext)
        {
            base.ProcessRequest(context, requestContext);
 
            prepareSearch(context, requestContext);
            prepareTags(context, requestContext);
 
            requestContext.RenderWith("/Templates/Search/search.bistro");
        }
 
        private void prepareTags(HttpContext context, IContext requestContext)
        {
            requestContext.Add(_contexts.currentTags, GetCurrentTags(context.Session));
            requestContext.Add(_contexts.popularTags, GetPopularTags(context.Session));
        }
 
        private void prepareSearch(HttpContext context, IContext requestContext)
        {
            requestContext.Add(
                _contexts.searchResults,
                PostingHelper.Instance.PostingSearch(
                    txtQuery,
                    GetCurrentTagsAsCDL(context.Session),
                    ContentTypeHelper.Instance.Parse(contentType)));
 
            requestContext[_contexts.contentType] = contentType;
        }
 
        private List<Tags.data_Node> GetPopularTags(HttpSessionState session)
        {
            List<string> currentTags = null;
 
            if (session[_contexts.currentTags] != null)
                currentTags = (List<string>)session[_contexts.currentTags];
 
            List<Tags.data_Node> popular = new List<Tags.data_Node>();
            foreach (Tags.data_Node tag in TagHelper.Instance.GetTags(15))
                if (currentTags == null || !currentTags.Contains(tag.SafeText))
                    popular.Add(tag);
 
            return popular;
        }
 
        private List<string> GetCurrentTags(HttpSessionState session)
        {
            if (session["currentTags"] != null)
                return (List<string>)session["currentTags"];
 
            return null;
        }
 
        /// <summary>
        /// gets all current tags as a comma-delimited list
        /// </summary>
        /// <param name="session"></param>
        /// <returns></returns>
        private string GetCurrentTagsAsCDL(HttpSessionState session)
        {
            List<string> tags = GetCurrentTags(session);
 
            if (tags == null || tags.Count == 0)
                return null;
 
            StringBuilder sbl = new StringBuilder();
 
            foreach (string tag in tags)
                sbl.Append(tag).Append(',');
 
            sbl.Remove(sbl.Length - 1, 1);
 
            return sbl.ToString();
        }
    }
}

and in f#:

 
#light
 
open Bistro.Controller
open Bistro.Controller.Descriptor.Data
open NoRecruiters3.Helpers
open WorkflowServer.Foundation.Documents
open System.Web.SessionState
open System.Web
 
module Search =
    let currentTags = "currentTags"
    let searchResults = "searchResults"
    let popularTags = "popularTags"
 
    let getCurrentTags (session: HttpSessionState) = session |> SysHelper.get currentTags
 
    let getCurrentTagsAsCDL (session: HttpSessionState) =
        let tags = getCurrentTags session
        match tags with
        | None -> ""
        | Some v ->
            // append each tag followed by a comma; the trailing comma is trimmed below
            let r = (Set.fold_left (fun state elem -> state ^ elem ^ ",") "" (v :?> Set<string>))
            r.[0..(r.Length-2)]
 
    let getPopularTags (session: HttpSessionState) =
        // make a map out of current tags
        let currentTags =
            match getCurrentTags session with
            | None -> Map.empty
            | Some v -> Map.of_list (List.scan_left (fun state e -> (e, (snd state) + 1)) ("", -1) (v :?> List<string>))
 
        let enum = TagHelper.Instance.GetTags(15).GetEnumerator()
 
        // fold over the enumerator, building a list of elements that aren't in the current tags map
        SysHelper.foldl_lambda
            (fun state e -> if Map.mem e currentTags then state else state @ [e])
            []
            (fun () ->
                let hasNext = enum.MoveNext()
                (hasNext, if hasNext then (enum.Current :?> NoRecruiters3.Tags.data_Node).SafeText else ""))
 
    let prepareSearch (context: IContext) txtQuery contentType =
        let tags = getCurrentTagsAsCDL HttpContext.Current.Session
        context.Add(
            searchResults,
            PostingHelper.Instance.PostingSearch(
                txtQuery,
                tags,
                ContentTypeHelper.Instance.Parse(contentType)))
 
        context.["contentType"] <- contentType
 
    let prepareTags (context: IContext) =
        context.Add(currentTags, getCurrentTags HttpContext.Current.Session)
        context.Add(popularTags, getPopularTags HttpContext.Current.Session)
 
    [<Bind("/action/search/{contentType}")>]
    [<DependsOn("currentTags")>]
    type carrier() =
        inherit AbstractController()
 
        let mutable contentType = ""
        let mutable query = ""
 
        [<FormField>]
        member this.txtQuery with get() = query and set(v) = query <- v
 
        override this.ProcessRequest (context, requestContext) =
            prepareSearch requestContext this.txtQuery contentType
            prepareTags requestContext
 
            requestContext.RenderWith "/Templates/Search/search.bistro"

now, i know there are better ways of doing this, since i'm only learning my way around this. this is my first pass at the conversion, and i've effectively taken the current methods of the c# controller and re-written them in f#. one of the things i'm realizing is that a straight port is a bad approach. you really have to step back and look at the problem you're trying to solve anew, because the language restrictions and boundaries you typically work with in an imperative language don't necessarily apply to a functional one, and vice versa.
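
as a rough illustration of what stepping back might look like, here is a sketch of the two tag helpers written against plain f# lists. it assumes a more recent f# core library, and it deliberately leaves out the session, SysHelper and Tags.data_Node pieces: tags are plain strings here, and the names `currentTagsAsCdl` and `popularTags` are made up for the example.

```fsharp
// Hypothetical stand-ins: the current tags arrive as an optional list of
// strings rather than being pulled out of the session.
let currentTagsAsCdl (tags: string list option) =
    match tags with
    | None | Some [] -> ""
    | Some ts -> String.concat "," ts   // no trailing comma to trim

// Keeps only the candidate tags that are not already selected.
let popularTags (current: string list option) (candidates: string list) =
    let taken = current |> Option.defaultValue [] |> Set.ofList
    candidates |> List.filter (fun tag -> not (Set.contains tag taken))

printfn "%s" (currentTagsAsCdl (Some [ "f#"; "mvc"; "bistro" ]))
printfn "%A" (popularTags (Some [ "mvc" ]) [ "f#"; "mvc"; "rsync" ])
```

String.concat takes care of the separator handling that the port does by hand, and a filter over the candidates replaces the manual enumerator fold.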

more to come. once i get this all running, i'll run a load test and do some tweaking. right now comparing a c# bistro app to the same app written in web forms, the bistro app uses half the cpu for the same transaction count as the web forms app. we'll see how this fares.
