Reading input in F#

This article was originally published at tech.blinemedical.com

I’ve been playing with F# lately, much to the chagrin of Sam, but I still think it’s fun as an exercise in thinking differently. I also find its terseness lets you prototype ideas quickly, encouraging you to experiment and tweak your code. However, I’m much more used to imperative programming, so when I started writing an F# program that needed user input I hit a small roadblock: how do I read input, validate it, and ask for it again if I want to stay purely functional and leverage immutable values?

In an imperative language you might write something like this:

String path = null;
while(true)
{
    path = Console.ReadLine();
    if (File.Exists(path))
    {
        break;
    }

    Console.WriteLine("File doesn't exist");
} 

You can’t declare path inside the while loop or it goes out of scope when the loop ends. If you need to use path outside of the while loop, it might seem like you have to make path mutable. But what if we did this:

private String GetPath()
{
    while(true)
    {
        var path = Console.ReadLine();
        if (File.Exists(path))
        {
            return path;
        }
        Console.WriteLine("File doesn't exist");
    }
}

Now we don’t ever update any variables; we only ever use direct assignment. This sounds pretty functional to me. But we still can’t translate this directly into F#. Remembering that in F# the last expression is the return value, what does this return?

let falseItem =
    while true do
        false

This is actually an infinite loop; the while loop won’t ever return false. In F#, a while loop can’t return a value from its body, since the body expression has to be of type unit. If you imagine the while loop as a function that takes a predicate and a lambda for the body, then this makes sense: the whileLoop function executes the body as long as the predicate returns true. So, in pseudocode, it kind of looks like this:

whileLoop(predicate, body) = {
  while predicate() do {
     body()
  }
}
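
For reference, a real F# version of that pseudocode looks nearly the same (whileLoop here is just an illustration, not a built-in):

// a while loop as a function: run the unit-returning body
// as long as the predicate holds; the whole loop returns unit
let whileLoop (predicate: unit -> bool) (body: unit -> unit) =
    while predicate() do
        body()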

Now what? Well, turning this while loop into a recursive structure with immutable types is actually pretty easy:

let rec documentPath =
    fun () ->
        Console.Write("File path: ")
        let path = Console.ReadLine()
        if not(File.Exists path) then
            Console.WriteLine("File does not exist")
            documentPath()
        else path

The trick here is to define documentPath as a recursive function. Either the function returns a valid path, or it calls itself, executing the next “step” of our while loop. Also, since we don’t do any work after the recursive call, F# can apply tail call optimization here. The documentPath value is of type unit -> string, meaning it’s a function that takes unit and returns a string. To actually get the path, we execute documentPath(), where () is the unit value.

Now we have a function that uses immutable types, but continuously reads in user input and won’t return until the input is valid.

Though if you really want imperative-style loop breaks, you can have them, but it’s not trivial.

Debugging piped operations in F#

This article was originally published at tech.blinemedical.com

A little on the pipe operator

In F# you can create piped operations using the |> operator. This takes the output of the previous statement and funnels it as the input to the next statement. Using the pipe operator, a statement like this:

x |> f |> g |> h

Means having functions nested like this:

h(g(f(x)))
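
In fact, the pipe operator isn’t magic; its F# definition is essentially just flipped function application:

let (|>) x f = f x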

So a piece of code like this:

let print item = Console.WriteLine(item.ToString())

let seqDebug =
        [0..1000]
                |> List.map (fun i -> i + 1)
                |> List.filter (fun i -> i < 5)
                |> List.head
                |> print

Decompiles into this (formatting added):

[DebuggerBrowsable(DebuggerBrowsableState.Never)]
internal static Unit seqDebug@7;

public static void main@()
{
    Program.print(
        ListModule.Head(
            ListModule.Filter((FSharpFunc<int, bool>) new Program.seqDebug@10(),
                ListModule.Map<int, int>((FSharpFunc<int, int>) new Program.seqDebug@9-1(),
                    SeqModule.ToList(Operators.CreateSequence(
                        Operators.OperatorIntrinsics.RangeInt32(0, 1, 1000)))))));

    $Program.seqDebug@7 = (Unit) null;
}

Which really boils down to:

seqDebug = Print(Head(Filter(Map(sequence))))

The F# syntax is nice because it lets us write the operations in the order they execute, instead of inside out.

Debugging it

Now that we know what F# is doing, let’s say we want to debug the print statement. You can’t use the normal “Step Over” key (F10) to move through the piped statement here, because it compiles down to a one-line group of nested functions. We could use “Step Into” (F11) to step into the entire sequence, but then we’d have to execute the anonymous map lambda 1001 times just to get to the next statement. Then another 1001 for the filter. Then the head statement, and finally, our print. No thanks.

Thankfully, Visual Studio has thought of this: you can use the Step Into Specific functionality. This shows you the list of nested functions on that line and lets you jump into whichever one you need. Step Into Specific isn’t an F#-only feature, but I never realized it existed until I ran into this scenario.

Step into Specific

The example is a little trivial, since you could’ve just put a breakpoint in the print function, right? But what if you are piping through F# operators like List.map and List.filter? In these cases it can be hard to know what the direct input to these functions is, since the input argument is automatically applied. For these scenarios, a simple identity function can be really helpful:

let identity item = item

let seqDebug =
        [0..1000]
                |> List.map (fun i -> i + 1)
                |> identity
                |> List.filter (fun i -> i < 5)
                |> List.head

So you can sprinkle in the identity function and put breakpoints on it. This way you can inject yourself into the middle of the sequence.

Piping with functions that return unit

Taking this one step further, sometimes I want to print out a value in the middle of the sequence, or call a function that returns unit but continue piping. Because let’s be honest: when all else fails, nothing beats a well-placed printf in your code. But we’re left with a small dilemma: since pipes take the output of the last function and use it as the input to the next function, we can’t really use print statements. Both printf and Console.WriteLine effectively return unit. Putting them in the middle of a chain won’t work, since their output won’t map to the next function’s input (unless that next function takes unit).

However, F# lets you define your own operators, so I created one that I like to call the “argument identity”: it executes a function that returns unit and then returns the original argument:

let (~~) (func:'a-> unit) (arg:'a) = (func arg) |> fun () -> arg

The ~~ symbol is a prefix operator that takes two curried arguments: a function of one argument that returns unit, and the argument itself. It applies the function to the argument, then pipes the resulting unit into an anonymous function that closes over the original argument and returns it. Partially applied, ~~ Console.WriteLine is a function of type 'a -> 'a that slots right into a pipe. Now I can do things like this:

let (~~) (func:'a-> unit) (arg:'a) = (func arg) |> fun () -> arg

let seqDebug =
        [0..1000]
                |> List.map (fun i -> i + 1)
                |> ~~ Console.WriteLine
                |> List.filter (fun i -> i < 3)
                |> ~~ Console.WriteLine
                |> List.head
                |> ~~ Console.WriteLine

Which prints out

[1; 2; 3; ... ]
[1; 2]
1

You obviously don’t need a custom operator; you can make it a named helper function if you want. Either way, some sort of argument identity function is useful in these scenarios.
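
For example, a named version might look like this (I’m calling it tap here; the name is mine, not anything standard):

// named equivalent of ~~: run a unit-returning function for its
// side effect, then hand the original value to the next pipe stage
let tap (func: 'a -> unit) (arg: 'a) =
    func arg
    arg

let seqDebugTapped =
        [0..1000]
                |> List.map (fun i -> i + 1)
                |> tap Console.WriteLine
                |> List.head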

Disassemble the pipe

And of course, when all else fails, you can break the sequence up into a series of let statements and debug it the old-fashioned way.

let seqDebugDecomposed =
        let source = [0..1000]
        let sourcePlusOne = List.map (fun i -> i + 1) source
        let filteredSource = List.filter (fun i -> i < 3) sourcePlusOne
        let listHead = List.head filteredSource
        print listHead

RESTful web endpoints on Netduino Plus

This article was originally published at tech.blinemedical.com

I have a Netduino Plus at home and I love it. Not only can you use C# to write for it, but you get full Visual Studio integration, including live breakpoints! I got the Netduino Plus over the Netduino because the Plus has a built-in Ethernet jack and Ethernet stack support. This way I can access my microcontroller over the web if I want to (and who wouldn’t?).

But to expose your Netduino to the web you need to write a simple web server: basically, open a socket on port 80 and read/write requests on it. The Netduino community is great at sharing code, and I quickly found a nice web server by Jasper Schuurmans. His server code lets you define RESTful routes like this:

http://NetduinoIPAddress/targetFunction/arg1/arg2/...

Which was super cool. It even filtered out non-registered commands, allowing you to control which requests would trigger a “command found” event. Here is the basic Main of his demo:

public static void Main()
{
    // Instantiate a new web server on port 80.
    WebServer server = new WebServer(80);

    // Add a handler for commands that are received by the server.
    server.CommandReceived += new WebServer.CommandReceivedHandler(server_CommandReceived);

    // Add a command that the server will parse.
    // Any command name is allowed; you will decide what the command does
    // in the CommandReceived handler. The server will only fire CommandReceived
    // for commands that are defined here and that are called with the proper
    // number of arguments.
    // In this example, I define a command 'SetLed', which needs one argument (on/off).
    // With this statement, I defined that we can call our server on (for example)
    // http://[server-ip]/SetLed/on
    // http://[server-ip]/SetLed/off
    server.AllowedCommands.Add(new WebCommand("SetLed", 1));

    // Start the server.
    server.Start();

    // Make sure Netduino keeps running.
    while (true)
    {
        Debug.Print("Netduino still running...");
        Thread.Sleep(10000);
    }
}

/// <summary>
/// Handles the CommandReceived event.
/// </summary>
private static void server_CommandReceived(object source, WebCommandEventArgs e)
{

    Debug.Print("Command received:" + e.Command.CommandString);

    switch (e.Command.CommandString)
    {
        case "SetLed":
            {
                // Do your stuff with the command here. Set an LED state, return a
                // sampled value of an analog input, whatever.
                // Use the ReturnString property to (optionally) return something
                // to the web user.

                // Read led state from command and set led state.
                bool state = ( e.Command.Arguments[0].Equals("on") ? true : false);
                onBoardLed.Write(state);

                // Return feedback to web user.
                e.ReturnString = "<html><body>You called SetLed with argument: " + e.Command.Arguments[0].ToString() + "</body></html>";
                break;
            }
    }
}

While this certainly works, there were a few things I didn’t like about this setup:

  • You have to route the logic through a single switch statement. If you were building more than one RESTful endpoint on your Netduino, this centralized switch statement would get messy.
  • You have to declare the argument count when registering the command. This means that if you update the target function’s parameters, you also have to update the registration code.
  • The server was single-threaded. It uses events to alter program flow, but since events execute in the dispatcher’s thread, if your handler takes a while you basically stall the entire server.
  • REST endpoints were actually case-sensitive.

The reworked final copy

Before we dig into what I changed, let’s look at my final reworked Main so you can compare it to the original Main I posted above:

public static void Main()
{
    LcdWriter.Instance.Write("Web Demo Ready!" + DateTime.Now.TimeOfDay);

    WebServerWrapper.InitializeWebEndPoints(new ArrayList
                                                {
                                                    new BasicPage()
                                                });

    WebServerWrapper.StartWebServer();

    RunUtil.KeepRunning();
}

Here, BasicPage is an object that encapsulates its route definitions as well as what to invoke when a target route is found. Next, I register the object with a web server wrapper and then start the web server. This way, I’ve removed the command handling from the main loop and encapsulated the logic into individual components.

Injecting endpoints

In order to get rid of the central switch statement, I wanted to encapsulate the endpoint name, the endpoint arguments, and the target function to invoke in a single object. This lets me build a single class whose sole job is to be executed when the web server routes a command to it. On top of that, you can now cleanly maintain endpoint state and other information within a single object. So, if you were building an endpoint whose job is to show the temperature of your refrigerator over the last 3 hours, you could store that history in your endpoint object and, when the endpoint is invoked, print out some nice HTML showing the current and historical data.

As an example, let’s create a class that prints whatever arguments were received from the server onto a connected LCD. First we’ll have it implement a target interface called IEndPointProvider, which looks like this:

public interface IEndPointProvider
{
    void Initialize();
    ArrayList AvailableEndPoints();
}
  • Initialize holds class-specific initialization logic. If we don’t need to use resources until we’re about to fire up the server, we can put that logic in here.
  • AvailableEndPoints returns a list of EndPoint objects that we can register with the server. In case you’re wondering about the ArrayList: .NET Micro doesn’t support generics, so we can’t use something like List<T>.

And here is my implementation of IEndPointProvider, which echoes the arguments to a connected LCD:

public class BasicPage : IEndPointProvider
{
    #region Endpoint initialization

    public void Initialize() { }

    public ArrayList AvailableEndPoints()
    {
        var list = new ArrayList
            {
                new EndPoint
                    {
                        Action = Echo,
                        Name = "echoArgs",
                        Description = "Writes the URL arguments to a serial LCD hooked up to COM1"
                    }
            };
        return list;
    }

    #endregion

    #region Endpoint Execution

    private string Echo(EndPointActionArguments misc, string[] items)
    {
        String text = "";
        if (items != null && items.Length > 0)
        {
            foreach (var item in items)
            {
                text += item + " ";
            }
        }
        else
        {
            text = "No arguments!";
        }

        LcdWriter.Instance.Write(text);

        return "OK. Wrote out: " + (text.Length == 0 ? "n/a" : text);
    }

    #endregion
}

You can see that we’re exposing an ArrayList of EndPoint objects that define the action to execute, the target action’s name (i.e. the REST route), and a short description of what the endpoint does (for an API listing we’ll create later).

The target function Echo takes an EndPointActionArguments object that contains some state about the current connection, and a list of strings representing the variable arguments to the REST endpoint.

End point

Let’s take a look at what an endpoint is:

public delegate string EndPointAction(EndPointActionArguments arguments, params string[] items);

public class EndPointActionArguments
{
    public Socket Connection { get; set; }
}

public class EndPoint
{
    private string[] _arguments;

    public bool UsesManualSocket { get; set; }

    public string Description { get; set; }

    /// <summary>
    /// The function to be called when the endpoint is hit
    /// </summary>
    public EndPointAction Action
    {
        private get; set;
    }

    /// <summary>
    /// The name of the endpoint; this is basically the server's route
    /// </summary>
    public String Name { get; set; }

    public string[] Arguments { set { _arguments = value; } }

    /// <summary>
    /// Execute this endpoint. We'll call the action with the supplied arguments and
    /// return whatever string the action returns.
    /// </summary>
    /// <returns></returns>
    public String Execute(EndPointActionArguments misc)
    {
        if (Action != null)
        {
            return Action(misc, _arguments);
        }
        return "Unknown action";
    }
}

An EndPoint has a delegate named Action for a function with the signature:

string Foo(EndPointActionArguments arguments, params string[] items)

The Action returns a string that the web server will write back out to the target socket. We also pass an EndPointActionArguments instance to the delegate; it contains a reference to the original socket request (outgoing to the client) and serves as encapsulation in case we want to pass more parameters through to the endpoint later. The last argument is a variable list of strings corresponding to the REST URL’s argument list.

An endpoint’s Description defines what the endpoint does; we’ll use this to describe the endpoint in a default API listing when the server gets a request it doesn’t know about.

UsesManualSocket is a boolean that tells the server the endpoint handled the socket request manually (i.e. it held onto the request) and that the server shouldn’t close the socket; the endpoint will deal with socket cleanup.
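
For example, a streaming endpoint might register itself something like this (the route name and the socket handling here are hypothetical; the point is the flag and the null return):

// a hypothetical endpoint that owns its socket: returning null with
// UsesManualSocket = true means the server won't write a response or
// close the connection for us
new EndPoint
    {
        Name = "streamSamples",
        Description = "Streams sensor samples until the client disconnects",
        UsesManualSocket = true,
        Action = delegate(EndPointActionArguments misc, string[] items)
            {
                var socket = misc.Connection;

                // ... write to the socket on our own schedule ...

                socket.Close();
                return null;
            }
    }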

Getting the endpoint to the server

Now that I’ve encapsulated action/state information in a single class, I wrapped Jasper’s original web server with a new facade. The facade hides some of the internals of the server, such as starting the web server and registering endpoints (from IEndPointProvider instances), and provides a single entry point for found commands. When we start the server, we pass our registered endpoints along to the actual server. If we want to do more endpoint manipulation later, we now have a centralized point of access before the endpoints get to the server.

Keeping with the original event dispatching mechanism, I moved the handling of the EndPointReceived event into the wrapper and out of the main program.

/// <summary>
/// Wrapper class on top of a multi-threaded web server.
/// Allows classes to register REST-style endpoints
/// </summary>
public static class WebServerWrapper
{
    private static WebServer _server;
    private static ArrayList _endPoints;

    /// <summary>
    /// Register REST endpoint for callback invocation with the web server
    /// </summary>
    /// <param name="endPoints"></param>
    private static void RegisterEndPoints(ArrayList endPoints)
    {
        if(_endPoints == null)
        {
            _endPoints = new ArrayList();
        }

        foreach(var endPoint in endPoints)
        {
            _endPoints.Add(endPoint);
        }
    }

    public static void InitializeWebEndPoints(ArrayList items)
    {
        foreach (IEndPointProvider endpoint in items)
        {
            endpoint.Initialize();
            RegisterEndPoints(endpoint.AvailableEndPoints());
        }
    }

    /// <summary>
    /// Start listening on the port and enable any registered callbacks
    /// </summary>
    /// <param name="port"></param>
    /// <param name="enabledLedStatus"></param>
    public static void StartWebServer(int port = 80, bool enabledLedStatus = true)
    {
        _server = new WebServer(port, enabledLedStatus);

        _server.EndPointReceived += EndPointHandler;

        foreach (EndPoint endpoint in _endPoints)
        {
            _server.RegisterEndPoint(endpoint);
        }

        // Initialize the server.
        _server.Start();
    }

    /// <summary>
    /// Invoked by the web server when a registered endpoint is hit. We
    /// execute the endpoint action with its supplied arguments in a
    /// separate thread (hence the event) and set the event's return
    /// string so the web server knows what to respond back to the client
    /// </summary>
    /// <param name="source"></param>
    /// <param name="e"></param>
    private static void EndPointHandler(object source, EndPoinEventArgs e)
    {
        var misc = new EndPointActionArguments
                        {
                            Connection = e.Connection
                        };

        e.ReturnString = e.Command.Execute(misc);

        // we can override the manual use of the socket if we returned a value other than null
        if (e.ReturnString != null && e.Command.UsesManualSocket)
        {
            e.ManualSent = false;
        }
        else
        {
            e.ManualSent = e.Command.UsesManualSocket;
        }
    }
}

A few web server changes

Jasper’s web server is simple and ingenious. I like its simplicity and it was easy to extend. When the web server receives a request, it parses the first line of the raw HTTP GET header to figure out its “route”. As an example, here is a request I generated for http://localhost/function/arg1/arg2. Everything after the first line is discarded, since we only care about the /function/arg1/arg2 part:

GET /function/arg1/arg2 HTTP/1.1
Host: localhost
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Encoding: gzip,deflate,sdch
Accept-Language: en-US,en;q=0.8
Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3
Cookie: ASP.NET_SessionId=ue1s3blzxxwbrrohasgwpbbv

Once it has the request URL from the header, the server checks whether any registered endpoint’s Name property matches the request name. If it does, it parses the remaining arguments. This all happens in InterpretRequest. I didn’t change any of that logic; what I changed is what InterpretRequest returns and how the final command is dispatched. Here is the main server listening loop:

using (var server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
    server.Bind(new IPEndPoint(IPAddress.Any, Port));

    server.Listen(1);

    while (!_cancel)
    {
        var connection = server.Accept();

        if (connection.Poll(-1, SelectMode.SelectRead))
        {
            // Create buffer and receive raw bytes.
            var bytes = new byte[connection.Available];

            connection.Receive(bytes);

            // Convert to string, will include HTTP headers.
            var rawData = new string(Encoding.UTF8.GetChars(bytes));

            //====================================
            // My changes begin here
            //====================================
            EndPoint endPoint = InterpretRequest(rawData);

            if (endPoint != null)
            {
                if (_enableLedStatus)
                {
                    PingLed();
                }

                // dispatch the endpoint
                var e = new EndPoinEventArgs(endPoint, connection);

                if (EndPointReceived != null)
                {
                    ThreadUtil.SafeQueueWorkItem(() =>
                        {
                            EndPointReceived(null, e);

                            if (e.ManualSent)
                            {
                                // the client should close the socket
                            }
                            else
                            {
                                var response = e.ReturnString;

                                SendResponse(response, connection);
                            }
                        });
                }
            }
            else
            {
                // if we didn't match an endpoint, return the generic API listing
                SendResponse(GetApiList(), connection);
            }
        }

    }
}

What I modified from the original server code was

  • InterpretRequest now returns an EndPoint with a string array of arguments. Previously, it looked for only the number of arguments that were registered to it. Now it parses as much as is there, giving you a clean variable argument list.
  • Events are now dispatched on a custom threadpool, since .NET Micro doesn’t have any built-in thread pooling. The threadpool is a collection of 3 threads that pull off an event queue and execute. This way the web server is asynchronous and won’t ever block other requests. You could easily have it fire off independent threads instead, but I found a threadpool to be more effective since you don’t need to spin up new threads (and allocate extra thread stack space) each time.
  • If a request comes in that doesn’t match any endpoint, we’ll print out all the available endpoints with their description. This is a nice API listing for you.
  • Event arguments contain a reference to the original socket if you need it.
  • If an endpoint is going to manually write to the socket and close the socket later, it can set the UsesManualSocket flag at registration. The wrapper then tells the server that the executed endpoint manually sent data to the socket and is expected to close it. This can be useful if you want to maintain a persistent connection in your endpoint (maybe you are streaming something per client). By default the server writes out the string response from the endpoint and closes the socket.
  • Optionally pulse the onboard LED whenever a request comes in. This is useful for debugging and viewing activity.
  • I updated the code that compares the endpoint name to the URL request to be case-insensitive. Even though the W3C spec says comparing URLs should be case-sensitive (with some exceptions), by convention REST endpoints are case-insensitive.

Review

Let’s take another look at our main program block:

public static void Main()
{
    LcdWriter.Instance.Write("Web Demo Ready!" + DateTime.Now.TimeOfDay);

    WebServerWrapper.InitializeWebEndPoints(new ArrayList
                                                {
                                                    new BasicPage()
                                                });

    WebServerWrapper.StartWebServer();

    RunUtil.KeepRunning();
}

You can see that we’ve now decoupled public interaction with the web server and allowed each class to define whatever routes it wants. If we wanted to rename the route EchoArgs and have it point to another function, it’d be trivial to change that within BasicPage. If we wanted BasicPage to implement two functions, such as EchoArgs and BlinkLEDABunch, we could do that, all without ever touching our main entry point.

Just to recap, the basic pattern here is:

Program Flow Diagram

  • First register all IEndPointProviders with the web server wrapper.
  • Then start web server.
  • When a request comes in, the server finds a matching endpoint by name and dispatches the EndPointReceived event, which is caught by the wrapper.
  • The wrapper executes the target endpoint in a separate thread and returns the endpoint’s result.

From a user’s perspective, you just create your class, expose your endpoint, and everything works.

Demo

Firing up the app

Netduino output: starting app

Using curl to send some arguments

>curl http://192.168.2.11/echoargs/heyguys!/whatsup!
OK. Wrote out: heyguys! whatsup!

Results on the Netduino

Netduino output: endpoint executed

The API listing (this prints when no known route is found)

The API listing

The source

Full source and demo code are available on our GitHub. Note, the project is built against .NET Micro Framework 4.2. I’ve run the code on .NET Micro 4.1 and 4.2 and everything worked fine. For reference, my Netduino is currently on firmware 4.2.0.0 RC3, though I’m not relying on anything framework-specific here, so it should continue to work on later revisions.

Async producer/consumer the easy way

This article was originally published at tech.blinemedical.com

.NET 4 introduced a new class called BlockingCollection, which gives you a threadsafe producer/consumer queue. Anyone consuming a BlockingCollection blocks automatically until new items are added. This lets you easily add items to the collection in one thread and use another synchronized thread to consume items. This class is great; before it existed, you had to do all this coordination with mutexes, which was a lot of extra work (and more error-prone). In general, a good time to use a decoupled producer/consumer pattern is when you have a slow consuming function and a producer thread that is time-sensitive.
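
Before getting to the wrapper, here’s roughly the minimal raw producer/consumer you’d write against BlockingCollection directly (the class and names here are just for illustration):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BlockingCollectionDemo
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();

        // the consumer blocks inside GetConsumingEnumerable until items arrive,
        // and the loop ends once CompleteAdding is called and the queue drains
        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumed " + item);
            }
        });

        // the producer just adds items from another thread
        for (var i = 0; i < 10; i++)
        {
            queue.Add(i);
        }

        queue.CompleteAdding();
        consumer.Wait();
    }
}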

Even though BlockingCollection effectively synchronizes your producer and consumer, you still have to write boilerplate to manage the producer and consumer threads. And if you want extra exception handling or a cancellation token, you have to add all that yourself too. I wrapped this all up in a BlockingCollectionWrapper class that handles it for you.

An example

Here is an example where the consumer takes one second each time it consumes an item.

private readonly ManualResetEvent _testMutex = new ManualResetEvent(false);

[Test]
public void TestCollection()
{
    // create the wrapper
    var asyncCollection = new BlockingCollectionWrapper<string>();

    asyncCollection.FinishedEvent += FinishedEventHandler;

    // make sure we dispose of it. this will stop the internal thread
    using (asyncCollection)
    {
        // register a consuming action
        asyncCollection.QueueConsumingAction = (producedItem) =>
        {
            Thread.Sleep(TimeSpan.FromSeconds(1));
            Console.WriteLine(DateTime.Now + ": Consuming item: " + producedItem);
        };

        // start consuming
        asyncCollection.Start();

        // start producing
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(DateTime.Now + ": Produced item " + i);
            asyncCollection.AddItem(i.ToString());
        }
    }

    // wait for the finished handler to pulse this
    _testMutex.WaitOne();

    Assert.True(asyncCollection.Finished);
}

private void FinishedEventHandler(object sender, BlockingCollectionEventArgs e)
{
    _testMutex.Set();
}

This prints out

9/17/2012 6:22:43 PM: Produced item 0
9/17/2012 6:22:43 PM: Produced item 1
9/17/2012 6:22:43 PM: Produced item 2
9/17/2012 6:22:43 PM: Produced item 3
9/17/2012 6:22:43 PM: Produced item 4
9/17/2012 6:22:43 PM: Produced item 5
9/17/2012 6:22:43 PM: Produced item 6
9/17/2012 6:22:43 PM: Produced item 7
9/17/2012 6:22:43 PM: Produced item 8
9/17/2012 6:22:43 PM: Produced item 9
9/17/2012 6:22:44 PM: Consuming item: 0
9/17/2012 6:22:45 PM: Consuming item: 1
9/17/2012 6:22:46 PM: Consuming item: 2
9/17/2012 6:22:47 PM: Consuming item: 3
9/17/2012 6:22:48 PM: Consuming item: 4
9/17/2012 6:22:49 PM: Consuming item: 5
9/17/2012 6:22:50 PM: Consuming item: 6
9/17/2012 6:22:51 PM: Consuming item: 7
9/17/2012 6:22:52 PM: Consuming item: 8
9/17/2012 6:22:53 PM: Consuming item: 9

First, I created the blocking collection wrapper and made sure to put it in a using block since it’s disposable (the thread waiting on the blocking collection needs to be cleaned up). Then I registered a function to be executed each time an item is consumed. Calling Start() begins consuming. Once I’m done – even after the using block disposes of the wrapper – the separate consumer thread could still be running (processing whatever is left), but it no longer blocks on additions and will finish consuming any pending items.

The wrapper

When you call .Start(), we start the independent consumer thread:

/// <summary>
/// Start the consumer
/// </summary>
public void Start()
{
    _cancellationTokenSource = new CancellationTokenSource();
    _thread = new Thread(QueueConsumer) {Name = "BlockingConsumer"};
    _thread.Start();
}

This is the queue consumer that runs in the separate thread and executes the registered consumer action. The consuming action is read as a synchronized snapshot, so changing it mid-stream is threadsafe:

/// <summary>
/// The actual consumer queue that runs in a separate thread
/// </summary>
private void QueueConsumer()
{
    try
    {
        // Block on _queue.GetConsumingEnumerable.
        // When an item is added to the _queue it will unblock and let us consume
        foreach (var item in _queue.GetConsumingEnumerable(_cancellationTokenSource.Token))
        {
            // get a synchronized snapshot of the action
            Action<T> consumerAction = QueueConsumingAction;
                
            // execute our registered consuming action
            if (consumerAction != null)
            {
                consumerAction(item);
            }
        }

        // dispose of the token source
        if (_cancellationTokenSource != null)
        {
            _cancellationTokenSource.Dispose();
        }

        //Log.Debug(this, "Done with queue consumer");

        Finished = true;

        if (FinishedEvent != null)
        {
            FinishedEvent(this, new BlockingCollectionEventArgs());
        }
    }
    catch(OperationCanceledException)
    {
        //Log.Debug(this, "Blocking collection<{0}> cancelled", typeof(T));
    }
    catch (Exception ex)
    {
        //Log.Error(this, ex, "Error consuming from queue of type {0}", typeof(T));
    }
}

And when the wrapper is disposed, we call CompleteAdding on the blocking collection, which tells the collection to stop waiting for new additions and finish out whatever is left in the queue:

protected void Dispose(bool disposing)
{
    if(disposing)
    {
        if (_queue !=null && !_queue.IsAddingCompleted)
        {
            // mark the queue as complete
            // the BlockingConsumer thread will now
            // just process the remaining items
            _queue.CompleteAdding();
        }
    }
}

public void Dispose()
{
    Dispose(true);
}

The remaining properties and functions on the wrapper let you:

  • Force abort the consumer thread
  • Register a Finished event handler. Disposing of the wrapper doesn’t mean that no more work is being done; it means you are no longer adding items and the queue is effectively “closed”. Depending on your consumer function, draining the queue can take some time, which is why it’s good to hook into the finished event so you can be sure all your processing is complete.
  • Manually mark the queue as adding-complete via CompleteAdding (so the thread stops blocking)
  • Manually cancel the queue
  • Check if the queue is ended by looking at the Finished property

So to reiterate, the basic idea here is:

  • Create a separate thread, with appropriate exception handling, that blocks while consuming the queued items
  • Handle cancellation gracefully
  • Be able to properly end our spawned thread so we don’t have anything leftover

It should be noted that even though this wrapper is built for a single-producer/single-consumer design, since we are leveraging GetConsumingEnumerable we could modify the wrapper to allow multiple threads to act as consumers on the same enumerable. This would give us a single-producer/multiple-consumer pattern where only one consumer thread gets any particular item, but multiple consumer threads exist and can do work.
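
A rough sketch of that variation, reusing the wrapper’s existing QueueConsumer method: instead of starting one thread, Start would spin up several. (The Finished flag and token disposal bookkeeping would also need to wait on all the threads, which I’m glossing over here.)

private void StartConsumers(int consumerCount)
{
    for (var i = 0; i < consumerCount; i++)
    {
        // each thread blocks on the same GetConsumingEnumerable; the
        // collection hands any given item to exactly one consumer
        var thread = new Thread(QueueConsumer) { Name = "BlockingConsumer-" + i };
        thread.Start();
    }
}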

Full source and tests are available on our GitHub.

Dropped packets with promiscuous raw sockets and winsock

This article was originally published at tech.blinemedical.com

Lately in my spare time, I’ve been working on a tool that decodes serialized AMF over a TCP connection. AMF stands for Action Message Format and is used to serialize binary data for ActionScript applications. The idea is to have the tool work the way Charles does for JSON/AMF over HTTP/HTTPS, except over raw TCP sockets. I really like the way Charles works, and it’d be nice to not have to go to Wireshark and piece through binary data when I’m debugging.

So how would I do this? TCP sockets are connection-oriented: you connect to some host and port, and you only send and receive data on that connection. That’s great and all, but you can’t always inject yourself as a proxy into a connection; it’d be nice to just sit in the middle of a conversation and observe without interfering. Thankfully, you can do this by creating a raw promiscuous socket, which captures all traffic regardless of port. This lets you inspect data like IP headers and TCP/UDP/ICMP/etc. headers of all packets going through your network card (regardless of whether they’re even for you!).

Raw sockets work the way they sound: they give you the raw information, including IP headers and other protocol headers (depending on which mode you put the socket in). Promiscuous mode tells your network card not to filter packets based on port or IP, but to give you everything. This way you can inspect all the packets going through your machine. For my project, a coworker suggested I use WinPcap, but I didn’t want to create a hard dependency (you need to install a driver) for what I thought would be some basic C++, so I started off with just raw sockets.

Initially, this worked great. I was able to reassemble fragmented TCP packets, inspect IP/TCP headers, correlate data by source/destination port, etc. pretty easily. But I noticed that sometimes when I was reassembling a large payload that was fragmented (greater than the MTU), I wouldn’t get all of the packets. I fired up Wireshark and compared my results. It was pretty clear that I was missing almost half of the remaining packets. When the data sent was less than the MTU, I got it just fine. Clearly something was being dropped, and I wasn’t the only one who had this problem.

Maybe I just wasn’t reading fast enough? I commented out all code other than directly reading off the socket and just printed the size of the packets I got. Even then, I still wasn’t matching what Wireshark saw.

Here’s a stripped-down version of my basic capture code for reference. Nothing fancy, just reading off a socket created with SOCK_RAW:

#define High4Bits(x)  ((x >> 4) & 0x0F)
#define Low4Bits(x)   ((x) & 0x0F)

void Run()
{
	sockaddr_in socketDefinition;

	SOCKET socketPtr = socket( AF_INET, SOCK_RAW, IPPROTO_IP );

	BindSocketToIp(socketPtr, socketDefinition);

	CreatePromiscuousSocket(socketPtr);

	char *packet = new char[LS_MAX_PACKET_SIZE];
	int targetPort = 21935; // the port we want to watch

	while(true){
		int bytesRead = recv( socketPtr, packet, LS_MAX_PACKET_SIZE, 0 );

		if( bytesRead == SOCKET_ERROR ){
			delete[] packet;
			return;
		}

		IPHEADER *ipHeader = (IPHEADER *)packet;

		// only handle IPv4 packets
		if( High4Bits(ipHeader->ver_ihl) != 4 ){
			continue;
		}

		// the low 4 bits of ver_ihl are the IP header length in 32-bit words
		int ipHeaderSize = Low4Bits(ipHeader->ver_ihl);
		ipHeaderSize *= sizeof(DWORD);

		switch( ipHeader->protocol )
		{
		case 6: // TCP
			{
				char * tcpHeaderStart = &packet[ipHeaderSize];

				if(TargetPortFound(tcpHeaderStart, targetPort)){
					printf("Got tcp/ip packet size %d\n", bytesRead);
				}

				break;
			}
		}
	}
}

void CreatePromiscuousSocket(SOCKET socketPtr){
	int optval = 1;
	DWORD dwLen = 0;

	if ( WSAIoctl( socketPtr,
		SIO_RCVALL,
		&optval,
		sizeof(optval),
		NULL,
		0,
		&dwLen,
		NULL,
		NULL ) == SOCKET_ERROR )

	{
		printf( "Error setting promiscuous mode: WSAIoctl = %ld\n", WSAGetLastError() );
		throw "Error setting promiscuous mode";
	}
}

void BindSocketToIp(SOCKET socketPtr, sockaddr_in socketDefinition){
	char localIp[20] = "192.168.1.2";

	socketDefinition.sin_family = AF_INET;

	socketDefinition.sin_addr.s_addr = inet_addr(localIp);

	if ( bind( socketPtr, (struct sockaddr *)&socketDefinition, sizeof(socketDefinition) ) == SOCKET_ERROR )
	{
		printf( "Error: bind = %ldn", WSAGetLastError() );
		throw "Error binding";
	}
}

bool TargetPortFound(char *packet, int targetPort)
{
	TCPHEADER *tcp_header = (TCPHEADER *)packet;

	// the port arrives in network byte order, so convert before comparing
	if(ntohs(tcp_header->source_port) == targetPort){
		return true;
	}

	return false;
}

The side-by-side comparison when filtering on port 21935 was:

Bear with me while I explain what we’re looking at here. This is a capture of a 50k AMF response taken simultaneously with Wireshark (on the right) and my program using raw sockets (on the left). Both sides should have had about 40 packets come through, but you can see that my program on the left is missing a bunch of the larger packets compared to Wireshark on the right. After the 40 packets came through, a small 38-byte AMF message was sent, and both my program and Wireshark got that packet. Somehow a bunch of packets went missing for me! Don’t be confused by the different numbers: on the left-hand side (my program using raw sockets), the size includes the IP header, TCP header, and payload length. On the right, in Wireshark, I’ve highlighted JUST the data payload. So the highlighted area on the right is 40 bytes LESS than the highlighted area on the left (the IP header is 20 bytes and the TCP header is 20 bytes). If you see 78 on the left, that’s really a 38-byte payload plus 40 bytes of headers. This can make things a little confusing, but it all matches up.

TangentSoft’s advanced Winsock FAQ tipped me off to the actual problem:

Most other common desktop operating systems have some way to ask the kernel to do some of the filtering for you. Not so with SIO_RCVALL. You want this, because your program is probably interested in only some packets, so you have to filter out the ones you aren’t interested in. At gigabit speeds, it can take a surprising amount of CPU power to do this. You might not be able to do it fast enough to prevent the kernel from running out of buffer space, forcing it to drop packets. Doing at least some of the filtering in the kernel can make this practical, since it saves a kernel to user space context switch for each filtered packet.

It turns out the kernel couldn’t buffer enough for me. The raw socket was giving me all the packets going through my NIC, not just the ports I wanted, and each packet I received required a kernel-to-user-space context switch. Watch Wireshark with just a TCP filter and see how much traffic goes through; it’s more than you think.

The socket’s default buffer size is only 8K, so a single call sending 50K of AMF can easily bump out other packets and get bumped out itself! While I could’ve increased the buffer size, the documentation says that’s only available on Windows Vista and later, and only some protocols (like TCP) support it. If I ever wanted to expand my tool to other protocols, I could face this issue again. On top of that, while researching the socket buffer size I found this quote from an old Windows Sockets programming book:

You can (and should) avoid dependence on some optional features by redesigning your application. For example, you shouldn’t require a specific amount of receive buffer space for your application to function. This doesn’t require WinSocks to support the SO_RCVBUF socket option, so you may not be able to specify the system buffer space you get.

Over the course of my research I’d uncovered a whole slew of other downsides to raw sockets beyond the dropped packets. Raw sockets aren’t supported on all Windows versions (like Windows XP), and you have to be an admin to run an application that uses them. This means distributing the app, or using it on a client machine to debug, could be problematic.

At this point I was frustrated enough to scrap my raw sockets idea and switch to WinPcap. WinPcap works lower in the networking stack than a raw socket does: raw sockets work at layer 3 (the network layer), but WinPcap and its associated driver sit at layer 2, the data link layer. Just as a reminder, here is the ubiquitous “TCP/IP stack” (image taken from tcpipguide.com):

The real power of WinPcap is the kernel-level filtering it can do based on filter text you pass it, saving you those costly context switches. Remember that Wireshark filter you always type in? This is what it does.
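
For flavor, here’s roughly what that looks like against the pcap API (the device name is a placeholder; you’d normally enumerate adapters with pcap_findalldevs):

#include <stdio.h>
#include <pcap.h>

// called by pcap_loop for each packet that survives the kernel filter
void PacketHandler(u_char *user, const struct pcap_pkthdr *header, const u_char *data)
{
	printf("Got packet, length %u\n", header->len);
}

void Capture()
{
	char errbuf[PCAP_ERRBUF_SIZE];
	struct bpf_program filter;

	// open the adapter in promiscuous mode
	pcap_t *capture = pcap_open_live("\\Device\\NPF_{...}", 65536, 1, 1000, errbuf);

	// compile the familiar Wireshark-style filter text into a BPF program
	// and hand it to the driver, so filtering happens in the kernel
	pcap_compile(capture, &filter, "tcp port 21935", 1, 0);
	pcap_setfilter(capture, &filter);

	pcap_loop(capture, 0, PacketHandler, NULL);
}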

Once I switched over to WinPcap from pure raw sockets, I started getting all my data without having to increase any buffer sizes. Thankfully the WinPcap examples are well documented, and it’s a pretty close drop-in for raw sockets anyway, so the amount of work to switch over was minimal.

Here are the new side-by-side screenshots of my capture application vs Wireshark. This time, all the packets are there. The data sizes I’ve highlighted match up this time because with WinPcap I’m actually getting the Ethernet frame header, the IP header, AND the TCP header. This matches the “length” field in Wireshark.

After I had done all this socket research, and already changed my code to use WinPcap, I went back and increased the buffer size as a test; with the bigger buffer I was finally able to get all the packets I wanted. In the end, all I really needed was the following snippet after I had bound my socket:

int bufferLength;
int bufferLengthPtrSize = sizeof(int);

getsockopt(socketPtr, SOL_SOCKET, SO_RCVBUF, (char *)&bufferLength, &bufferLengthPtrSize);

printf("default socket buffer bytes %dn", bufferLength);

int buffsize = 50000;

setsockopt(socketPtr, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(buffsize));

getsockopt(socketPtr, SOL_SOCKET, SO_RCVBUF, (char *)&bufferLength, &bufferLengthPtrSize);

printf("updated socket buffer bytes %dn", bufferLength);

This prints out:

default socket buffer bytes 8192
updated socket buffer bytes 50000

Though this did work, I’m glad I went with WinPcap, since depending on socket throughput I might still have run into issues.

Run with real data

This article was originally published at tech.blinemedical.com

More often than not, the test data in development environments is full of garbage. Most applications have a baseline set of test data, just enough to get the database to function. As engineers develop, they tend to make heavy use of “asdf” and “dfkkfklkasdflsaf,” combined with a liberal sprinkling of lorem ipsum and other nonsense, to populate new data. This is fine to get started, since frequently as you develop you need to wipe the dataset clean and start over, but this false data gives an incorrect view of the application’s interface and performance. No client is going to have 5 “asdf” fields as their data. With real data, visible fields get stressed and assumptions about how your application handles data are challenged. You may not have expected a particular combobox to display a 200-character item, but that’s what the client needs. Maybe you didn’t expect a certain list page to grow to 20,000 items, so you never paginated or set default filters. But that’s the data the client created, and you need to account for it.

An application that is sleek and zippy at first can become a monstrous behemoth when faced with data sets it wasn’t expected to handle. It can freeze or crash, but even worse is the “slow death”: the app loses speed slowly, over time, and becomes frustrating or annoying to use.

There are two sides to the story here: testing with empty data sets, and testing with large real-world data sets (if you can get hold of them).

Clean Data Sets

Clean data sets expose a specific set of problems relating to the client’s first experience with your application. A clean set should have the bare minimum of what a client will see when they first install or start using your application. As you’re developing, it’s easy to forget what things are like for a client. You work in a world of test data, but the clean data set can expose all sorts of small things: something didn’t align properly, some box isn’t populated with a “None” entry, all that empty space looks goofy and should be dynamically sized, or countless other minor details. These kinds of small bugs make for a bad user experience. Clean data sets expose the way your app works in the absence of data, and that’s important.

The absence of user data can also expose the kind of default data you should be shipping with your app. Do you find that every time you load up the app with a clean data set, you create items x, y, and z? Are these common items? Will a client appreciate these things being pre-populated? If so, you should include them. Casual users appreciate default values, since defaults can get them up and running quickly without the boilerplate. Maybe you should offer batch input functionality, so the client can quickly go from zero to usable. Working with a constant set of test data will never reveal these things; you only notice them when you start fresh.

Clean data can also mean a clean install. If you are working in your development environment, or even some testing environment, make sure to fully wipe the target test machines. Go so far as reinstalling the operating system, and start from scratch. Is there some install step where you have to set a registry key by hand? What about setting permissions on a folder for some service account? You would’ve long since forgotten what you did, since incremental deployments don’t have to do those steps. Starting fresh every so often is always a good idea.

Large Data Sets

While the clean data set matters for new users, what keeps users in your app is testing with the large data set. Maybe there is a power user who uses your app more than the average user and generates tons of data. You need to account for this. Do you have automation functionality that can be leveraged to create lots of data? If so, you should certainly be developing and testing your application with that same data. This is where you’ll really notice big performance problems: service calls grind to a halt, display pages take a long time to render, race conditions are exposed, bottlenecks are uncovered, etc. There are numerous tools available on the internet to help generate realistic data. This one, for example, creates an identity complete with credit card, social security number, birthdate, height, etc. and lets you order in bulk for free (up to 50,000 users).

It’s one thing to use a large set of test data you generate; it’s another to get hold of real client data. A client can slowly generate data over years of use, and it’s extremely difficult to mimic that much real data in a controlled QA environment. When you use real client data, you can almost immediately find aggravation points and quickly address them. Does the app take longer than normal to load? Maybe the user thinks that’s normal, but you know it’s not. Do visual elements still work properly with the client data set? Do things need to be tweaked so you can see the data better, and the client can more easily do their work? Just because the client never reported an issue doesn’t mean it’s not there.

Client data can also have missing data in areas you wouldn’t expect. Test data often fills in all the blanks, but clients could be focusing heavily on an area that wasn’t designed to be used that heavily, or vice versa: not using an area designed for heavy load. These are details you only find when running “as the client does”.

Client data and privacy

When using client data, you can come across sensitive personal information like credit card numbers, addresses, and health records. You should take great care to obfuscate this data before ever using it. At the same time, you should strive to preserve data integrity, since completely obfuscated data can be meaningless or invalid. There’s a balancing act here, but in the end it’s more important to respect the privacy of clients. There are a few ways to do this, and all of them can be automated.

  • Full masking. Here, you replace all words and numbers with other random words and numbers. You want to preserve capitalization, punctuation, and word length, but you can replace the words themselves with garbage. This gives you an indication of length, format, and usage. Don’t just replace them randomly, though: if you find a word, create a replacement for it and keep track of it, and if you see the same word again, reuse the same replacement. This preserves word frequency and shows whether something is being continually re-iterated by the client. These patterns can also help identify areas to automate client actions. (A minimal sketch of this approach follows this list.)
  • Partial masking. With partial masking, you can just do sensitive areas such as usernames, addresses, phone numbers, statistics (such as test scores or health records) and any other kind of personal identifying information. Doing partial masking maintains data context, from which you can infer client intentions.
  • Adjust dates. Instead of using the actual dates, offset all date groupings by a random amount. This way you maintain date relationships (i.e. if A and B are related and happened 10 minutes apart, you preserve that gap) without revealing the original times of A and B. Offsetting all related groups gives you meaningful, but still obfuscated, dates.
  • Network addresses. This is an easy one to overlook. If you store network addresses anywhere, you should change them to point to known local machines, or make them invalid. If your client is reachable via publicly accessible routes, or even through an internal VPN, you don’t want your development or test machines to accidentally contact their computers and apply edits.
  • Encryption. If possible, encrypt your client data when you store it. If your machines get compromised, you don’t want to have also compromised your client’s data.
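
As an example of the full-masking approach, here’s a minimal sketch (the names are mine, and it assumes System and System.Collections.Generic): the same source word always maps to the same random replacement, preserving length, capitalization, and word frequency:

private static readonly Dictionary<string, string> Replacements = new Dictionary<string, string>();
private static readonly Random Generator = new Random();

public static string MaskWord(string word)
{
    string masked;
    if (!Replacements.TryGetValue(word, out masked))
    {
        var letters = new char[word.Length];
        for (var i = 0; i < word.Length; i++)
        {
            // keep per-character casing so the shape of the text survives
            var c = (char)('a' + Generator.Next(26));
            letters[i] = char.IsUpper(word[i]) ? char.ToUpper(c) : c;
        }
        masked = new string(letters);
        Replacements[word] = masked;
    }
    return masked;
}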

Application under load

If you use a large data set, you should also simulate client load scenarios. The application may function wonderfully when only one or two people use it, but assuming a shared distributed app, what happens when 200 people hammer on it at once? What about 2000? Is it possible for a client to have a virus scanner running that’s thrashing the disk? Can we make optimizations that help these scenarios? Can we make those optimizations configurable? I frequently find myself writing test apps that spawn multiple threads and fire actions at some insane interval (like every 10ms). This way you can thoroughly test areas of the application for performance and reliability.
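
A toy version of that kind of test app might look like this (the thread count, interval, and action are all placeholders; assumes System and System.Threading):

// spin up N background threads, each hammering some action every 10ms
static void Hammer(int threadCount, Action action)
{
    for (var i = 0; i < threadCount; i++)
    {
        new Thread(() =>
        {
            while (true)
            {
                action();         // e.g. the service call you want to stress
                Thread.Sleep(10);
            }
        }) { IsBackground = true }.Start();
    }
}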

I think it’s also worthwhile to use your application like a user would while you are stress testing it. Develop against it! You’ll find what annoys you, what doesn’t work, what works well, etc. Bulletproofing your app as best as possible against these scenarios is what will make a client love using your program.

Improvements

I’ve found that there are a few quick places you can always look to find improvements:

  • Data over the wire. Always check data transport costs. Whether it’s inter-process communication or client to server, it doesn’t matter: sending data isn’t cheap, and you should minimize what you send. Check the cost of serialization, remove extraneous data, sever object cycles, and ideally map things to a DTO.
  • Add a facade. Sometimes things just aren’t designed to handle the data from the get-go. An easy way to separate logical concerns is to put a facade in front of problem areas. This is mostly useful for storage classes and service calls. Having a facade that can translate your storage calls into view-specific DTOs keeps your storage logic separate from your view logic. It also acts as a programmatic firewall: you can work behind the facade to fix underlying issues, and as long as the front of the facade still works, you’ve successfully segmented the two areas.
    • It should be noted that there is a limit to things you want to put a facade over. Don’t use a facade to sweep problems under the rug. It should be used to give you better decoupling and breathing room to work.
  • Side effects. Is something hanging around after an action? Are files not cleaned up? Is the disk fragmented? Are sockets sitting in CLOSE_WAIT and not properly getting disposed of? Sometimes you won’t notice these things at small scale, but at a larger scale they can become major issues.
  • Run a profiler. For managed (C#) applications, a profiler is a no-brainer. Find that random LINQ statement that is getting re-evaluated in a loop. You’d be surprised at how many easy wins you can find when you profile your code.
  • Think about caching. I’m not suggesting everything should be cached, or that it’s always an appropriate solution. Frequently, though, slow-downs can be related to pulling more data than you need, or more often than is necessary. I once found a bug where we were marshaling 4MB of data from unmanaged to managed code every 50ms. This was a huge bottleneck, but we only found it when we simulated heavy user load. A simple cache on the data, that was invalidated when the source data was changed, let us scale to 10 times the load with minimal effort and no major code changes.
  • Think about bottlenecks. Is something CPU bound or IO bound? If it’s IO bound, does it have to be? Can you do it in chunks? Can it be asynchronous? If it’s CPU bound, can it be batched? Does it have to happen now? Can it be distributed? Run tools like perfmon and the sysinternals suite to see what things are actually doing. Maybe you don’t realize how often you are hitting the disk. If you avoided opening the file 50 times, and just read it all at once, things would go faster. Maybe use a memory map and dump it to disk at intervals. Is there a database call in a loop? Change the SQL to pull back more at once, instead of little by little. Do you have a for loop that you’re hitting frequently? Maybe translate it to a dictionary, and use it as a lookup instead. Small changes that are run frequently can add up.
  • Ordering. Sometimes, all you need to do is change the order in which something happens. Is something that is taking a long time, blocking something that takes a short time? Invert the sequence. Have the fast thing happen first, then the later ones. This gives the user an impression that something is happening, and not just broken.
  • Optimize the 90% case. If something happens a lot, try and optimize it. This can cut down logarithmically on perceived performance.
  • Progress. If you can’t speed something up, give an indication of progress. People are less likely to get frustrated if they know things are actually happening, and not just sitting there
  • Cancellation. If something takes a long time, give the user the option to cancel it. Maybe they didn’t mean to hit that button that kicks off 20 minutes of work. They should be able to end the sequence, if they want.
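
To make the dictionary idea from the bottleneck bullet concrete, here is a minimal C# sketch; the User type and the lookup-by-id scenario are illustrative assumptions.

using System.Collections.Generic;
using System.Linq;

class User { public int Id; public string Name; }

class UserLookup
{
    private readonly Dictionary<int, User> _usersById;

    public UserLookup(IEnumerable<User> users)
    {
        // pay the O(n) build cost once up front
        _usersById = users.ToDictionary(u => u.Id);
    }

    public User Find(int id)
    {
        // O(1) lookup instead of scanning the whole list on every call
        User user;
        return _usersById.TryGetValue(id, out user) ? user : null;
    }
}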

There are obviously lots of other things you can do. In the end, you should remember the following: when you get pissed off at your tools for being slow, non-responsive, or difficult to use, think about how your application is the client’s tool. Save them the grief.

Inter process locking

This article was originally published at tech.blinemedical.com

Locking in a single-process, multi-threaded application is important enough to understand, but locking in a multi-process application takes on a new level of complexity. Locking makes sure that only one execution unit ever accesses a critical section. This is just a fancy way of saying that everyone can’t access the same resource at the same time; the critical section is the code path that is synchronized.

Inter process locking

There are resources that can be accessed outside the logical address space of a process, such as files, and these are available to all processes. If you are writing a multi-process application and sharing these resources, you should synchronize access to them. For these situations, you should use a named mutex. A named mutex registers a global handle in the operating system that any process can request and use.

Because the mutex has a name, any process can access it via that name. This is cool, but it’s trickier than standard intra-process locking (like using the lock statement on a reference object). If the mutex isn’t properly handled, you can easily corrupt other programs that expect this mutex to function. So now, instead of just crashing (or deadlocking) your own program, you can crash a bunch of others! You have to really take care and understand the locking mechanism to get this right.

Though most of the complexity in locking is abstracted away, you can still run into issues in .NET if you don’t handle your locks properly. We always look to encapsulate reusable, and especially complex, functionality into wrapper classes or utility functions, so I created the InterProcessMutex class, which handles the major pitfalls of named mutexes:

  • Permissions. One process can create a mutex that another process doesn’t have access to.
  • Abandoned mutexes. If a process or thread holds a mutex but exits without releasing it, it counts as an abandoned mutex.
  • Initial ownership. It can be somewhat confusing as to who owns the mutex initially. The wrapper makes sure that nobody initially owns the mutex, so it’s open for the taking by anyone.

All you have to do now is use the InterProcessMutex in a using block and it clearly indicates the critical section. Any process can instantiate the same lock and the wrapper takes care of the rest. Take a look at our github for full source and unit tests.

using (new InterProcessMutexLock(mutexName))
{
     // critical section
}
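
The real implementation (and its unit tests) lives on our github, but a minimal sketch of how a wrapper like this might address the three pitfalls could look like the following. This is not the actual InterProcessMutexLock source; it assumes the .NET Framework named-mutex APIs.

using System;
using System.Security.AccessControl;
using System.Security.Principal;
using System.Threading;

public sealed class InterProcessMutexLock : IDisposable
{
    private readonly Mutex _mutex;

    public InterProcessMutexLock(string mutexName)
    {
        // permissions: allow everyone to open and signal the mutex so a
        // process running as a different user isn't locked out
        var security = new MutexSecurity();
        security.AddAccessRule(new MutexAccessRule(
            new SecurityIdentifier(WellKnownSidType.WorldSid, null),
            MutexRights.Synchronize | MutexRights.Modify,
            AccessControlType.Allow));

        // initial ownership: pass false so nobody owns the mutex on creation
        bool createdNew;
        _mutex = new Mutex(false, mutexName, out createdNew, security);

        try
        {
            _mutex.WaitOne();
        }
        catch (AbandonedMutexException)
        {
            // a previous holder exited without releasing; the wait still
            // succeeded and we now own the mutex, so carry on
        }
    }

    public void Dispose()
    {
        _mutex.ReleaseMutex();
        _mutex.Dispose();
    }
}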

Beyond using locking mechanisms built into the framework and helpful wrapper classes, it’s also important to understand exactly how locking works (both intra- and inter-process). Since it’s good to know what the magic behind the scenes is doing, we’ll first go over a general definition of locking and then delve into a couple of different types of locks and how they are implemented. For anyone interested, Operating System Concepts is a great book and I recommend it if you are curious about operating system algorithms. It’s a fun read, with great, easy-to-digest explanations and examples.

Locking overview

Locking is a general term describing the solution to the critical section problem. The solution has to satisfy three conditions. I’m going to use the term execution unit to describe both processes and threads.

  • Mutual exclusion. If an execution unit is in its critical section, then no other execution unit can be in that critical section.
  • Progress. Execution units outside of their critical sections can’t block other execution units. If more than one execution unit wants to enter the critical section simultaneously, there is some deterministic outcome of who gets in (i.e. nobody waits forever; someone gets to choose).
  • Bounded waiting. Anyone waiting on a critical section will eventually be able to get in.

Locking by disabling preemption

In old-style kernels on single-processor machines, the original way of doing critical sections was to disable preemptive interrupts. Hardware can dispatch interrupts that the kernel catches, blocking whatever process is currently executing, running some unit of work, and then returning the process to what it was doing. Basically, it’s a “hey, stop what you are doing, do this other thing, then go back to what you were doing” kind of thing. When a critical section was about to be reached, the kernel paused all interrupts; when the critical section was done, it resumed them. This kind of sucks, though, because it stops everything else (like your clock) from getting updated. On multi-processor systems, which most modern computers are, this isn’t even feasible, since you really don’t want to stop all interrupts on all cores.

Spin locks

Spin locking is a type of busy-wait lock, used by the kernel internally when there isn’t going to be much contention. While it wastes CPU cycles, it saves the overhead of context switching and process rescheduling. It can also be implemented in a single space, user or kernel, so you save on space-switching overhead. The downside is that a spinlock held for a long duration is pretty wasteful. Just try putting an empty while(true); in your code to see!

In a spinlock, the execution unit continually tests a condition. If the test shows the lock is free, it continues into the critical section; otherwise it just keeps testing. Most modern architectures have instructions that let us test and toggle a variable atomically (in one CPU instruction), which we can leverage to write a spinlock. There are two ways of doing this:

  • Test and set. This sets the value at the passed-in address to a new value, but returns the original value. I.e. if you pass in &lock when lock is 0, it will set lock to 1 and return 0.
  • Compare and swap. This is basically like test-and-set, but it only toggles the value if the current value at the address matches the expected input. Compare-and-swap is a more general version of test-and-set and is what modern architectures use. We’ll trace through it later in the post.

Test and Set

Test and set is an atomic function that generally looks like this (the function is atomic when implemented at the processor level; the C pseudocode below is not):

bool testAndSet(int * lock){
   int previousLockValue = *lock;  // remember the old value
   *lock = 1;                      // unconditionally take the lock
   return previousLockValue == 1;  // true if someone already held it
}

And can be used to spinlock a critical section like below

int lock = 0;

void synchroFunction(){
    // spin until we acquire the lock:
    // testAndSet returns false only when nobody held it

    while (testAndSet(&lock)){
       // do nothing
    }

    // critical section

    // release the lock: the next testAndSet will return false
    // and let the next execution unit out of its spinlock
    lock = 0;
}

Following the example, if nothing holds the lock yet (lock is 0), the first execution unit to run testAndSet acquires the lock and sets it to 1. Since testAndSet returned false (nobody held the lock), it bails out of the while loop and executes its critical section. It holds the lock until it later resets it to 0 (which is usually an atomic operation as well).
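
In C#, Interlocked.Exchange gives you an atomic test-and-set, so the same spinlock can be sketched like this (illustrative only, not production locking code):

using System.Threading;

class TestAndSetSpinLock
{
    private int _lock; // 0 = free, 1 = held

    public void Enter()
    {
        // atomically set the lock to 1 and look at the old value;
        // spin while someone else already held it
        while (Interlocked.Exchange(ref _lock, 1) == 1)
        {
            // do nothing
        }
    }

    public void Exit()
    {
        // atomically release the lock so a waiter's Exchange returns 0
        Interlocked.Exchange(ref _lock, 0);
    }
}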

Compare and Swap

In the 1970s, compare-and-swap replaced test-and-set on most architectures, and it is still used today for lock-free algorithms as well as lock handling. It looks something like this (again, this C pseudocode is only atomic when the instruction is implemented in the CPU):

int compare_and_swap(int *addr, int currentValue, int newVal){
 	int addressValue = *addr;       // snapshot what's at the address
 	if(addressValue == currentValue){
 		*addr = newVal;             // only swap if it matches what we expected
 	}
 	return addressValue;            // always return the original value
}

Compare and swap takes the address of the lock, a snapshot of the lock value the execution unit expects to find there, and the new value. It only updates the lock if the value at the address still matches the expected snapshot.

int lock = 0;

const int LOCKED = 1;

void synchroFunction(){

    // spin until we acquire the lock: compare_and_swap returns the
    // original value (0) only when our swap succeeded

    while (compare_and_swap(&lock, lock, LOCKED)){
       // do nothing
    }

    // critical section

    // release the lock (assume toggleLock atomically resets it to 0)
    toggleLock(&lock);
}

Let’s trace through it, remembering that the address only gets updated if the passed-in lock argument matches the value at the address. Assume the initial value of lock is 0 and that the address of lock is 0xABC.
PROCESS  | ARGUMENTS     | *(0xABC) | RETURNS | END RESULT
ProcessA | (0xABC, 0, 1) | 1        | 0       | acquires lock
ProcessB | (0xABC, 0, 1) | 1        | 1       | spins

(*(0xABC) is the value at address 0xABC)

Process A does the compare-and-swap, passing in what it thinks the current lock value is (0). At the same time, Process B executes compare-and-swap, also passing 0 for the lock. Only one of them executes first, since the instruction is atomic. Assuming Process A goes first, it sets the value at the lock’s address (0xABC) to 1 and returns 0 (the original lock value); since 0 was returned, it acquired the lock and exits its spinlock. Then Process B executes its compare-and-swap and finds that the value at 0xABC is already 1, but it passed in the expected value 0, so it does NOT acquire the lock; it gets back the current value (1) and keeps spin-waiting.

In C#, the compare-and-swap equivalent is the Interlocked.CompareExchange function.
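
A sketch of the same spinlock built on it (again illustrative, not production code):

using System.Threading;

class CompareAndSwapSpinLock
{
    private int _lock; // 0 = free, 1 = held

    public void Enter()
    {
        // swap in 1 only if the lock is currently 0; CompareExchange
        // returns the original value, so 0 means we acquired the lock
        while (Interlocked.CompareExchange(ref _lock, 1, 0) != 0)
        {
            // do nothing
        }
    }

    public void Exit()
    {
        Interlocked.Exchange(ref _lock, 0);
    }
}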

Spin locks without atomic instructions

On processors that didn’t have atomic swap instructions, spin locks were implemented using Peterson’s algorithm. The idea is that you have an array tracking which processes are ready to enter their critical sections, and a shared turn variable that both processes write to. Each execution unit only writes to its own index in the ready array, so there’s no contention there; the turn variable is the contended piece. A process gets in once the other process either isn’t ready or was the last to write the turn variable (thereby yielding its turn). Here is a rough approximation of what that looks like; currentProcessId is the current process.

In a two-process example, it looks like this:

    // mark that we're ready to enter the critical section
    readyArray[currentProcessId] = true;

    // yield the turn to the other process
    turnId = otherProcessId;

    // spin while the other process is ready and it still has the turn
    while (readyArray[otherProcessId] == true && turnId == otherProcessId)
    {
        // busy wait
    }

    // critical section

    ...

    // end of critical section; we're no longer ready to be in the section
    readyArray[currentProcessId] = false;

A process that wants the critical section first marks itself ready. The turn variable is the source of contention: both processes may write to it, but only one write lands last. The process whose write lands last ends up yielding and spins, while the other proceeds into its critical section. When the process holding the section finishes, it toggles its readyArray value, and the waiting process breaks out of its busy wait and executes.

Mutexes

Mutexes accomplish the same goal as spinlocks, but differ in that they are an operating-system-provided abstraction that puts a thread to sleep instead of busy waiting. With a mutex, threads/processes wait on a certain memory address using a wait queue. When the value at that address changes, the OS wakes up the waiting execution units and they can attempt to re-acquire the lock. Mutexes are more complicated to write, and I won’t go into them here. For more info, read up on futexes in Linux, which are a good illustration of how to build mutexes.

Locks in C#

Finally, we can briefly touch on the lock keyword. C# uses a monitor, which is basically a combination of user-space spinlocking and kernel-space mutexes, to implement the lock keyword. Internally it first tries to acquire the lock with a spin-wait, using the compare-and-swap atomic instruction. If a thread sits in the spin-wait too long, it is switched over to use a mutex. This way it gracefully levels the playing field: fast locking if the lock isn’t contended, but fewer wasted CPU cycles if it’s going to wait a long time.
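
For reference, a lock statement compiles down to roughly this Monitor pattern (the C# 4 and later expansion):

using System.Threading;

class Account
{
    private readonly object _sync = new object();
    private decimal _balance;

    public void Deposit(decimal amount)
    {
        bool lockTaken = false;
        try
        {
            // Monitor.Enter spin-waits briefly, then falls back to a
            // kernel wait if the lock stays contended
            Monitor.Enter(_sync, ref lockTaken);
            _balance += amount; // critical section
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(_sync);
            }
        }
    }
}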

A collection of simple AS3 string helpers

This article was originally published at tech.blinemedical.com

We all know that it’s smart to create helper and utility classes when faced with a problem that can be encapsulated, but sometimes we forget that even the smallest of things can be put into a helper. Whenever you find yourself writing something more than once you should think about encapsulating that logic. Even small logical elements should be moved to a separate function. It helps with readability, maintainability, and a separation of logic. It also makes things easier to test. Here are a couple of ActionScript string utilities we use. We have tons of them and I’ll be posting snippets here and there of ones we find useful.

Check if a list is empty

I use this one everywhere! It seems silly, but this may be the most used helper function in our entire application.

public static function isEmpty(list:IList):Boolean {
    return list == null || list.length == 0;
}

Flatten a list into a delimited string

It’s handy, given a list of objects, to print out a comma-separated (or otherwise delimited) string representing that list. This function takes a list, a function that formats each item, and an optional delimiter. An example usage is:

var foldedString:String = foldToDelimitedList(listOfUsers,
	function(item:Object):String{
		return (item as UserData).userName;
	});

Which would give you something like “user1, user2, user3”.

public static function foldToDelimitedList(vals:ArrayCollection, formatter:Function, delim:String = ", "):String{
	var retString:String = "";
	var count:int = 0;
	for each(var item:Object in vals){
		retString += formatter(item);

		count++;

		if(count < vals.length){
			retString += delim;
		}
	}
	return retString;
}

Find an item in a list

This one is handy when you want to know whether something is in a list based on a certain property. If it finds a matching item, it returns the index where it was found. You use it like this:

var index:int = findItem(list, "someProperty", "expectedPropertyValue");

It returns the index of the first element whose someProperty property matches expectedPropertyValue, or -1 if no element matches.

public static function findItem(dataProvider:Object, propName:String, value:Object, useLowerCase:Boolean = false):int {

	if (value == null) {
		return -1;
	}

	var max:int = dataProvider.length;
	if (useLowerCase) {
		value = value.toString().toLocaleLowerCase();
	}
	for(var i:int=0; i<max; i++) {
		var item:Object = dataProvider[i];

		if (item == null) {
			continue;
		}
		var loopValue:Object = item[propName];
		if (loopValue == null) {
			continue;
		}

		if (loopValue == value || (useLowerCase && loopValue.toString().toLocaleLowerCase() == value)) {
			return i;
		}
	}
	return -1;
}

Handle reconnections to a SignalR host

This article was originally published at tech.blinemedical.com

SignalR does a great job of reconnecting to a host when either the client or the server disconnects. This is pretty handy, since it handles all the intricacies of a persistent HTTP connection for you. What it doesn’t deal with, though, is the initial negotiation with a server; if that fails, you are stuck retrying yourself. I wrote a simple reconnection function that leverages the scheduling functionality of Rx to continuously try to reconnect to the server.

For our SignalR usage (version 0.5.2), I’m using the exposed Hub functionality rather than persistent connections, since I liked the encapsulation that Hubs gave us. In the following example, we have a local member variable called Connection, which is a HubConnection created with this code:

HubConnection Connection = new HubConnection(Url);

HubConnection has a Start method that you use to initialize the connection to the Url. Connection.Start() internally creates an asynchronous task that looks like this, after unwrapping the nicely packaged methods:

private Task Negotiate(IClientTransport transport)
{
    var negotiateTcs = new TaskCompletionSource<object>();

    transport.Negotiate(this).Then(negotiationResponse =>
    {
        VerifyProtocolVersion(negotiationResponse.ProtocolVersion);

        ConnectionId = negotiationResponse.ConnectionId;

        var data = OnSending();
        StartTransport(data).ContinueWith(negotiateTcs);
    })
    .ContinueWithNotComplete(negotiateTcs);

    var tcs = new TaskCompletionSource<object>();
    negotiateTcs.Task.ContinueWith(task =>
    {
        try
        {
            // If there's any errors starting then Stop the connection
            if (task.IsFaulted)
            {
                Stop();
                tcs.SetException(task.Exception);
            }
            else if (task.IsCanceled)
            {
                Stop();
                tcs.SetCanceled();
            }
            else
            {
                tcs.SetResult(null);
            }
        }
        catch (Exception ex)
        {
            tcs.SetException(ex);
        }
    },
    TaskContinuationOptions.ExecuteSynchronously);

    return tcs.Task;
}

The comment above the IsFaulted check shows that if the server fails to connect, an exception is set on the task and the connection is stopped. Since SignalR uses the task parallel library, we can just call Start() again and get a new task.

Here is the snippet we use to continuously reconnect:

/// <summary>
/// Handles the case where the connection start task fails, retrying every
/// 5 seconds until it succeeds
/// </summary>
/// <param name="startTask"></param>
private void HandleConnectionStart(Task startTask)
{
    startTask.ContinueWith(task =>
    {
        try
        {
            if (task.IsFaulted)
            {
                // make sure to observe the exception or we can get an aggregate exception
                foreach (var e in task.Exception.Flatten().InnerExceptions)
                {
                    Log.WarnOnce(this, "Observed exception trying to handle connection start: " + e.Message);
                }

                Log.WarnOnce(this, "Unable to connect to url {0}, retrying every 5 seconds", Url);
                RetryConnectionStartRescheduler();
            }
            else
            {
                // do success actions
            }
        }
        catch(Exception ex)
        {
            Log.ErrorOnce(this, "Error handling connection start, retrying", ex);
            RetryConnectionStartRescheduler();
        }
    });
}

private void RetryConnectionStartRescheduler()
{
    ThreadUtil.ScheduleToThreadPool(TimeSpan.FromSeconds(5),
        () =>
        {
            try
            {
                HandleConnectionStart(Connection.Start());
            }
            catch(Exception ex)
            {
                Log.ErrorOnce(this, "Error retrying connection start, retrying", ex);
                RetryConnectionStartRescheduler();
            }
        });

}

ThreadUtil.ScheduleToThreadPool is a wrapper we have on top of the Rx framework’s thread pool scheduler. Internally it looks like this:

public static void ScheduleToThreadPool(TimeSpan executeTime, Action action)
{
    Scheduler.ThreadPool.Schedule(DateTime.Now.Add(executeTime), action);
}

It’s important to note that you have to touch the Exception property of a faulted task, or use its Handle method, to avoid an UnobservedTaskException: exceptions that are never observed get rethrown on the finalizer thread.
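
As a sketch, the Handle variant looks like this; the extension-method wrapper is just for illustration.

using System;
using System.Threading.Tasks;

static class TaskExtensions
{
    public static Task ObserveExceptions(this Task task)
    {
        return task.ContinueWith(
            t => t.Exception.Handle(ex =>
            {
                Console.WriteLine("Observed: " + ex.Message);
                return true; // returning true marks the exception as observed
            }),
            TaskContinuationOptions.OnlyOnFaulted);
    }
}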

In conclusion, by leveraging tasks and a couple of simple scheduling utilities, we can cleanly and asynchronously schedule new connection attempts. When we finally connect, we can continue with our initialization logic; from that point on, the remaining SignalR reconnection logic is handled by the hub.