
Tracking batch queue fanouts

When you work in any resilient distributed system, queues invariably come into play. You fire events into a queue to be handled, and you can scale workers horizontally to churn through them.

One thing that is difficult to do, though, is to answer the question: when is a batch of events finished? This is a common scenario when a single event causes a fan-out of other events. Imagine you have an event called ProcessCatalog, and a catalog may have N catalog items. The first handler for ProcessCatalog may pull some data and fire N events for catalog items. You may want to know when all catalog items have been processed by downstream listeners for that particular event.

It may seem easy, right? Just wait for the last item to come through. Ah, but distributed queues are loosely ordered. If an event fails and retries, what used to be the last event is no longer the last.

What about keeping a counter somewhere? Well, the same issue arises on event failure: if an event decrements the counter twice (because it was retried), the counter is no longer deterministic.

To do this thoroughly you need to track every individual item and ack it as it's processed. That would work, but it could be really costly. Imagine tracking 1B items in some data store for every batch!

Let's pretend we go this route though. What else do we need? We need the concept of a batch, and items to track within the batch. If we have a workflow of opening a batch (creating a new one), adding items to it, then sealing the batch (so no more items can be added), we can safely determine when the batch is empty. Without the concept of a close/seal you can have situations where you open a batch, fire off N events, all N events are consumed and ack'd by downstream systems, and only then do you close the batch. When all N are ack'd, the downstream systems can't know that no more items are coming, since the batch is still open. Only once the batch is closed can you tell whether it has been fully consumed.

To that end, both the producer AND the consumer need to check whether the batch is complete. In the previous example, if all N items are ack'd before the batch is closed, then closing the batch needs to report that the batch is empty. In the other situation, where the batch is closed first and then all N items are ack'd, the last ack needs to report that the batch is empty.
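The open/add/close/ack lifecycle above can be sketched in memory. This is an illustrative sketch in Java, not a real implementation; all names here (BatchTracker, etc.) are invented:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory sketch of the open/add/close/ack lifecycle described above.
// Class and method names are illustrative, not from any real library.
class BatchTracker {
    private final Map<Long, Set<String>> pending = new HashMap<>();
    private final Set<Long> closed = new HashSet<>();
    private long nextId = 0;

    long open() {
        nextId++;
        pending.put(nextId, new HashSet<>());
        return nextId;
    }

    void add(long batchId, String itemId) {
        if (closed.contains(batchId))
            throw new IllegalStateException("cannot add to a closed batch");
        pending.get(batchId).add(itemId);
    }

    // Producer-side check: returns true if every item was already acked
    // by the time the batch is sealed.
    boolean close(long batchId) {
        closed.add(batchId);
        return pending.get(batchId).isEmpty();
    }

    // Consumer-side check: returns true only when this ack empties a
    // closed batch. Acks are idempotent, so a retried event cannot
    // corrupt the state the way a raw counter would.
    boolean ack(long batchId, String itemId) {
        pending.get(batchId).remove(itemId);
        return closed.contains(batchId) && pending.get(batchId).isEmpty();
    }
}
```

Note that both close and ack report completion, which covers both orderings discussed above.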

Back to the problem of storing and transferring N batch item IDs, though. What if, instead of storing each item, you leveraged a bitfield representing a set of items? Now instead of N rows you only need N bits to logically track every item. But you may still need to send N logical IDs back to the client. We can get around that too: any time you add a group of items to a batch, say 1000 items to batch id 1, that sub-group can be inserted as a unique hash corresponding to a bitfield with 1000 bits set to 1 (any extra bits set to 0 and ignored).
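The sub-group idea can be sketched with java.util.BitSet standing in for the stored bitfield. Everything here (the BatchItemGroup name, using a random UUID as the hash) is an assumption for illustration:

```java
import java.util.BitSet;
import java.util.UUID;

// Sketch of the sub-group idea above: adding 1000 items to a batch stores a
// single row keyed by a fresh hash, whose bitfield starts with 1000 bits set
// (1 = pending, 0 = acked). Names here are illustrative.
class BatchItemGroup {
    final UUID hash = UUID.randomUUID();
    final int size;
    final BitSet bits;

    BatchItemGroup(int size) {
        this.size = size;
        this.bits = new BitSet(size);
        bits.set(0, size); // all items start out pending
    }

    // Ack the item at `index` by clearing its bit.
    void ack(int index) { bits.clear(index); }

    // Items still pending in this sub-group.
    int pendingCount() { return bits.cardinality(); }
}
```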

Returning to the client, all you need to send back is the hash and how many items are related to that hash. Now deterministic IDs can be created client side, of the form batchId:hash:index. When a client goes back to ack a message (or a batch of messages), the ID contains enough information to

  1. Locate all other hashes related to the batch
  2. Get the bitfield for the hash
  3. Flip the appropriate bit represented by the index

Something like this, maybe:

import java.util.UUID

case class BatchItemGroupId(value: UUID) extends UuidValue

object BatchItemId {
  def apply(batchId: BatchId, batchItemGroupId: BatchItemGroupId, index: Long): BatchItemId =
    BatchItemId(s"${batchId.value}:${batchItemGroupId.value}:$index")

  def generate(batchId: BatchId, batchItemGroupId: BatchItemGroupId, numberOfItems: Int): Iterable[BatchItemId] = {
    Stream.from(0).take(numberOfItems).map(idx => apply(batchId, batchItemGroupId, idx))
  }

  def generate(batchId: BatchId, groups: List[BatchItemGroupInsert]): Iterable[BatchItemId] = {
    groups.toStream.flatMap(group => generate(batchId, group.id, group.upto))
  }
}

case class BatchItemId(value: String) extends StringValue {
  val (batchId, batchItemGroupId, index) = value.split(':').toList match {
    case batch::hash::index::_ => (BatchId(batch.toLong), BatchItemGroupId(UUID.fromString(hash)), index.toLong)
    case _ => throw new RuntimeException(s"Invalid batch item id format $value")
  }
}

case class BatchId(value: Long) extends LongValue

We also need an abstraction on top of a byte array that lets us toggle bits in it and count how many bits are set. We'll need that count to answer the question of "is this sub-group hash empty?". I've shown bitmasking before, but I can show it again at this gist.
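A rough sketch of such an abstraction, working directly on a raw byte array as it would come out of a blob column. The API here is a guess for illustration, not the gist's actual code:

```java
// Wraps a raw byte array (e.g. a MySQL blob), lets you clear the bit for an
// item index, and counts the bits still set so we can answer "is this
// sub-group empty?". A set bit means the item is still pending.
class BitGroup {
    private final byte[] bytes;

    BitGroup(byte[] bytes) { this.bytes = bytes; }

    // Ack the item at `index` by clearing its bit.
    void clearBit(int index) {
        bytes[index / 8] &= ~(1 << (index % 8));
    }

    boolean isSet(int index) {
        return (bytes[index / 8] & (1 << (index % 8))) != 0;
    }

    // Number of bits still set, i.e. items still pending.
    int countSet() {
        int total = 0;
        for (byte b : bytes) total += Integer.bitCount(b & 0xFF);
        return total;
    }

    boolean isEmpty() { return countSet() == 0; }
}
```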

Now that we have that, all our SQL needs to do, given a sub-group batch, is pull the bitfield, load it into a BitGroup, flip the appropriate bits, and write the bitfield back. Then it needs to query all batch groups for a batch, pull their blobs, and count their bits to determine whether the batch is pending or complete.

If we're smart about it we can limit the bitfield in MySQL to be large enough to get compression but small enough to minimize over-the-wire overhead. Say 1000 bytes, which works out to 8000 bits: we can store quite a bit with quite a little!

Unfortunately MySQL doesn't support bitwise operations on blobs (not until MySQL 8, apparently), so you need to pull the blob, manipulate it, then write it back. But you can safely do this in a transaction if you select the row using FOR UPDATE, which provides row-level locking.
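A hedged sketch of that read-modify-write cycle over JDBC, assuming a hypothetical table shaped like batch_groups(batch_id BIGINT, group_hash CHAR(36), bits BLOB):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Since pre-8.0 MySQL can't flip bits inside a blob, we lock the row with
// SELECT ... FOR UPDATE, flip the bit in application code, and write the
// blob back, all inside one transaction. Table and column names are assumed.
class BlobBitFlipper {
    // Pure helper: clear one bit in the blob's byte array.
    static byte[] clearBit(byte[] bits, int index) {
        bits[index / 8] &= ~(1 << (index % 8));
        return bits;
    }

    static void ackItem(Connection conn, long batchId, String hash, int index)
            throws Exception {
        conn.setAutoCommit(false);
        try (PreparedStatement select = conn.prepareStatement(
                "SELECT bits FROM batch_groups WHERE batch_id = ? AND group_hash = ? FOR UPDATE")) {
            select.setLong(1, batchId);
            select.setString(2, hash);
            try (ResultSet rs = select.executeQuery()) {
                if (rs.next()) {
                    byte[] bits = clearBit(rs.getBytes("bits"), index);
                    try (PreparedStatement update = conn.prepareStatement(
                            "UPDATE batch_groups SET bits = ? WHERE batch_id = ? AND group_hash = ?")) {
                        update.setBytes(1, bits);
                        update.setLong(2, batchId);
                        update.setString(3, hash);
                        update.executeUpdate();
                    }
                }
            }
            conn.commit();
        } catch (Exception e) {
            conn.rollback();
            throw e;
        }
    }
}
```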

One last thing to think about before we call it a day. There are all sorts of situations where batches can be opened but never closed. Imagine a client opens a batch, adds items to it, then dies. It may retry and create a new batch with new items, but the original is now an orphaned batch. To make our queue batcher work long term we need some asynchronous cleanup: arbitrarily decide that any non-closed batches with no activity for N days get automatically deleted, and do the same with closed batches (maybe at a different interval). This lets the system deal with batches/events that are never going to complete.
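The cleanup policy itself is just a TTL comparison on last activity. A sketch, with the intervals as arbitrary placeholders:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the reaper policy above: any batch idle for longer than its TTL
// gets deleted, with separate TTLs for open and closed batches. The TTL
// values below are arbitrary placeholders, not recommendations.
class BatchReaper {
    static final Duration OPEN_TTL = Duration.ofDays(7);
    static final Duration CLOSED_TTL = Duration.ofDays(30);

    static boolean shouldDelete(Instant lastActivity, boolean closed, Instant now) {
        Duration ttl = closed ? CLOSED_TTL : OPEN_TTL;
        return Duration.between(lastActivity, now).compareTo(ttl) > 0;
    }
}
```

An async job can then periodically sweep all batches through this predicate and delete the ones it matches.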

Package this all up into a service with an API and blamo! Efficient queue batch tracking!

CassieQ at the Seattle Cassandra Users Meetup

Last night Jake and I presented CassieQ (the distributed message queue on Cassandra) at the Seattle Cassandra Users meetup at the Expedia building in Bellevue. Thanks to everyone who came out and chatted with us; we certainly learned a lot and had some great conversations about potential optimizations to include in CassieQ.

A couple of good points that came up were how to minimize the use of compare-and-set with the monoton provider, and whether we can move to time UUIDs for "auto" incrementing monotons. Another interesting tidbit was the discussion of the time-based compaction strategies currently being proposed, which could give a big boost given CassieQ's workflow.

But my favorite was the suggestion that we create a "kafka" mode and move the logic of storing pointer offsets out of CassieQ and onto the client, in which case we could see enormous gains since we'd no longer need to do compare-and-sets for multiple consumers. If we do see that pull request come in, I think both Jake and I would be pretty stoked.

Anyways, the slides of our presentation are available here: paradoxical.io/slides/cassieq (keynote)

AngularJS for .Net developers

A few months ago I was asked to be a technical reviewer for a new Packt Publishing book called AngularJS for .NET Developers. It mostly revolves around ServiceStack (not Web API) and building a full stack application with Angular. I actually really enjoyed reading it and thought it touched on a lot of great points that a serious developer needs to know about.

Unfortunately, I don't think the book does a very good job of explaining Angular in general. It's certainly geared toward the experienced developer who has worked with Angular and ServiceStack/C# REST before.

Still, if you're a .NET developer interested in using Angular, it's an informative and quick read!

Leveraging message passing to do currying in ruby

I'm not much of a Ruby guy, but I had the inkling to play with it this weekend. The first thing I do in a new language is try to map constructs that I'm familiar with, from basic stuff like object instantiation, singletons, and inheritance, to more complicated paradigms like lambdas and currying.

I came across this blog post showing that Ruby has a way to auto-curry lambdas, which is actually pretty awesome. However, I was a little confused by the syntax

a.send(fn, b)

I’m more used to ML style where you would do

fn a b 

So what is a.send doing?

Message passing

Ruby exposes its dynamic dispatch as a message passing mechanism (like Objective-C), so you can send "messages" to objects. It's like being able to say "hey, execute this function (represented by a symbol) on this context".

If you think of it that way, then a.send(fn, b) translates to "execute function fn on the context a, with the argument b". This means fn had better exist on the context of a.

As an example, this curries the multiplication function:

apply_onContext = lambda do |fn, a, b|
  a.send(fn, b)
end

mult = apply_onContext.curry.(:*, 5)

puts mult.(2)

This prints out 10. First a lambda is created that sends a message to the object a, asking it to execute the function * (represented as a symbol, i.e. an interned string).

Then we can leverage the curry function to auto-curry the lambda for us, creating almost F#-style curried functions. The ".(" syntax is shorthand for .call, which executes a lambda.

If we understand message passing we can construct other lambdas now too:

class Test
  def add(x, y)
    x + y
  end

  def addOne
    apply_onClass = lambda do |fn, x, y|
      send(fn, x, y)
    end

    apply_onClass.curry.(:add, 1)
  end
end

puts Test.new.addOne.(4)

This returns a curried lambda that invokes a message :add on the source object.

Getting rid of the dot

Ruby 1.9 doesn't let you define what () does, so you are forced to call lambdas with the dot syntax. However, Ruby has other interesting features that let you alias a method to another name. It's like moving the original method to a new name.

You can do this to any method you have access to, so you can get the benefits of method overriding without actually needing inheritance.

Taking advantage of this, you can hook into the default missing-method handler on Object (which is invoked when a "message" isn't understood). Catching the missing method and then executing .call on the object (if it accepts that message) lets us fake the parentheses.

Here is a blog post that shows how to do it.

Obviously it sucks to leverage exception handling, but hey, still neat.

Conclusion

It's nowhere near as succinct as F#:

let addOne = (+) 1

but learning new things about other languages is interesting :)

Machine Learning with disaster video posted

A few weeks ago we had our second DC F# meetup, with speaker Phil Trelford leading a hands-on session introducing decision trees. The goal of the meetup was to see how good a predictor we could make of who would live and die on the Titanic. Kaggle has an excellent data set that includes age, sex, ticket price, cabin number, class, and a bunch of other useful features describing Titanic passengers.

Phil followed Mathias' format and had an excellent .fsx script that walked everyone through it. I think the best predictor someone made was close to 84%, though it was surprisingly difficult to exceed that in the short time we had to work on it. I'd implemented my own Shannon-entropy-based ID3 decision tree in C#, so this wasn't my first foray into decision trees, but the compactness of the tree in F# was great to see. On top of that, Phil extended the tree to test not just features but also combinations of features, by having the tree accept functions describing features. This was cool and something I hadn't thought of. By the end you had built out a small DSL describing the feature relationships of whatever you were trying to test. I like it when a problem domain devolves into a series of small DSL-like functions!

If anyone is interested, Phil let us post all of his slides and information on our GitHub. Anyways, here is the video of the session!

Thinking about haskell functors in .net

I've been teaching myself Haskell lately and came across an interesting language feature called functors. Functors are a way of describing a transformation when you have a boxed container. They have the generic signature

('a -> 'b) -> f 'a -> f 'b

Where f isn't a "function"; it's a type that contains values of type 'a.

The idea is that you can write custom map functions for types that act as generic containers. Generic containers are things like lists, an option type, or other things that hold something. By itself a list is nothing; it has to be a list OF something. Not to get sidetracked too much, but boxes like these, once you equip them with some extra operations, are what Haskell calls monads.

Anyways, let’s do this in C# by assuming that we have a box type that holds something.


public class Box<T>
{
    public T Data { get; set; }   
}

var boxes = new List<Box<string>>();

IEnumerable<string> boxNames  = boxes.Select(box => box.Data);

We have a type Box and a list of boxes. Then we Select (or map) each box's inner data into another list. We could extract the projection into a separate function too:

public string BoxString(Box<string> p)
{
    return p.Data;
}

The type signature of this function is

Box<string> -> string

But wouldn't it be nice to be able to do work on a box's data without having to explicitly project it out? Maybe define a way so that if you pass in a box and a function that works on a string, it'll automatically unbox the data and apply the function.

For example, something like this (though this won't compile, obviously):

public String AddExclamation(String input){
   return input + "!";
}

IEnumerable<Box<string>> boxes = new List<Box<string>>();

IEnumerable<string> boxStringsExclamation = boxes.Select(AddExclamation);

In C# we have to add the projection step (which in this case is overloaded):

public String AddExclamation(Box<String> p){
   return AddExclamation(p.Data);
}

In F# you have to do basically the same thing:

type Box<'T> = { Data: 'T }

let boxes = List.init 10 (fun i -> { Data= i.ToString() })

let boxStrings = List.map (fun i -> i.Data) boxes

But in Haskell you can define this projection as part of the type by making it an instance of the Functor type class. When you make a generic type an instance of Functor, you define how maps work on the insides of that type.

data Box a = Data a deriving (Show)

instance Functor Box where
    fmap f (Data inside) = Data(f inside)    

main =
    print $ fmap (++"... your name!") (Data "my name")

This outputs

Data "my name... your name!"

Here I have a box type that contains a value, and I define how the box behaves when someone maps over it. As long as the type of the box contents matches the input type of the projection, the call to fmap works.

ParsecClone on nuget

Today I published the first version of ParsecClone to NuGet. I blogged recently about creating my own parser combinator library, and it's come along pretty well. While FParsec is more performant and better optimized, mine has other advantages, such as being able to work on arbitrary consumption streams (binary, or even bit level) and working directly on strings with regex instead of character by character. Though I wouldn't recommend ParsecClone for production string parsing on big data sets, since the string parsing isn't streamed; it works directly on a string. That's still on the todo list. The binary parsing, however, does work on streams.

Things included:

  • All your favorite parsec style operators: <|>, >>., .>>, |>>, etc. I won’t list them all since there are a lot.
  • String parsing. Match on full string terms, do regular expression parsing, inverted regular expressions, etc. I have a full working CSV parser written in ParsecClone
  • Binary parsing. Do byte level parsing with endianness conversion for reading byte arrays, floats, ints, unsigned ints, longs, etc.
  • Bit level parsing. Capture a byte array from the byte parsing stream and then reprocess it with bit level parsing. Extract any bit, fold bits to numbers, get a list of zero and ones representing the bits you captured. Works for any size byte array (though converting to int will only work for up to 32 bit captures).
  • The fun thing about ParsecClone is that you can parse anything you want, as long as you create a streamable container. The combinator library doesn't care what it is consuming, just that it is combining and consuming. This made it easy to support strings, bytes, and bits as separate consumption containers.

Anyways, maybe someone will find it useful, as I don't think there are any other binary combinator libraries out there for F#. I'd love to get feedback if anyone does use it!

Machine learning from disaster

If any of my readers are in the DC/MD/VA area, you should come to the next DC F# meetup, which I'm organizing on September 16th (a Monday). The topic this time is machine learning from disaster, and we'll get to find out who lives and dies on the Titanic! We're bringing in guest speaker Phil Trelford, so you know it's going to be awesome! Phil is in the DC area on his way to the F# Skills Matter conference in NYC a few days later. I won't be there, but I expect it will be top notch since all the big F# players will be there (such as Don Syme and Tomas Petricek)!

For more info check out our meetup page.

F# and Machine learning Meetup in DC

As you may have figured out, I like F# and I like functional languages. At some point I tweeted to the F# community lamenting the dearth of F# meetups in the DC area. Lo and behold, tons of people replied saying they'd be interested in forming one, and some notable speakers piped up and said they'd come speak if I set something up.

So, if any of my readers live in the DC metro area, I'm organizing an F# meetup featuring Mathias Brandewinder. We'll be doing a hands-on F# and machine learning coding dojo, which should be a whole buttload of fun. Here's the official blurb:

Machine Learning is the art of writing programs that get better at performing a task as they gain experience, without being explicitly programmed to do so. Feed your program more data, and it will get smarter at handling new situations.

Some machine learning algorithms use fairly advanced math, but simple approaches can be surprisingly effective. In this Session, we’ll take a classic Machine Learning challenge from Kaggle.com, automatically recognizing hand-written digits (http://www.kaggle.com/c/digit-recognizer), and build a classifier, from scratch, using F#. So bring your laptop, and let’s see how smart we can make our machines!

This session will be organized as an interactive workshop. Come over, and learn yourself a Machine Learning and F# for great good! No prior experience with Machine Learning required, and F# beginners are very welcome – it will be a great opportunity to see F# in action, and why it’s awesome.

To get the most from the session please try and bring a laptop along with F# installed (ideally either MonoDevelop or Visual Studio Web Express/Full Edition).

Mathias Brandewinder has been writing software in C# for 7+ years, and loving every minute of it, except maybe for a few release days. He is an F# MVP, enjoys arguing about code and how to make it better, and gets very excited when discussing TDD or F#. His other professional interests are applied math and probability. If you want to know more about him, you can check out his blog at www.clear-lines.com/blog or find him on Twitter as @Brandewinder.

For more info go RSVP at meetup.com

QCon NYC 2013

If anyone is at QCon this year, come find me (I'm wearing an Adult Swim hoodie)! There won't be a tech talk this week since I'm busy at the conf, but things will return to normal next week.