Category: Code

Unit testing DNS failovers

Something that's come up a few times in my career is the difficulty of validating whether your code can handle actual DNS changes. A lot of the time, testing that you have the right JVM settings and that your 3rd party clients can handle a change involves mucking with hosts files, nameservers, or stuff like Route53 and waiting around. That's hard to automate and deterministically reproduce. However, you can hook into the DNS resolution in the JVM to control what gets resolved to what, and this way you can tweak the resolution in a test and see what breaks! I found some info at this blog post and cleaned it up a bit for usage in Scala.

The magic sauce to pull this off is to make sure you override the default sun.net.spi.nameservice.NameServiceDescriptor. Internally, the InetAddress class tries to load an instance of the NameServiceDescriptor interface using the service loader mechanism. The service loader looks for a resource at META-INF/services/fully.qualified.classname.to.override and instantiates whatever fully qualified class name is listed in that override file.

For example, if we have

cat META-INF/services/sun.net.spi.nameservice.NameServiceDescriptor
io.paradoxical.test.dns.LocalNameServerDescriptor

Then the io.paradoxical.test.dns.LocalNameServerDescriptor will get created. Nice.

What does that class actually look like?

class LocalNameServerDescriptor extends NameServiceDescriptor {
  override def getType: String = "dns"

  override def createNameService(): NameService = {
    new LocalNameServer()
  }

  override def getProviderName: String = LocalNameServer.dnsName
}

The type is dns and the name service implementation is our own class. The provider name is something we have custom defined as well, shown below:

object LocalNameServer {
  Security.setProperty("networkaddress.cache.ttl", "0")

  protected val cache = new ConcurrentHashMap[String, String]()

  val dnsName = "local-dns"

  def use(): Unit = {
    System.setProperty("sun.net.spi.nameservice.provider.1", s"dns,${dnsName}")
  }

  def put(hostName: String, ip: String) = {
    cache.put(hostName, ip)
  }

  def remove(hostName: String) = {
    cache.remove(hostName)
  }
}

class LocalNameServer extends NameService {

  import LocalNameServer._

  val default = new DNSNameService()

  override def lookupAllHostAddr(name: String): Array[InetAddress] = {
    val ip = cache.get(name)
    if (ip != null && !ip.isEmpty) {
      InetAddress.getAllByName(ip)
    } else {
      default.lookupAllHostAddr(name)
    }
  }

  override def getHostByAddr(bytes: Array[Byte]): String = {
    default.getHostByAddr(bytes)
  }
}

Pretty simple. We have a cache that is stored in a singleton companion object with some helper methods on it, and all we do is delegate looking into the cache. If we can resolve the data in the cache we return it, otherwise just proxy it to the default resolver.

The use method sets a system property that says to use the DNS resolver named local-dns as the highest priority nameservice.provider.1 (lower numbers are higher priority).

Now we can write some tests and see if this works!

@RunWith(classOf[JUnitRunner])
class DnsTests extends FlatSpec with Matchers {
  LocalNameServer.use()

  "DNS" should "resolve" in {
    val google = resolve("www.google.com")

    google.getHostAddress shouldNot be("127.0.0.1")
  }

  it should "be overridable" in {
    LocalNameServer.put("www.google.com", "127.0.0.1")

    val google = resolve("www.google.com")

    google.getHostAddress should be("127.0.0.1")

    LocalNameServer.remove("www.google.com")
  }

  it should "be undoable" in {
    LocalNameServer.put("www.google.com", "127.0.0.1")

    val google = resolve("www.google.com")

    google.getHostAddress should be("127.0.0.1")

    LocalNameServer.remove("www.google.com")

    resolve("www.google.com").getHostAddress shouldNot be("127.0.0.1")
  }

  def resolve(name: String) = InetAddress.getByName(name)
}

Happy dns resolving!

Consistent hashing for fun

I think consistent hashing is pretty fascinating. It lets you define a ring of machines that shard out data by a hash value. Imagine that your hash space is 0 -> Int.Max and you have 2 machines: one machine gets all values hashed from 0 -> Int.Max/2 and the other from Int.Max/2 -> Int.Max. Clever. This is one of the major algorithms of distributed systems like Cassandra and DynamoDB.

For a good visualization, check out this blog post.

The fun stuff happens when you want to add replication and fault tolerance to your hashing. Now you need to have replicas and handle machines joining and leaving. When someone joins, you need to re-partition the space evenly and re-distribute the values that were previously held.

Something similar happens when a node leaves: you need to make sure that whatever it was responsible for in its primary space AND the things it was responsible for as a secondary replica are redistributed amongst the remaining nodes.

But the beauty of consistent hashing is that the replication basically happens for free! And so does redistribution!

Since my new feature is all in Scala, I figured I'd write something up to see how this might play out.

For the impatient, the full source is here.

First I started with some data types

case class HashValue(value: String) extends AnyRef

case class HashKey(key: Int) extends AnyRef with Ordered[HashKey] {
  override def compare(that: HashKey): Int = key.compare(that.key)
}

object HashKey {
  def safe(key: Int) = new HashKey(Math.abs(key))
}

case class HashRange(minHash: HashKey, maxHash: HashKey) extends Ordered[HashRange] {
  override def compare(that: HashRange): Int = minHash.compare(that.minHash)
}

I chose to wrap the key in a positive space since it made things slightly easier. In reality you want to use MD5 or some actual hashing function, but I relied on the hash code here.
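If you did want a real hash, the idea is just to digest the key bytes and fold them into the positive int space. A minimal sketch (written in Java here purely for illustration; the same MessageDigest call works fine from Scala):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Key {
    // Digest the key and take the first 4 bytes as a non-negative int,
    // the same space HashKey.safe expects.
    public static int safe(String key) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5")
                                     .digest(key.getBytes(StandardCharsets.UTF_8));

        return ByteBuffer.wrap(digest).getInt() & 0x7fffffff;
    }
}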

And then a machine to hold values:

import scala.collection.immutable.TreeMap

class Machine[TValue](val id: String) {
  private var map: TreeMap[HashKey, TValue] = new TreeMap[HashKey, TValue]()

  def add(key: HashKey, value: TValue): Unit = {
    map = map + (key -> value)
  }

  def get(hashKey: HashKey): Option[TValue] = {
    map.get(hashKey)
  }

  def getValuesInHashRange(hashRange: HashRange): Seq[(HashKey, TValue)] ={
    map.range(hashRange.minHash, hashRange.maxHash).toSeq
  }

  def keepOnly(hashRanges: Seq[HashRange]): Seq[(HashKey, TValue)] = {
    val keepOnly: TreeMap[HashKey, TValue] =
      hashRanges
      .map(range => map.range(range.minHash, range.maxHash))
      .fold(map.empty) { (tree1, tree2) => tree1 ++ tree2 }

    val dropped = map.filter { case (k, v) => !keepOnly.contains(k) }

    map = keepOnly
    
    dropped.toSeq
  }
}

A machine keeps a sorted tree map of hash values. This lets me really quickly get things within ranges. For example, when we re-partition a machine, it’s no longer responsible for the entire range set that it was before. But it may still be responsible for parts of it. So we want to be able to tell a machine hey, keep ranges 0-5, 12-20, but drop everything else. The tree map lets me do this really nicely.

Now for the fun part, the actual consistent hashing stuff.

Given a set of machines, we need to define how the circular partitions are laid out:

private def getPartitions(machines: Seq[Machine[TValue]]): Seq[(HashRange, Machine[TValue])] = {
  val replicatedRanges: Seq[HashRange] = Stream.continually(defineRanges(machines.size)).flatten

  val infiniteMachines: Stream[Machine[TValue]] =
    Stream.continually(machines.flatMap(List.fill(replicas)(_))).flatten

  replicatedRanges
  .zip(infiniteMachines)
  .take(machines.size * replicas)
  .toList
}

What we want to make sure is that each node sits on multiple ranges; this gives us the replication factor. To do that I've duplicated the machines in the list by the replication factor and made sure all the lists cycle around indefinitely, so while they are not evenly distributed around the ring (they are clustered) they do provide fault tolerance.

Let's look at what it takes to put a value into the ring:

private def put(hashkey: HashKey, value: TValue): Unit = {
  getReplicas(hashkey).foreach(_.add(hashkey, value))
}

private def getReplicas(hashKey: HashKey): Seq[Machine[TValue]] = {
  partitions
  .filter { case (range, machine) => hashKey >= range.minHash && hashKey < range.maxHash }
  .map(_._2)
}

We need to make sure that for each replica in the ring that sits on the hash range, we insert the value into that machine. That's pretty easy, though we can improve this later with better lookups.

Let's look at a get:

def get(hashKey: TKey): Option[TValue] = {
  val key = HashKey.safe(hashKey.hashCode())

  getReplicas(key)
  .map(_.get(key))
  .collectFirst { case Some(x) => x }
}

Also similar: go through all the replicas and find the first one to return a value.

Now let's look at how to add a machine into the ring:

def addMachine(): Machine[TValue] = {
  id += 1

  val newMachine = new Machine[TValue]("machine-" + id)

  val oldMachines = partitions.map(_._2).distinct

  partitions = getPartitions(Seq(newMachine) ++ oldMachines)

  redistribute(partitions)

  newMachine
}

So we first create a new list of machines and then ask how to re-partition the ring. Then the keys in the ring need to redistribute themselves so that only the nodes who are responsible for certain ranges contain those keys:

def redistribute(newPartitions: Seq[(HashRange, Machine[TValue])]) = {
  newPartitions.groupBy { case (range, machine) => machine }
  .flatMap { case (machine, ranges) => machine.keepOnly(ranges.map(_._1)) }
  .foreach { case (k, v) => put(k, v) }
}

Redistributing isn't that complicated either. We group all the nodes in the ring by the machine they are on, then for each machine we tell it to only keep values that are in its assigned ranges. The machine's keepOnly function takes a list of ranges and will remove and return anything not in those ranges. We can now aggregate all the things that are "emitted" by the machines and re-insert them into the right location.

Removing a machine is really similar:

def removeMachine(machine: Machine[TValue]): Unit = {
  val remainingMachines = partitions.filter { case (r, m) => !m.eq(machine) }.map(_._2)

  partitions = getPartitions(remainingMachines.distinct)

  redistribute(partitions)
}

And that's all there is to it! Now we have a fast, simple consistent hasher.

A toy generational garbage collector

Had a little downtime today and figured I’d make a toy generational garbage collector, for funsies. A friend of mine was once asked this as an interview question so I thought it might make for some good weekend practice.

For those not familiar, a common way of doing garbage collection in managed languages is to have the concept of multiple generations. All newly created objects go in gen0. New objects are also the most likely to be destroyed, as there is a lot of transient stuff that goes on in an application. If an object survives a GC round it gets promoted to gen1. Gen1 doesn't get GC'd as often. Same with gen2.

A GC cycle usually consists of iterating through application root nodes (so starting at main and traversing down) and checking which generation each reference lies in. If we're doing a gen1 collection, we'll collect gen0 and gen1. However, if you're doing gen0 only and a node already lies in gen1, you can bail early and say "meh, this node and all its references are probably ok for now, we'll try this later".

For a really great visualization, check out this MSDN article on generational garbage collection.

And now on to the code! First let's start with what an object is:

@Data
@EqualsAndHashCode(of = "id")
public class Node {
    private final String id;

    public Node(String id) {
        this.id = id;
    }

    private final List<Node> references = new ArrayList<>();

    public void addReference(Node node) {
        references.add(node);
    }

    public void removeReference(Node node) {
        references.removeIf(i -> i.getId().equals(node.getId()));
    }
}

For the purposes of the toy, it's just a node with a unique id.

Let's also define an enum of the different generations we'll support and their ordinal values:

public enum Mode {
    Gen0,
    Gen1
}

Next, let's make an allocator that can allocate new nodes. This would be like the new keyword behind the scenes:

public class Allocator {
    @Getter
    private Set<Node> gen0 = new HashSet<>();

    @Getter
    private Set<Node> gen1 = new HashSet<>();

    public Node newNode() {
        return newNode("");
    }

    public Node newNode(String tag) {
        final Node node = new Node(tag + UUID.randomUUID());

        getGen0().add(node);

        return node;
    }

    public Mode locateNode(Node tag) {
        if (gen1.contains(tag)) {
            return Mode.Gen1;
        }

        return Mode.Gen0;
    }
    ....

At this point we can allocate new nodes and assign them references:

final Allocator allocator = new Allocator();

final Node root = allocator.newNode();

root.addReference(allocator.newNode());
root.addReference(allocator.newNode());
root.addReference(allocator.newNode());

We still haven't actually collected anything yet, so let's write a garbage collector:

public class Gc {
    private final Allocator allocator;

    public Gc(Allocator allocator) {
        this.allocator = allocator;
    }

    public void collect(Node root, Mode mode) {
        final Allocator.Marker marker = allocator.markBuilder(mode);

        mark(root, marker, mode);

        marker.sweep();
    }

    private void mark(Node root, Allocator.Marker marker, Mode mode) {
        final Mode found = allocator.locateNode(root);

        if (found.ordinal() > mode.ordinal()) {
            return;
        }

        marker.mark(root);

        root.getReferences().forEach(ref -> mark(ref, marker, mode));
    }
}

The GC does a DFS on the root object reference and marks all visible nodes with a marker builder (yet to be shown). If the generational heap that the node lives in is less than or equal to the mode we are collecting, we'll mark it; otherwise we just skip it. This works because later we'll only prune from the generational heaps according to the mode.

Now comes the fun part, and that's the marker:

public static class Marker {

    private final Set<String> marks;
    private final Allocator allocator;

    private final Mode mode;

    public Marker(Allocator allocator, final Mode mode) {
        this.allocator = allocator;
        this.mode = mode;
        marks = new HashSet<>();
    }

    public void mark(Node node) {
        marks.add(node.getId());
    }

    public void sweep() {
        Predicate<Node> remove = node -> !marks.contains(node.getId());

        allocator.getGen0().removeIf(remove);

        allocator.promote(Mode.Gen0);

        switch (mode) {
            case Gen0:
                break;
            case Gen1:
                allocator.getGen1().removeIf(remove);
                allocator.promote(Mode.Gen1);
                break;
        }
    }
}

All we do here is tag the node in a set when we mark it. When we sweep, we go through the generations less than or equal to the current mode, remove unmarked nodes, and promote the surviving nodes to the next heap!

We're still missing the last two functions in the allocator, which are promote and the marker builder:

public Marker markBuilder(final Mode mode) {
    return new Marker(this, mode);
}

private void promote(final Mode mode) {
    switch (mode) {
        case Gen0:
            gen1.addAll(gen0);
            gen0.clear();
            break;
        case Gen1:
            break;
    }
}

Now we can put it all together and write some tests:

Below you can see the promotion in action.

final Allocator allocator = new Allocator();

final Gc gc = new Gc(allocator);

final Node root = allocator.newNode();

root.addReference(allocator.newNode());
root.addReference(allocator.newNode());
root.addReference(allocator.newNode());

final Node removable = allocator.newNode("remove");

removable.addReference(allocator.newNode("dangle1"));
removable.addReference(allocator.newNode("dangle2"));

root.addReference(removable);

assertThat(allocator.getGen0().size()).isEqualTo(7);

gc.collect(root, Mode.Gen0);

assertThat(allocator.getGen0().size()).isEqualTo(0);
assertThat(allocator.getGen1().size()).isEqualTo(7);

Nothing can be collected since all nodes still have references, but we've cleared gen0 and promoted all nodes to gen1.

root.removeReference(removable);

gc.collect(root, Mode.Gen1);

assertThat(allocator.getGen0().size()).isEqualTo(0);
assertThat(allocator.getGen1().size()).isEqualTo(4);

Now we can actually remove the reference and do a gen1 collection. You can see that the gen1 heap size went down by 3 (the removable node plus its two children) since those nodes are no longer reachable.

And just for fun, let's show that gen1 collection works as well:

final Node gen1Remove = allocator.newNode();

root.addReference(gen1Remove);

gc.collect(root, Mode.Gen1);

assertThat(allocator.getGen0().size()).isEqualTo(0);
assertThat(allocator.getGen1().size()).isEqualTo(5);

root.removeReference(gen1Remove);

gc.collect(root, Mode.Gen1);

assertThat(allocator.getGen0().size()).isEqualTo(0);
assertThat(allocator.getGen1().size()).isEqualTo(4);

And there you have it, a toy generational garbage collector :)

For the full code, check out this gist

Logging the easy way

This is a cross post from the original posting at GoDaddy's engineering blog. This is a project I have spent considerable time working on and leverage a lot.

Logging is a funny thing. Everyone knows what logs are and everyone knows you should log, but there are no hard and fast rules on how to log or what to log. Your logs are your first line of defense against figuring out issues live. Sometimes logs are the only line of defense (especially in time sensitive systems).

That said, in any application good logging is critical. Debugging an issue can be made ten times easier with simple, consistent logging. Inconsistent or poor logging can actually make it impossible to figure out what went wrong in certain situations. Here at GoDaddy we want to make sure that we encourage logging that is consistent, informative, and easy to search.

Enter the GoDaddy Logger. This is a SLF4J wrapper library that encourages us to fall into the pit of success when dealing with our logging formats and styles in a few ways:

  • Frees you from having to think about what context fields need to be logged and removes any worries about forgetting to log a value,
  • Provides the ability to skip personal identifiable information from being logged,
  • Abstracts out the actual format of the logs from the production of them. By decoupling the output of the framework from the log statements themselves, you can easily swap out the formatter when you want to change the structure and all of your logging statements will be consistently logged using the new format.

A lot of teams at GoDaddy use ELK (Elasticsearch, Logstash, Kibana) to search logs in a distributed system. By combining consistent logging with ELK (or Splunk or some other solution), it becomes relatively straight forward for developers to correlate and locate related events in their distributed systems.

THE GODADDY LOGGER

In an effort to make doing the right thing the easy thing, our team set out to build an extra layer on top of SLF4J – The GoDaddy Logger. While SLF4J is meant to abstract logging libraries and gives you a basic logging interface, our goal was to extend that interface to provide for consistent logging formats. One of the most important things for us was that we wanted to provide an easy way to log objects rather than having to use string formatting everywhere.

CAPTURING THE CONTEXT

One of the first things we did was expose what we call the ‘with’ syntax. The ‘with’ syntax builds a formatted key value pair, which by default is “key=value;”, and allows logging statements to be more human readable. For example:

logger.with("first-name", "GoDaddy")
      .with("last-name", "Developers!")
      .info("Logging is fun");

Using the default logging formatter this log statement outputs:

Logging is fun; first-name="GoDaddy"; last-name="Developers!"

We can build on this to support deep object logging as well. A good example is to log the entire object from an incoming request. Instead of relying on the .toString() of the object to be its loggable representation, we can crawl the object using reflectasm and format it globally and consistently. Let's look at an example of how a full object is logged.

Logger logger = LoggerFactory.getLogger(LoggerTest.class);
Car car = new Car("911", 2015, "Porsche", 70000.00, Country.GERMANY, new Engine("V12"));
logger.with(car).info("Logging Car");

Like the initial string ‘with’ example, the above log line produces:

14:31:03.943 [main] INFO com.godaddy.logger.LoggerTest - Logging Car; cost=70000.0; country=GERMANY; engine.name="V12"; make="Porsche"; model="911"; year=2015

All of the car object's info is cleanly logged in a consistent way. We can easily search for a model property in our logs and we won't be at the whim of spelling errors or forgetful developers. You can also see that our logger nests object properties in dot notation, like engine.name="V12". To accomplish the same behavior using SLF4J, we would need to do something akin to the following:

Option 1: use the Car's toString functionality.

Implement the Car object’s toString function:

String toString() {
    return "cost=" + cost + "; country=" + country + "; engine.name=" + (engine == null ? "null" : engine.getName()); // ... etc.
}

Then log the car via its toString() function:

logger.info(“Logging Car; {}”, car.toString());

Option 2: use String formatting.

logger.info("Logging Car; cost={}; country={}; e.name=\"{}\"; make=\"{}\"; model=\"{}\"; year={}",
            car.getCost(), car.getCountry(),
            car.getEngine() == null ? null : car.getEngine().getName(),
            car.getMake(), car.getModel(), car.getYear());

Our logger combats these unfortunate scenarios and many others by allowing you to set the recursive logging level, which defines how many levels deep into a nested object you want logged, and it takes object cycles into account so there isn't infinite recursion.

SKIPPING SENSITIVE INFORMATION

The GoDaddy Logger provides annotation based logging scope support, giving you the ability to prevent fields/methods from being logged with the use of annotations. If you don't want to skip the field completely but would rather log a hashed value, you can use an injectable hash processor to hash the values that are to be logged. Hashing a value can be useful since you may want to log a piece of data consistently without logging the actual data value. For example:

import lombok.Data;

@Data
public class AnnotatedObject {
    private String notAnnotated;

    @LoggingScope(scope = Scope.SKIP)
    private String annotatedLogSkip;

    public String getNotAnnotatedMethod() {
        return "Not Annotated";
    }

    @LoggingScope(scope = Scope.SKIP)
    public String getAnnotatedLogSkipMethod() {
        return "Annotated";
    }

    @LoggingScope(scope = Scope.HASH)
    public String getCreditCardNumber() {
        return "1234-5678-9123-4567";
    }
}

If we were to log this object:

AnnotatedObject annotatedObject = new AnnotatedObject();
annotatedObject.setAnnotatedLogSkip("SKIP ME");
annotatedObject.setNotAnnotated("NOT ANNOTATED");

logger.with(annotatedObject).info("Annotation Logging");

The following would be output to the logs:

09:43:13.306 [main] INFO com.godaddy.logging.LoggerTest - Annotation Logging; creditCardNumber="5d4e923fe014cb34f4c7ed17b82d6c58"; notAnnotated="NOT ANNOTATED"; notAnnotatedMethod="Not Annotated"

Notice that the annotatedLogSkip value of “SKIP ME” is not logged. You can also see that the credit card number has been hashed. The GoDaddy Logger uses Guava’s MD5 hashing algorithm by default which is not cryptographically secure, but definitely fast. And you’re able to provide your own hashing algorithm when configuring the logger.

LOGGING CONTEXT

One of the more powerful features of the logger is that the 'with' syntax returns a new immutable captured logger. This means you can do something like this:

Logger contextLogger = logger.with("request-id", 123);
contextLogger.info("enter");

// .. Do Work

contextLogger.info("exit");

All logs generated off the captured logger will include the captured with statements. This lets you factor out common logging statements, and it cleans up your logs so you see what you really care about (and make fewer mistakes).

CONCLUSION

With consistent logging we can easily search through our logs and debug complicated issues with confidence. As an added bonus, since our log formatting is centralized and abstracted, we can also make team-wide or company-wide formatting shifts without impacting developers or existing code bases.

Logging is hard. There is a fine line between logging too much and too little. Logging is also best done while you write code vs. as an afterthought. We’ve really enjoyed using the GoDaddy Logger and it’s really made logging into a simple and unobtrusive task. We hope you take a look and if you find it useful for yourself or your team let us know!

For more information about the GoDaddy Logger, check out the GitHub project, or if you’re interested in working on these and other fun problems with us, check out our jobs page.

Serialization of lombok value types with jackson

For anyone who uses lombok with jackson, you should check out jackson-lombok, a fork from xebia that allows lombok value types (and lombok generated constructors) to be json creators.

The original authors compiled their version against jackson-core 2.4.* but the new version uses 2.6.*. Props need to go to github user kazuki-ma for submitting a PR that actually addresses this; paradoxical just took those fixes and published them.

Anyways, now you get the niceties of being able to do:

@Value
public class ValueType{
    @JsonProperty
    private String name;
    
    @JsonProperty
    private String description;
}

And instantiate your mapper:

new ObjectMapper().setAnnotationIntrospector(new JacksonLombokAnnotationIntrospector());
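A quick round trip to sanity check the setup (a sketch; the field values are arbitrary):

ObjectMapper mapper = new ObjectMapper()
        .setAnnotationIntrospector(new JacksonLombokAnnotationIntrospector());

// serialize the immutable value type
String json = mapper.writeValueAsString(new ValueType("cassieq", "a queue on cassandra"));

// and read it back through the lombok generated constructor
ValueType roundTripped = mapper.readValue(json, ValueType.class);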

Enjoy!

Cassandra DB migrations

When building any application that involves persistent data storage you usually need a way to upgrade and change your database using a set of scripts. Working with patterns like ActiveRecord you get easy up/down migrations by version. But with cassandra, which traditionally was schemaless, there aren't that many tools out there to do this.

One thing we have been using at my work and at paradoxical is a simple java based cassandra loader tool that does “up” migrations based on db version scripts.

Assuming you have a folder in your application that stores db scripts like

db/scripts/01_init.cql
db/scripts/02_add_thing.cql
..
db/scripts/10_migrate_users.cql
..

Then each script corresponds to a particular db version state, and its current state depends on all previous states. Our cassandra loader tracks db versions in a db_version table and lets you apply runners against a keyspace to move your schema (and data) to the target version. If your db is already at a version it does nothing, and if your db is a few versions back the runner will only run the versions required to get you to latest (or to the version number you want).

Taking this one step further, when working at least in Java we have the luxury of using cassandra-unit to actually run an embedded cassandra instance available for unit or integration tests. This way you don’t need to mock out your database, you actually run all your db calls through the embedded cassandra. We use this heavily in cassieq (a distributed queue based on cassandra).

One thing our cassandra loader can do is be run in library mode, where you give it the same set of db scripts and you can build a fresh db for your integration tests:

public static Session create() throws Exception {
    return CqlUnitDb.create("../db/scripts");
}
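From there an integration test can talk to the embedded instance directly, something like the sketch below, where the users table is just a stand-in for whatever your scripts actually create:

// spins up embedded cassandra and applies every versioned script
Session session = CqlUnitDb.create("../db/scripts");

// queries run against the real (freshly migrated) schema, no mocks involved
ResultSet users = session.execute("SELECT * FROM users");

assertThat(users.all()).isEmpty();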

Running the loader in standalone mode (by downloading the runner maven classifier) lets you run the migration runner in your console:

> java -jar cassandra.loader-runner.jar

Unexpected exception:Missing required options: ip, u, pw, k
usage: Main
 -f,--file-path <arg>         CQL File Path (default =
                              ../db/src/main/resources)
 -ip <arg>                    Cassandra IP Address
 -k,--keyspace <arg>          Cassandra Keyspace
 -p,--port <arg>              Cassandra Port (default = 9042)
 -pw,--password <arg>         Cassandra Password
 -recreateDatabase            Deletes all tables. WARNING all
                              data will be deleted! 
 -u,--username <arg>          Cassandra Username
 -v,--upgrade-version <arg>   Upgrade to Version

The advantage to unifying all of this is that you can test your db scripts in isolation and be confident that they work!

Dalloc – coordinating resource distribution using hazelcast

A fun problem that has come up during the implementation of cassieq (a distributed queue based on cassandra) is how to evenly distribute resources across a group of machines. There is a scenario in cassieq where writes can be delayed, and as such there is a custom worker in the app (per queue) that watches a queue for delayed writes and republishes those messages into a later bucket. It's transparent to the user, but if we have multiple workers on the same queue we could potentially republish the message twice. While technically that falls within the SLA we've set for cassieq (at least once delivery), it'd be nice to avoid this particular race condition.

To solve this, I’ve clustered the cassieq instances together using hazelcast. Hazelcast is a pretty cool library since it abstracts away member discovery/connection and gives you events on membership changes to make it easy for you to build distributed data grids. It also has a lot of great primitives that are useful in building distributed workflows. Using hazelcast, I’ve built a simple resource distributor that uses shared distributed locks and a master set of allocations across cluster members to coordinate who can “grab” which resource.

For the impatient you can get dalloc from

<dependency>
    <groupId>io.paradoxical</groupId>
    <artifactId>dalloc</artifactId>
    <version>1.0</version>
</dependency>

The general idea in dalloc is that each node creates a resource allocator that is bound to a resource group name (like "Queues"). Each node supplies a function to the allocator that generates the master set of resources to use, and a callback for when resources are allocated. The callback is there so you can wire in async events when allocations need to be rebalanced outside of a manual invocation (like a cluster member join/leave).

The entire resource allocation library API deals with abstractions on what a resource is, and lets the client map their internal resource into a ResourceIdentity. For cassieq, it’s a queue id.

When an allocation is triggered (either manually or via a member join/leave event) the following occurs:

  • Try and acquire a shared lock for a finite period of time
  • If you acquired the lock, load the map of what has been allocated to everyone else and compare your master set against what has already been claimed
  • Given the size of the current cluster, determine how many resources you are allowed to claim (by even distribution; see the sketch after this list). If you don't have your entire allotment claimed, take as many as you can to fill up. If you have too many claimed, give some resources up
  • Persist your changes to the master state map
  • Dispatch to your callback what the new set of resources should be
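The "how many am I allowed to claim" step is just an even split with the remainder spread over the first few members; a minimal sketch of that calculation (a made-up helper, not the dalloc API):

// nodeIndex is this node's position in the sorted cluster member list
static int allowedClaimCount(int totalResources, int clusterSize, int nodeIndex) {
    int base = totalResources / clusterSize;

    // the first (totalResources % clusterSize) members pick up one extra resource
    return nodeIndex < totalResources % clusterSize ? base + 1 : base;
}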

Hazelcast supports distributed maps, where the map is sharded by its keys across different nodes. However, I'm explicitly NOT distributing this map across the cluster. I've put ownership of the resource set on "one" node (but the map is replicated, so if that node goes down the map still exists). This is because each node is going to have to try and do a claim. If each node claims and then calls to every other node, that's N^2 IO operations; compare that to every node making N operations.

The library also supports bypassing this mechanism in favor of a much more "low-tech" solution: manual allocation. All this means is that you pre-define how many nodes there should be and which node number a given node is. Then each node sorts the input data and grabs a specific slice out of the input set based on its id. It doesn't give any guarantees of non-overlap, but it does give you an 80% solution to a hard problem.
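A rough sketch of that manual mode (again illustrative only, not dalloc's actual API): every node sorts the same input deterministically and keeps the contiguous chunk that matches its node number.

import java.util.List;
import java.util.stream.Collectors;

class ManualAllocation {
    // nodeId is 0-based and totalNodes is pre-configured out of band
    static List<String> slice(List<String> resources, int nodeId, int totalNodes) {
        List<String> sorted = resources.stream().sorted().collect(Collectors.toList());

        // carve the sorted list into totalNodes contiguous chunks and keep ours
        int chunk = (int) Math.ceil(sorted.size() / (double) totalNodes);
        int from = Math.min(nodeId * chunk, sorted.size());

        return sorted.subList(from, Math.min(from + chunk, sorted.size()));
    }
}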

Jake, the other paradoxical member, suggested that there could be a nice alternative solution using a similar broadcast style of quorum using paxos: each node broadcasts what it's claiming and the nodes agree on who is allowed to do what. I probably wouldn't use hazelcast for that, though the primitives paxos needs (talking to all members of a cluster) are there, and it'd be interesting to build paxos on top of hazelcast now that I think about it…

Anyways, abstracting distributed resource allocation is nice, because as we make improvements to how we want to tune the allocation algorithms all dependent services get it for free. And free stuff is my favorite.

Leadership election with cassandra

Cassandra has a neat feature that lets you expire data in a column. Using this handy little feature, you can create simple leadership election with cassandra. The whole process is described here, which talks about leveraging Cassandra's consensus and column expiration to create leadership electors.

The idea is that a user will try and claim a slot for a period of time in a leadership table. If a slot is full, someone else has leadership. While the leader is still active it needs to heartbeat the table faster than the column's TTL to act as a keepalive. If it fails to heartbeat (i.e. it died) then its leadership claim is relinquished and someone else can claim it. Unlike most leadership algorithms that claim a single "host" as a leader, I needed a way to create leaders sharded by some "group". I call this a "LeadershipGroup" and we can leverage the expiring columns in cassandra to do this!
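Concretely, the claim and the heartbeat boil down to lightweight transactions plus a TTL against a table like the one shown further down. This is only a sketch of the idea (assuming a Datastax session connected to your keyspace), not necessarily the statements the library actually issues:

// try to claim the slot for 30 seconds; only succeeds if nobody currently holds it
ResultSet claim = session.execute(
    "INSERT INTO leadership_election (group, leader_id) VALUES (?, ?) IF NOT EXISTS USING TTL 30",
    "queue-workers", "user1");

boolean isLeader = claim.one().getBool("[applied]");

// while we hold the claim, heartbeat faster than the TTL to keep it alive
session.execute(
    "UPDATE leadership_election USING TTL 30 SET leader_id = ? WHERE group = ? IF leader_id = ?",
    "user1", "queue-workers", "user1");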

To make this easier, I’ve wrapped this algorithm in a java library available from paradoxical. For the impatient

<dependency>
    <groupId>io.paradoxical</groupId>
    <artifactId>cassandra-leadership</artifactId>
    <version>1.0</version>
</dependency>

The gist here is that you need to provide a schema similar to

CREATE TABLE leadership_election (
    group text PRIMARY KEY,
    leader_id text
);

Though the actual column names can be customized. You can define a leadership election factory using Guice like so:

public class LeadershipModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(LeadershipSchema.class).toInstance(LeadershipSchema.Default);

        bind(LeadershipStatus.class).to(LeadershipStatusImpl.class);

        bind(LeadershipElectionFactory.class).to(CassandraLeadershipElectionFactory.class);
    }
}

  • LeadershipStatus is a class that lets you query who is leader for what “group”. For example, you can have multiple workers competing for leadership of a certain resource.
  • LeadershipSchema is a class that defines what the column names in your schema are named. By default if you use the sample table above, the Default schema maps to that
  • LeadershipElectionFactory is a class that gives you instances of LeadershipElection classes, and I’ve provided a cassandra leadership factory

Once we have a leader election we can try and claim leadership:

final LeadershipElectionFactory factory = new CassandraLeadershipElectionFactory(session);

// create an election processor for a group id
final LeadershipElection leadership = factory.create(LeadershipGroup.random());

final LeaderIdentity user1 = LeaderIdentity.valueOf("user1");

final LeaderIdentity user2 = LeaderIdentity.valueOf("user2");

assertThat(leadership.tryClaimLeader(user1, Duration.ofSeconds(2))).isPresent();

Thread.sleep(Duration.ofSeconds(3).toMillis());

assertThat(leadership.tryClaimLeader(user2, Duration.ofSeconds(3))).isPresent();

When you claim leadership you claim it for a period of time, and if you get it you receive a leadership token that you can heartbeat on. And now you have leadership!

As usual, full source available at my github

Plugin class loaders are hard

Plugin based systems are really common: Jenkins, Jira, WordPress, whatever. Recently I built a plugin workflow for a system at work and have been mired in the joys of the class loader. For the uninitiated, a class in Java is identified uniquely by the class loader instance it was loaded from as well as its fully qualified class name. This means that a foo.bar class loaded by class loader A is not the same as a foo.bar class loaded by class loader B.

There are actually some cool things you can do with this, especially in terms of code isolation. Imagine your plugins are bundled as shaded jars that contain all their internal dependencies. By leveraging class loaders you can isolate potentially conflicting versions of libraries between the host application and the plugin. But in order to communicate with the host layer, you need a strict set of shared interfaces that the host layer always owns. When building the uber jar you exclude the host interfaces from being bundled (along with their transitive dependencies, which in maven can be done by using scope provided). This means that they will always be loaded by the host.

In general, class loaders are hierarchical. They ask their parent if a class has been loaded, and if so, return that. In order to do plugins you need to invert that process: first look inside the uber-jar, and then if you can't find the class, look up.

An example can be found here and copied for the sake of internet completeness:

import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLStreamHandlerFactory;
import java.util.UUID;

public class PostDelegationClassLoader extends URLClassLoader {

    private final UUID id = UUID.randomUUID();

    public PostDelegationClassLoader(URL[] urls, ClassLoader parent, URLStreamHandlerFactory factory) {
        super(urls, parent, factory);
    }

    public PostDelegationClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    public PostDelegationClassLoader(URL[] urls) {
        super(urls);
    }

    public PostDelegationClassLoader() {
        super(new URL[0]);
    }

    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException {
        try (ThreadCurrentClassLoaderCapture capture = new ThreadCurrentClassLoaderCapture(this)) {
            Class loadedClass = findLoadedClass(name);

            // Nope, try to load it
            if (loadedClass == null) {
                try {
                    // Ignore parent delegation and just try to load locally
                    loadedClass = findClass(name);
                }
                catch (ClassNotFoundException e) {
                    // Swallow - does not exist locally
                }

                // If not found, just use the standard URLClassLoader (which follows normal parent delegation)
                if (loadedClass == null) {
                    // throws ClassNotFoundException if not found in delegation hierarchy at all
                    loadedClass = super.loadClass(name);
                }
            }
            return loadedClass;
        }
    }

    @Override
    public URL getResource(final String name) {
        final URL resource = findResource(name);

        if (resource != null) {
            return resource;
        }

        return super.getResource(name);
    }
}

But this is just the tip of the fun iceberg. If all your libraries play nice then you may not notice anything. But I recently noticed, using the apache xml-rpc library, that I would get a SAXParserFactory class def not found exception, specifically bitching about instantiating the sax parser factory. I'm not the only one apparently; here is a discussion about a JIRA plugin that wasn't happy. After much code digging I found that the classloader being used was the one bound to the thread's current context.

Why in the world is there a classloader bound to thread local? JavaWorld has a nice blurb about this

Why do thread context classloaders exist in the first place? They were introduced in J2SE without much fanfare. A certain lack of proper guidance and documentation from Sun Microsystems likely explains why many developers find them confusing.

In truth, context classloaders provide a back door around the classloading delegation scheme also introduced in J2SE. Normally, all classloaders in a JVM are organized in a hierarchy such that every classloader (except for the primordial classloader that bootstraps the entire JVM) has a single parent. When asked to load a class, every compliant classloader is expected to delegate loading to its parent first and attempt to define the class only if the parent fails.

Sometimes this orderly arrangement does not work, usually when some JVM core code must dynamically load resources provided by application developers. Take JNDI for instance: its guts are implemented by bootstrap classes in rt.jar (starting with J2SE 1.3), but these core JNDI classes may load JNDI providers implemented by independent vendors and potentially deployed in the application’s -classpath. This scenario calls for a parent classloader (the primordial one in this case) to load a class visible to one of its child classloaders (the system one, for example). Normal J2SE delegation does not work, and the workaround is to make the core JNDI classes use thread context loaders, thus effectively “tunneling” through the classloader hierarchy in the direction opposite to the proper delegation.

This means that whenever I'm delegating work to my plugins I need to be smart about capturing my custom plugin class loader and putting it on the current thread before execution. Otherwise, if a misbehaving library accesses the thread classloader, it now has access to the ambient root class loader, and if the same class name exists in the host application it will load that one. This could potentially conflict with other classes from the same package that aren't loaded this way and in general cause mayhem.

The solution here was a simple class modeled after .NET's disposable pattern, using Java's AutoCloseable with try-with-resources.

public class ThreadCurrentClassLoaderCapture implements AutoCloseable {
    final ClassLoader originalClassLoader;

    public ThreadCurrentClassLoaderCapture(final ClassLoader newClassLoader) {
        originalClassLoader = Thread.currentThread().getContextClassLoader();

        Thread.currentThread().setContextClassLoader(newClassLoader);
    }

    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(originalClassLoader);
    }
}

Which is used before each and every invocation into the interface of the plugin (where connection is the plugin reference)

@Override
public void start() throws Exception {
    captureClassLoader(connection::start);
}

@Override
public void stop() throws Exception {
    captureClassLoader(connection::stop);
}

@Override
public void heartbeat() throws Exception {
    captureClassLoader(connection::heartbeat);
}

private void captureClassLoader(ExceptionRunnable runner) throws Exception {
    try (ThreadCurrentClassLoaderCapture capture = new ThreadCurrentClassLoaderCapture(connection.getClass().getClassLoader())) {
        runner.run();
    }
}
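The ExceptionRunnable referenced above isn't shown in the post; presumably it's nothing more than a functional interface whose run method is allowed to throw, along these lines:

@FunctionalInterface
public interface ExceptionRunnable {
    // like Runnable, but the plugin calls (start/stop/heartbeat) can throw
    void run() throws Exception;
}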

However, this isn't the only issue. Imagine a scenario where you support both class path loaded plugins AND remotely loaded plugins (via shaded uber-jar). And let's pretend that on the classpath is a jar with the same namespaces and classes as those in an uber-jar. To be more succinct: you have a delay loaded shared library on the class path, and a shaded version of that library loaded via the plugin mechanism.

Technically there shouldn't be any issues here. The class path plugin gets all its classes resolved from the root scope. The plugin gets its classes (of the same name) from the delegated provider. Both use the same shared set of host interfaces. The issue arises if you have a library like reflectasm, which dynamically emits bytecode at runtime.

Look at this code:

AccessClassLoader loader = AccessClassLoader.get(type);
synchronized (loader) {
	try {
		accessClass = loader.loadClass(accessClassName);
	} catch (ClassNotFoundException ignored) {
		String accessClassNameInternal = accessClassName.replace('.', '/');
		String classNameInternal = className.replace('.', '/');
		ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_MAXS);

This is a snippet from reflectasm as it's generating a runtime bytecode emitter that can access fields for you. It creates a class name like your.class.nameMethodAccess. If the class name isn't found, it generates the bytecode and then writes it into the owning class's class loader.

In the scenario of a plugin using this library, it will check the loader, see that neither the plugin classloader NOR the root scope loader has the emitted class name, and so a class not found exception is thrown. It will then write the generated class into the target type's class loader. This is the delegated loader, which provides the isolation we want.

However, if the class path plugin (what I call an embedded plugin) runs this code, the dynamic runtime class is written into the root scope loader. This means that all delegating class loaders will eventually find this type since they always do a delegated pass to the root!

The important thing to note here is that using a delegated loader does not mean every class that comes out of it is tied to the delegated loader. Only classes that are found inside of the delegated loader are bound to it. If a class is resolved by the parent, the class is linked to the parent.

In this scenario with the root class loader being polluted with the same class name, I don’t think there is much you can do other than avoid it.

Anyways, maybe I should have used OSGi…?

Project angelhair: Building a queue on cassandra

Edit: this project has since been moved to CassieQ: https://github.com/paradoxical-io/cassieq

A few weeks ago my work had a hack day and I got together with some of my coworker friends and we decided to build a queue on top of Cassandra.

For the impatient, give it a try (docker hub):

docker run -it \
    -e CLUSTER_NAME="" \
    -e KEYSPACE="" \
    -e CONTACT_POINTS="" \
    -e USERNAME="" \
    -e PASSWORD="" \
    -e USE_SSL="" \
    -e DATA_CENTER="" \
    -e METRICS_GRAPHITE="true" \
    -e GRAPHITE_PREFIX="" \
    -e GRAPHITE_URL="" \
    onoffswitch/angelhair

The core features for what we called Project Angelhair were to handle:

  • long term events (so many events that AMQ or RMQ might run out of storage space)
  • connectionless – we wanted to use http
  • invisibility – we need messages to disappear while they are being processed but be able to come back
  • highly scalable – we wanted to distribute a docker container that just does all the work

Building a queue on cassandra isn’t a trivial task and is rife with problems. In fact, this is pretty well known and in general the consensus is don’t build a queue on Cassandra.

But why not? There are a few reasons. In general, the question you want to answer with a queue is "what haven't I seen?". A simple way to do this is to delete a message when it is consumed. However, with cassandra, deletes aren't immediate. They are tombstoned, so they exist until the compaction period passes. This means even if you have only 1 message in your queue, cassandra has to scan all the old deleted messages before it finds it. Under high load this can be a LOT of extra work. But that's not the only problem. You also have to decide how to distribute your messages across the ring. If you put all the messages for a queue into one partition key, you haven't evenly distributed your messages and you have a skewed distribution of work. This is going to manifest as really poor performance.

On top of all of that, cassandra has poor support for atomic transactions, so you can't easily say "let me get, process, and consume" in one atomic action. Backing stores that are owned by a master (like sqlserver) let you do atomic actions much better since they either have an elected leader who can manage this or are a single box. Cassandra isn't so lucky.

Given all the problems described, it may seem insane to build a queue on Cassandra. But cassandra is a great datastore that is massively horizontally scalable. It also already exists at a lot of organizations. Being able to use a horizontally scalable data store means you can ingest incredible amounts of messages.

How does angelhair work?

Angelhair works with 3 pointers into a queue:

  • A reader bucket pointer
  • A repair bucket pointer
  • An invisibility pointer

In order to scale and efficiently act as a queue we need to leverage cassandra's partitioning capabilities. Queues are actually messages bucketized into fixed size groups called buckets. Each message is assigned a monotonically increasing id that maps it into a bucket. For example, if the bucket size is 20 and you have id 21, that maps into bucket 1 (21/20). This is done using a table in cassandra whose only job is to provide monotonic values for a queue:

CREATE TABLE monoton (
  queuename text PRIMARY KEY,
  value bigint
);
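The post doesn't show how that value gets advanced, but since you can't atomically read-and-claim a regular Cassandra counter, presumably it's a compare-and-set loop over a lightweight transaction, roughly like this sketch (not the actual cassieq code; it assumes the row is seeded when the queue is created):

// claim the next monotonic id for a queue by CAS-ing the stored value
long nextId(Session session, String queueName) {
    while (true) {
        long current = session.execute(
            "SELECT value FROM monoton WHERE queuename = ?", queueName)
            .one().getLong("value");

        ResultSet cas = session.execute(
            "UPDATE monoton SET value = ? WHERE queuename = ? IF value = ?",
            current + 1, queueName, current);

        // [applied] is only true if nobody else bumped the value first
        if (cas.one().getBool("[applied]")) {
            return current + 1;
        }
    }
}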

By bucketizing messages we can distribute messages across the cassandra clusters.

Messages are always put into the bucket they correlate to, regardless if previous buckets are full. This means that messages just keep getting put into the end, as fast as possible.

Given that messages are put into their corresponding bucket, the reader has a pointer to its active bucket (the reader bucket pointer) and scans that bucket for unacked visible messages. If the bucket is full it tombstones the bucket, indicating that the bucket is closed for processing. If the bucket is NOT full, but all messages in the bucket are consumed (or being processed) AND the monotonic pointer has already advanced to the next bucket, the current bucket is also tombstoned. This means no more messages will ever show up in the current bucket… sort of.
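Spelled out, the reader's tombstone decision is roughly the sketch below (plain booleans and longs as inputs to keep it self-contained; in the real thing these would come from the bucket's rows):

// should the reader close out (tombstone) the bucket it is currently reading?
static boolean shouldTombstone(boolean bucketFull,
                               boolean allMessagesAckedOrInFlight,
                               long monotonBucket,
                               long readerBucket) {
    if (bucketFull) {
        return true;
    }

    // not full, but nothing left to hand out AND the id generator has already
    // advanced past this bucket, so nothing new should land here
    // (delayed writes are the repair worker's problem)
    return allMessagesAckedOrInFlight && monotonBucket > readerBucket;
}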

Repairing delayed writes

Without synchronizing reads and writes you can run into a situation where you can have a delayed write. For example, assume you generate monotonic ids in this sequence:

Id 19
Id 20

Write 20 <-- bucket advances to bucket 1 
             (assuming bucket size of 20) and 
             bucket 0 is tombstoned (closed)

Write 19 <-- but message 19 writes into 
             bucket 0, even though 0 
             was tombstoned!

In this scenario id 20 advances the monotonic bucket to bucket 1 (given buckets are size 20). That means the reader tombstones bucket 0. But what happens to message 19? We don’t want to lose it, but as far as the reader is concerned it’s moved onto bucket 1 and off of bucket 0.

This is where the concept of a repair worker comes into play. The repair worker's job is to slowly follow the reader and wait for tombstoned buckets. It has its own pointer (the repair bucket pointer) and polls to find when a bucket is tombstoned. When a bucket is tombstoned the repair worker will wait a configured timeout for out of order missing messages to appear. This means if a slightly delayed write occurs then the repair worker will pick it up and republish it to the last active bucket. We're gambling on probability here; the assumption is that if a message is going to be successfully written then it will be written within time T. That time is configurable when you create the queue.

But there is also a scenario like this:

Id 19
Id 20

!!Write 19 ---> This actually dies and fails to write!
Write 20

In this scenario we claimed ids 19 and 20, but 19 failed to write. Once 20 is consumed the reader tombstones the bucket and the repair worker kicks in. But 19 isn't ever going to show up! In this case, the repair worker waits for the configured time, and if after that time the message still isn't written then we assume that the message is dead and will never be processed. Then the repair worker advances its pointer and moves on.

This means we don't necessarily guarantee FIFO; however, we do (reasonably) guarantee messages will appear. The repair worker never moves past a non-completed bucket, though since it's just a pointer we can always repair the repair worker by moving the pointer back.

Invisibility

Now the question comes up of how to deal with invisibility of messages. Invisible messages are important since with a connectionless protocol (like http) we need to know if a message worker is dead so its message can go back for processing. In queues like RMQ this is detected when a channel is disconnected (i.e. the connection is lost). With http we're not so lucky.

To track invisibility there is a separate pointer tracking the last invisible pointer. When a read comes in, we first check the invisibility pointer to see if that message is now visible.

If it is, we can return it. If not, get the next available message.

If the current invisible pointer is already acked then we need to find the next invisible pointer. This next invisible pointer is the first non-acked, non-visible message. If there isn't one in the current bucket, the invisibility pointer moves to the next bucket until it finds one or no messages exist, but it never moves past a message that hasn't been delivered before. This way it won't accidentally skip a message that hasn't been sent out yet.

If, however, two messages get picked up at the same time while the invis pointer is scanning, the invis pointer could choose the wrong id. In order to prevent this, we update the invis pointer to the destination if it's less than the current one (i.e. we need to move back); otherwise we only update if the current reader owns the current invis pointer (doing an atomic update).

API

Angelhair has a simple API.

  • Put a message into a queue (and optionally specify an initial invisibility)
  • Get a message from a queue
  • Ack the message using the message pop receipt (which is an encoded version and id metadata). The pop receipt is unique for each message dequeue. If a message comes back alive and is available for processing again it gets a new pop receipt. This also lets us identify a unique consumer of a message since the current atomic version of the message is encoded in the pop receipt.

Doesn’t get much easier than that!

Conclusion

There are a couple implementations of queues on cassandra out there that we found while researching this. One is from Netflix, but their implementation builds a lock system on top of cassandra and coordinates reads/writes using locking. Some other implementations used wide rows (or CQL lists in a single row) to get around the tombstoning, but that limits the number of messages in your "queue" to 64k messages.

While we haven't tested angelhair in a stressed environment, we've decided to give it a go in some non-critical areas of our internal tooling. So far we've had great success with it!