Category: Code

From Thrift to Finatra

Originally posted on the curalate engineering blog

There are a million and one ways to do (micro-)services, each with a million and one pitfalls. At Curalate, we’ve been on a long journey of splitting out our monolith into composable and simple services. It’s never easy, as there are a lot of advantages to having a monolith. Things like refactoring, code reuse, deployment, versioning, and rollbacks are all atomic in a monolith. But there are a lot of disadvantages as well. Monoliths encourage poor factoring, bugs in one part of the codebase force rollbacks/changes of the entire application, reasoning about the application in general becomes difficult, build times are slow, transient build errors increase, etc.

To that end our first foray into services was built on top of Twitter’s Finagle stack. If you go to the page and can’t figure out what exactly finagle does, I don’t blame you. The documentation is lackluster and the library itself is quite low-level. Finagle defines a service as a function that transforms a request into a response, and composes services with filters that manipulate requests/responses themselves. It’s a clean abstraction, given that this is basically what all web service frameworks do.
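
As a rough illustration of that abstraction (a minimal sketch using finagle-http types, not how our services were actually wired):

import com.twitter.finagle.{Service, SimpleFilter}
import com.twitter.finagle.http.{Request, Response}
import com.twitter.util.Future

// a service is literally a function: Request => Future[Response]
val hello: Service[Request, Response] = new Service[Request, Response] {
  def apply(request: Request): Future[Response] = {
    val response = Response()
    response.contentString = "hello"
    Future.value(response)
  }
}

// a filter wraps a service, manipulating the request/response on the way through
val logging = new SimpleFilter[Request, Response] {
  def apply(request: Request, next: Service[Request, Response]): Future[Response] = {
    println(s"handling ${request.path}")
    next(request)
  }
}

// filters compose onto services to produce new services
val composed: Service[Request, Response] = logging.andThen(hello)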

Thrift

Finagle by itself isn’t super opinionated. It gives you building blocks to build services (service discovery, circuit breaking, monitoring/metrics, varying protocols, etc) but doesn’t give you much else. Our first set of services built on finagle used Thrift over HTTP. Thrift, similar to protobuf, is an intermediate declarative language that creates RPC style services. For example:

namespace java tutorial
namespace py tutorial

typedef i32 int // We can use typedef to get pretty names for the types we are using
service MultiplicationService
{
        int multiply(1:int n1, 2:int n2),
}

This will create an RPC service called MultiplicationService that takes 2 parameters. Our implementation at Curalate hosted Thrift over HTTP (serializing Thrift as JSON) since all our services are web based behind ELBs in AWS.

We have a lot of services at Curalate that use Thrift, but we’ve found a few shortcomings:

Model Reuse

Thrift forces you to use primitives when defining service contracts, which makes it difficult to share lightweight models (with potentially useful utilities) with consumers. We’ve ended up doing a lot of mapping between generated Thrift types and shared model types. Curalate’s backend services are all written in Scala, so we don’t have the same issues that a company like Facebook (which invented Thrift) may have with varying languages needing easy access to RPC.

Requiring a client

Many times you want to be able to interact with a service without needing access to a client. Needing a client has meant developers get used to cloning service repositories, building the entire service, and then entering a Scala REPL in order to interact with it. As our service surface area expands, it’s not always feasible to expect one developer to build another developer’s service (conflicting java versions, missing SBT/Maven dependencies or settings, etc). The client requirement has led to services taking heavyweight dependencies on other services and leaking dependencies. While Thrift doesn’t force you to do this, it has been a side effect of it taking extra love and care to generate a Thrift client properly, either by distributing Thrift files in a jar or otherwise.

Over the wire inspection

With Thrift-over-HTTP, inspecting requests is difficult. This is due to the fact that these services use Thrift serialization, which, unlike JSON, isn’t human-readable.

Because Thrift over HTTP is all POSTs to /, tracing access and investigating ELB logs becomes a jumbled mess of trying to correlate times and IPs to other parts of our logging infrastructure. The POST issue is also frustrating because it makes it impossible for us to do any semantically smart caching, such as inserting caches at the serving layer for retrieval calls. In a pure HTTP world, we could insert a cache for heavily used GETs given a GET is idempotent.

RPC API design

Regardless of Thrift, RPC encourages poorly unified APIs with lots of specific endpoints that don’t always jibe. We have many services whose method topologies are poorly composable. A well designed API, and cluster of APIs, should gently guide you to getting the data you need. In an ideal world if you get an ID in a payload response for a data object, there should be an endpoint to get more information about that ID. However, in the RPC world we end up with a batch call here, a specific RPC call there, sometimes requiring stitching several calls together to get data that should have been a simple domain level call.

Internal vs External service writing

We have a lot of public REST APIs, and they are written using the Lift framework (some of our oldest code). Developers moving from internal to external services have to shift paradigms and move from writing REST with JSON to RPC with Thrift.

Overall Thrift is a great piece of technology, but after using it for a year we found that it’s not necessarily right for us. All of these things prompted a shift to writing REST style services.

Finatra

Finatra is an HTTP API framework built on top of Finagle. Because it’s still Finagle, we haven’t lost any of our operational knowledge of the underlying framework, but instead we can now write lightweight HTTP APIs with JSON.

With Finatra, all our new services have Swagger automatically enabled so API exploration is simple. And since it’s just plain JSON, we can use Postman to debug and inspect APIs (as well as view requests in Charles or other proxies).

With REST we can still distribute lightweight clients, or more importantly, if there are dependency conflicts a service consumer can very quickly roll an HTTP client for a service. Our ELB logs now make sense, our new APIs are unified in their verbiage (GET vs POST vs PUT vs DELETE), and if we want to write RPC for a particular service we still can.

There are a few other things we like about Finatra. For those developers coming from a background of writing HTTP services, Finatra feels familiar with the concept of controllers, filters, unified test-bed for spinning up build verification tests (local in memory servers), dependency injection (via Guice) baked in, sane serialization using Jackson, etc. It’s hard to do the wrong thing given that it builds strong production level opinions onto Finagle. And thankfully those opinions are ones we share at Curalate!
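
For a flavor of what that looks like, here’s a minimal controller and server sketch (illustrative only, not one of our actual services):

import com.twitter.finagle.http.Request
import com.twitter.finatra.http.{Controller, HttpServer}
import com.twitter.finatra.http.routing.HttpRouter

// a tiny Finatra controller: routes are plain functions and JSON serialization is handled for you
class PingController extends Controller {
  get("/ping") { _: Request =>
    response.ok.json(Map("status" -> "ok"))
  }
}

// the server just registers controllers (plus filters/modules) with the router
class PingServer extends HttpServer {
  override def configureHttp(router: HttpRouter): Unit = {
    router.add[PingController]
  }
}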

We’re not in bad company — Twitter, Duolingo, and others are using Finatra in production.

The HTTP driver pattern

Yet another SOA blog post, this time about calling services. I’ve seen a lot of posts, articles, even books, on how to write services, but not a good way of calling services. It may seem trivial: isn’t calling a service just a matter of making a web request to it? Yes, it is, but in a larger organization it’s not always so simple.

Distributing fat clients

The problem I ran into was that the service stack in use at my organization provided a feature rich client as an artifact of a service’s build. It had retries, metrics, tracing with zipkin, etc. But it also pulled in things like finagle, netty, and jackson, and each service may be distributing slightly different versions of all of these dependencies. When you start to consume 3, 4, 5 or more clients in your own service, suddenly you’ve gotten into an intractable mess of dependencies. Sometimes there’s no actual way to resolve them all without forcing upgrades in other services! That… sucks. It violates the idea of services in that my service is now coupled to your service.

You don’t want to force service owners to have to write clients for each service they want to call. That’d be a big waste of time and duplicated effort. If your organization is mono-lingual (i.e. all java/scala/whatever) then it’s still worth providing a feature rich client that has the sane things built in: retries, metrics, tracing, fast fail, serialization, etc. But you don’t want services leaking all their nuts and bolts to each other.

One solution is to auto generate clients server side. This is akin to what WCF does, or projects like swagger, thrift for RPC, etc. The downside here is that the generated code is usually pretty nasty, and sometimes it’s hard to plug into it to augment the clients with custom tracing, correlation tracking, etc. Other times the API itself might need a few helper niceties that you don’t want to expose in the raw API itself, but in the auto generated world you can’t do that.

There are other projects like Retrofit that look like they solve the problem, since your client is just an interface and its only dependency is OkHttp. But retrofit isn’t scala friendly (Option/None needs custom support, default arguments in methods are not properly intercepted, etc). You’re also bound to the back-compat story of retrofit/okhttp, assuming that they can do things like make sure older versions live side by side together.

In practice, I found that retrofit (even with scala’s issues) didn’t work well in a distributed services environment where everyone was at wildly different versions of things.

Abstracting HTTP

However, taking the idea from retrofit, we can abstract away http calls with an http driver. Http really isn’t that complicated, especially for how it’s used in service to service calls:

import scala.concurrent.{ExecutionContext, Future}

case class ApiRequest(
  path: String,
  queryParams: Seq[(String, Option[String])] = Nil,
  headers: Seq[(String, Option[String])] = Nil,
  options: Option[RequestOptions] = None
) 

case class RequestOptions(
  contentType: Option[String],
  characterSet: String = "utf-8"
)

/**
 * A response with a body
 *
 * @param data     The deserialized data
 * @param response The raw http response
 * @tparam T The type to deserialize
 */
case class BodyResponse[T](data: T, response: RawResponse)

/**
 * A raw response that contains code, the body and headers
 *
 * @param code
 * @param body
 * @param headers
 */
case class RawResponse(code: Int, body: String, headers: Map[String, List[String]])

/**
 * An http error that all drivers should throw on non 2xx
 *
 * @param code  The code
 * @param body  An optional body
 * @param error The inner exception (may be driver specific)
 */
case class HttpError(code: Int, body: Option[String], error: Exception)
  extends Exception(s"Error ${code}, body: ${body}", error)

/**
 * Marker trait indicating an http client
 */
trait HttpClient

/**
 * The simplest HTTP Driver. This is used to abstract libraries that call out over the wire.
 *
 * Anyone can create a driver as long as it implements this interface
 */
trait HttpDriver {
  val serializer: HttpSerializer

  def get[TRes: Manifest](
    request: ApiRequest
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def post[TReq: Manifest, TRes: Manifest](
    request: ApiRequest,
    body: Option[TReq]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def put[TReq: Manifest, TRes: Manifest](
    request: ApiRequest,
    body: Option[TReq]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def patch[TReq: Manifest, TRes: Manifest](
    request: ApiRequest,
    body: Option[TReq]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def custom[TReq: Manifest, TRes: Manifest](
    method: Methods,
    request: ApiRequest,
    body: Option[TReq]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def delete[TRes: Manifest](
    request: ApiRequest
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def bytesRaw[TRes: Manifest](
    method: Methods,
    request: ApiRequest,
    body: Option[Array[Byte]]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]
}

Service owners who want to distribute a client can create clients that have no dependencies (other than the driver definition). Platform maintainers, like myself, can be diligent about making sure the driver interface never breaks, or if it does break, it’s broken in a new namespace such that different versions can peacefully co-exist in the same process.

An example client can now look like:

class ServiceClient(driver: HttpDriver) {
  def ping()(implicit executionContext: ExecutionContext): Future[Unit] = {
    driver.get[Unit](ApiRequest("/health")).map(_.data)
  }
}

But we still need to provide an implementation of a driver. This is where we can decouple things and provide drivers that are properly tooled with all the fatness we want (netty/finagle/zipkin tracing/monitoring/etc), and service owners can bind their clients to whatever driver they want. Those provided implementations can live in their own shared library that only services bind to (not service clients, i.e. they are terminal endpoints in the dependency graph).
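
Since anyone can create a driver as long as it implements the interface, here’s a minimal sketch of how small one can be: an in-memory driver of the sort you might use in a unit test. It assumes the serializer exposes the fromString shown in the Jackson implementation later; a real driver would do the actual I/O plus retries, metrics, tracing, etc.

import scala.concurrent.{ExecutionContext, Future}

// an in-memory "driver": every call just runs a canned body through the serializer
class CannedResponseDriver(override val serializer: HttpSerializer, cannedBody: String) extends HttpDriver {

  private def respond[TRes: Manifest]: Future[BodyResponse[TRes]] =
    Future.successful(
      BodyResponse(serializer.fromString[TRes](cannedBody), RawResponse(200, cannedBody, Map.empty))
    )

  def get[TRes: Manifest](request: ApiRequest)(implicit ec: ExecutionContext) = respond[TRes]

  def delete[TRes: Manifest](request: ApiRequest)(implicit ec: ExecutionContext) = respond[TRes]

  def post[TReq: Manifest, TRes: Manifest](request: ApiRequest, body: Option[TReq])(implicit ec: ExecutionContext) = respond[TRes]

  def put[TReq: Manifest, TRes: Manifest](request: ApiRequest, body: Option[TReq])(implicit ec: ExecutionContext) = respond[TRes]

  def patch[TReq: Manifest, TRes: Manifest](request: ApiRequest, body: Option[TReq])(implicit ec: ExecutionContext) = respond[TRes]

  def custom[TReq: Manifest, TRes: Manifest](method: Methods, request: ApiRequest, body: Option[TReq])(implicit ec: ExecutionContext) = respond[TRes]

  def bytesRaw[TRes: Manifest](method: Methods, request: ApiRequest, body: Option[Array[Byte]])(implicit ec: ExecutionContext) = respond[TRes]
}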

There are a few advantages here:

  • Clients can be distributed at multiple scala versions without dependency conflicts
  • It’s much simpler to version manage and back-compat an interface/trait than it is an entire lib
  • Default drivers that do the right thing can be provided by the service framework, and back compat doesn’t need to be taken into account there since the only consumer is the service (it never leaks).
  • Drivers are simple to use, so if someone needs to roll their own client it’s really simple to do

Custom errors

We can do some other cool stuff now too, given we’ve abstracted away how to call http code. Another common issue with clients is dealing with meaningful errors that aren’t just the basic http 5xx/4xx codes. For example, if you throw a 409 conflict you may want the client to actually receive a WidgetInIncorrectState exception for some calls, and in other calls maybe a FooBarInUse error that contains more semantic information. Basically overloading what a 409 means for a particular call/query. One way of doing this is with a discriminator in the error body:

HTTP 409 response:
{
   "code": "WidgetInIncorrectState",
   "widgetName": "foo",
   "widgetSize": 1234
}

Given we don’t want client code pulling in a json library to do json parsing, the driver needs to support context aware deserialization.

To do that, I’ve exposed a MultiType object that defines

  • Given a path into the json object, which field defines the discriminator
  • Given a discriminator, which type to deserialize to
  • Which http error code to apply all this to

And it looks like:

/**
 * A type representing deserialization of multiple types.
 *
 * @param discriminatorField The field that represents the textual "key" of what the subtype is. Nested fields can be located using
 *                           json path format of / delimited. I.e /foo/bar
 * @param pathTypes          The lookup of the result of the discriminatorField to the subtype mapper
 * @param defaultType        An optional fallback subtype to use when the discriminator value has no mapping in pathTypes
 * @tparam T The supertype of all the subtypes
 */
case class MultiType[T](
  discriminatorField: String,
  pathTypes: Map[String, SubType[_ <: T]],
  defaultType: Option[SubType[_ <: T]] = None
)

/**
 * Represents a subtype as part of a multitype mapping
 *
 * @param path The optional json sub path (slash delimited) to deserialize the type as.
 * @tparam T The type to deserialize
 */
case class SubType[T: Manifest](path: Option[String] = None) {
  val clazz = manifest[T].runtimeClass.asInstanceOf[Class[T]]
}

Using this in a client looks like:

class ServiceClient(driver: HttpDriver) {
  val errorMappers = MultiType[ApiException](discriminatorField = "code", Map(
    "invalidData" -> SubType[InvalidDataException]()
  ))

  def ping()(implicit executionContext: ExecutionContext): Future[Unit] = {
    driver.get[Unit](ApiRequest("/health")).map(_.data).failWithOnCode(500, errorMappers)
  }
}

This says that when the client gets the value invalidData in the code field of the json body of an http 500 error, it should actually throw an InvalidDataException.
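
ApiException and InvalidDataException aren’t defined anywhere above; a hypothetical shape for them (purely for illustration) would be:

// hypothetical exception types used by the example client
abstract class ApiException(message: String) extends Exception(message)

case class InvalidDataException(message: String) extends ApiException(message)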

How does this work? Well, just like the http driver, we’ve abstracted the serializer, and that’s all plugged in by the service consumer:

case class DiscriminatorDoesntExistException(msg: String) extends Exception(msg)

object JacksonHttpSerializer {
  implicit def jacksonToHttpSerializer(jacksonSerializer: JacksonSerializer): HttpSerializer = {
    new JacksonHttpSerializer(jacksonSerializer)
  }
}

class JacksonHttpSerializer(jackson: JacksonSerializer = new JacksonSerializer()) extends HttpSerializer {
  override def fromDiscriminator[SuperType](multiType: MultiType[SuperType])(str: String): SuperType = {
    val tree = jackson.objectMapper.readTree(str)

    val node = tree.at(addPrefix(multiType.discriminatorField, "/"))

    val subType = multiType.pathTypes.get(node.textValue()).orElse(multiType.defaultType).getOrElse {
      throw DiscriminatorDoesntExistException(s"Discriminator ${multiType.discriminatorField} does not exist")
    }

    val treeToDeserialize = subType.path.map(m => tree.at(addPrefix(m, "/"))).getOrElse(tree)

    jackson.objectMapper.treeToValue(treeToDeserialize, subType.clazz)
  }

  override def toString[T](data: T): String = {
    jackson.toJson(data)
  }

  override def fromString[T: Manifest](str: String): T = {
    jackson.fromJson(str)
  }

  private def addPrefix(s: String, p: String) = {
    p + s.stripPrefix(p)
  }
}

Inherent issues

While there are a lot of goodies in abstracting serialization and http calling into a library API provided with implementations (drivers), it does handicap the clients a little bit. Things like doing custom manipulation of the raw response, any sort of business logic, or adding other libraries are really frowned upon. I’d argue this is a good thing and that this should all be handled at the service level, since a client is always a nice-to-have and not a requirement.

Conclusion

The ultimate goal in SOA is separation. But 100% separation should not mean copy-pasting things, reinventing the wheel, or not sharing any code. It just means you need to build the proper lightweight abstractions to help keep strong barriers between services without creating a distributed monolith.

With the http driver abstraction pattern it’s now easy to provide drivers that use finagle-http under the hood, or okhttp, or apache http, etc. Client writers can share their model and client code with helpful utilities without leaking dependencies. And most importantly, service owners can update dependencies and move to new scala versions without fearing that their dependencies are going to cause runtime or compile time issues against pulled in clients, all while still iterating quickly and safely.

Bit packing Pacman

Haven’t posted in a while, since I’ve been heads down in building a lot of cool tooling at work (blog posts coming), but had a chance to mess around a bit with something that came up in an interview question this week.

I frequently ask candidates a high level design question to build PacMan. Games like pacman are fun because on the surface they are very simple, but if you don’t structure your entities and their interactions correctly the design falls apart.

At some point during the interview we had scaled the question up such that there was now a problem of knowing, at a particular point in the game, what was nearby. For example, if the board is 100000 x 100000 (10 billion elements) how efficiently can we determine if there is a nugget/wall next to us? One option is to store all of these entities in a 2d array and just access the neighbors. However, if the entity is any non trivial object, then we’re looking at a minimum of 16 bytes each. That means we’re storing 160 gigs just to represent the board. Probably not something we can realistically do on commodity hardware.

Given we’re answering only a “is something there or not” question, one option is to bit pack the answer. In this sense you can leverage that each bit represents a coordinate in your grid. For example in a 2D grid

0 1
2 3

These positions could be represented by the binary value at that bit:

0 = 0b0001
1 = 0b0010
2 = 0b0100
3 = 0b1000

If we do that, and we store a list of longs (64 bits, 8 bytes) then to store 10 billion elements we need:

private val maxBits: Long = maxX.toLong * maxY
private val requiredLongs = ((maxBits / 64) + 1).toInt

Which ends up being 156,250,001 longs, which in turn is about 1.25 GB. That’s… a big savings. Considering that in the trivial form we stored 160 gigs of objects, this is roughly a 128x reduction.

Now, one thing the candidate brought up (which is a great point) is that this makes working with the values much more difficult. The answer here is to provide a higher level API that hides away the hard bits.

I figured today I’d sit down and do just that. We need to be able to do a few things:

  1. Find out how many longs to store
  2. Find out given a coordinate which long it belongs to
  3. In that long toggle the bit representing the coordinate if we want to set/unset it

class TwoDBinPacker(maxX: Int, maxY: Int) {
  // use Long math so large boards (e.g. 100,000 x 100,000) don't overflow Int
  private val maxBits: Long = maxX.toLong * maxY
  private val requiredLongs = ((maxBits / 64) + 1).toInt
  private val longArray = new Array[Long](requiredLongs)

  def get(x: Int, y: Int): Boolean = {
    longAtPosition(x, y).value == 1
  }

  def set(x: Int, y: Int, value: Boolean): Unit = {
    val p = longAtPosition(x, y)

    longArray(p.index) = p.set(value)
  }

  private def longAtPosition(x: Int, y: Int): BitValue = {
    val flattenedPosition = y.toLong * maxX + x

    val longAtPosition = (flattenedPosition / 64).toInt

    val bitAtPosition = (flattenedPosition % 64).toInt

    BitValue(longAtPosition, longArray(longAtPosition), bitAtPosition)
  }
}

With the helper class of a BitValue looking like:

case class BitValue(index: Int, container: Long, bitNumber: Int) {
  val value = (container >> bitNumber) & 1

  def set(boolean: Boolean): Long = {
    if (boolean) {
      // 1L, not 1: an Int shift would wrap for bit positions >= 32
      val maskAt = 1L << bitNumber

      container | maskAt
    } else {
      val maskAt = ~(1L << bitNumber)

      container & maskAt
    }
  }
}

At this point we can drive a scalatest:

"Bit packer" should "pack large sets (10 billion!)" in {
  val packer = new TwoDBinPacker(100000, 100000)

  packer.set(0, 0, true)
  packer.set(200, 400, true)

  assert(packer.get(0, 0))
  assert(packer.get(200, 400))
  assert(!packer.get(99999, 88888))
}

And this test runs in 80ms.

Now, this is a pretty naive way of doing things, since we are potentially storing tons of unused longs. A smarter way would be to use a sparse set with skip lists, such that as you use a long you create it and mark it used, while the gaps before and after it (up to the next used long) are marker blocks that can span many ranges. I.e.

{EmptyBlock}[long, long, long]{EmptyBlock}[long]

This way you don’t have to store things you don’t actually set.
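
As a rough sketch of the sparse idea (using a plain hash map keyed by long index rather than the skip-list blocks described above):

import scala.collection.mutable

// only longs that have ever had a bit set take up space
class SparseTwoDBinPacker(maxX: Int, maxY: Int) {
  private val longs = mutable.Map[Long, Long]().withDefaultValue(0L)

  private def position(x: Int, y: Int): (Long, Int) = {
    val flat = y.toLong * maxX + x
    (flat / 64, (flat % 64).toInt)
  }

  def get(x: Int, y: Int): Boolean = {
    val (idx, bit) = position(x, y)
    ((longs(idx) >> bit) & 1) == 1
  }

  def set(x: Int, y: Int, value: Boolean): Unit = {
    val (idx, bit) = position(x, y)
    if (value) longs(idx) = longs(idx) | (1L << bit)
    else longs(idx) = longs(idx) & ~(1L << bit)
  }
}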

Anyways, a fun little set of code to write. Full source is available on my github.

Strongly typed http headers in finatra

When building service architectures one thing you need to solve is how to pass context between services. This is usually stuff like request IDs and other tracing information (maybe you use zipkin) between service calls. This means that if you set request id FooBar123 on an entrypoint to service A, and service A calls service B, then service B should know that the request id is still FooBar123. The bigger challenge is usually making sure that all thread locals keep this around (and across futures/execution contexts), but before you attempt that you need to get it into the system in the first place.

I’m working in finatra these days, and I love this framework. It’s got all the things I loved from dropwizard but in a scala first way. Today’s challenge was that I wanted to be able to pass http request headers around between services in a typesafe way that could be used in thread local request contexts. Basically I want to send

X-Magic-Header someValue

And be able to resolve that into a MagicHeader(value: T) class.

The first attempt is easy, just parse header values into case classes:

case class MagicHeader(value: String)

But the question I have is how do I enforce that the header string X-Magic-Header is directly correlated to the case class MagicHeader?

object MagicHeader { 
   val key = "X-Magic-Header"
}

case class MagicHeader(value: String)

Maybe, but still, when someone sends the value out, they can make a mistake:

setRequestHeader("X-mag1c-whatevzer" -> magicHeader.value)

That sucks, I don’t want that. I want it strictly paired. I’m looking for what is in essence a case class that has 2 fields: key, value, but where the key is fixed. How do I do that?

I like to start with how I want to use something, and then work backwards to how to make that happen. Given that, let’s say we want an API kind of like:

object Experimental {
  val key = "Experimental"

  type Value = String
}

And I’d like to be able to do something like

val experimentKey = Experimental("experiment abc")
(experimentKey.key -> experimentKey.value) shouldEqual
         ("Experimental" -> "experiment abc")

I know this means I need an apply method somewhere, and I know that I want a tuple of (key, value). I also know that because the second value is a path dependent type, I can do something with that.

Maybe I can fake an apply method like so:

trait ContextKey {
  val key: String

  /**
   * The custom type of this key
   */
  type Value

  /**
   * A tuple of (String, Value)
   */
  type Key = Product2[String, Value]

  def apply(data: Value): Key = new Key {
    override def _1: String = key

    override def _2: Value = data

    // Product2 extends Equals, so an anonymous instance has to supply canEqual
    override def canEqual(that: Any): Boolean = that.isInstanceOf[Product2[_, _]]
  }
}

And update my object to be

object Experimental extends ContextKey {
  val key = "Experimental"

  override type Value = String
}

Now my object has a mixin of an apply method that creates an anonymous tuple of type (String, Value). You can create instances of Experimental but you can’t ever set the key name itself! However, I can still access the pinned key because the anonymous tuple has it!
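
Usage then looks roughly like this (since the result is a Product2, the pinned key and the value come out as _1 and _2):

val header = Experimental("experiment abc")

header._1 // "Experimental" -- the pinned key
header._2 // "experiment abc"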

But for the case that I wanted, I want to use these as http header values, which means I need to be able to parse a string into a type of ContextKey#Value, which is path dependent on the object type.

We can do that by adding now a few extra methods on the ContextKey trait:

trait ContextKeyType[T] extends Product2[String, T] {
  def unparse: String
}

trait ContextKey {
  self =>
  val key: String

  /**
   * The custom type of this key
   */
  type Value

  /**
   * A tuple of (String, Value)
   */
  type Key = ContextKeyType[Value]

  /**
   * Utility to allow the container to provide a mapping from Value => String
   *
   * @param r
   * @return
   */
  def parse(r: String): Value

  def unparse(v: Value): String

  def apply(data: Value): Key = new Key {
    override def _1: String = key

    override def _2: Value = data

    /**
     * Allow a mapping of Value => String
     *
     * @return
     */
    override def unparse: String = self.unparse(data)

    override def equals(obj: scala.Any): Boolean = {
      canEqual(obj)
    }

    override def canEqual(that: Any): Boolean = {
      that != null &&
      that.isInstanceOf[ContextKeyType[_]] &&
      that.asInstanceOf[ContextKeyType[_]]._1 == key &&
      that.asInstanceOf[ContextKeyType[_]]._2 == data
    }
  }
}

This introduces a parse and unparse method which convert things to and from strings. An http header object can now define how to convert its value:

object Experimental extends ContextKey {
  val key = "Experimental"
  override type Value = String

  override def parse(value: String): String = value

  override def unparse(value: String): String = value
}

So, if we want to maybe send JSON in a header, or a long/int/uuid, we can now parse and unparse that value pre and post wire.
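
For example, a hypothetical uuid-valued header (X-Request-Id is just an illustrative name) would look like:

import java.util.UUID

object RequestId extends ContextKey {
  val key = "X-Request-Id"
  override type Value = UUID

  override def parse(value: String): UUID = UUID.fromString(value)

  override def unparse(value: UUID): String = value.toString
}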

Now let’s add a utility to convert a Map[String, String], which could represent an http header map, into a set of strongly typed context values:

object ContextValue {
  def find[T <: ContextKey](search: T, map: Map[String, String]): Option[T#Value] = {
    map.collectFirst {
      case (key, value) if search.key == key => search.parse(value)
    }
  }
}

Back in finatra land, let’s add an http filter:

case class CurrentRequestContext(
  experimentId: Option[Experimental.Value]
)

object RequestContext {
  private val requestType = Request.Schema.newField[CurrentRequestContext]

  implicit class RequestContextSyntax(request: Request) {
    def context: CurrentRequestContext = request.ctx(requestType)
  }

  private[filters] def set(request: Request): Unit = {
    val data = CurrentRequestContext(
      experimentId = ContextValue.find(Experimental, request.headerMap)
    )

    request.ctx.update(requestType, data)
  }
}

/**
 * Set the remote context from requests 
 */
class RemoteContextFilter extends SimpleFilter[Request, Response] {
  override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
    RequestContext.set(request)

    service(request)
  }
}

From here on out, we can provide a set of strongly typed values that are basically case classes with hidden keys.
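
And when calling a downstream service, the same pinned key can set the outgoing header so the name can never drift; a small sketch:

// reuse the pinned key when propagating context to the next service
def withExperiment(request: Request, experiment: Option[Experimental.Key]): Request = {
  experiment.foreach(value => request.headerMap.set(value._1, value.unparse))
  request
}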

Deployment the paradoxical way

First and foremost, this is all Jake Swenson’s brainchild. But it’s just too cool to not share and write about. Thanks Jake for doing all the hard work :)

At paradoxical, we really like being able to crank out libraries and projects as fast as possible. We hate boilerplate and we hate repetition. Everything should be automated. For a long time we used maven archetypes to crank out services from a template and libraries from a template, and that worked reasonably well. However, deployment was always kind of a manual process. We had scripts in each repo to use the maven release plugin, but our build system (Travis) wasn’t wired into it. This meant that deploys of libraries/services required a manual (but simple) step to run. We also had some kinks with our gpg keys, and we weren’t totally sure of a clean way to have Travis sign our artifacts securely without our keys being checked into a bunch of different repos.

Jake and I had talked a while ago about how nice it would be if we could

  • Have all builds to master auto deployed as snapshots
  • PR’s built but not deployed
  • Creating a github release kicked off an actual release

The first two were reasonably easy with the travis scripts we already had, but it was the last one that was fun.

This article was posted not long ago about simplifying your maven release process by chucking the maven release plugin and instead using the maven deploy plugin directly. If you can parameterize your maven artifact version number and have your build pass it in from the git tag, then you can really easily achieve git tag driven development!

To that end, Jake created a git project that facilitated setting up all our repos for tag driven deployment. Each deployable of ours would check out this project as a submodule under .deployment, which contains the tooling to make git tag releases happen.

To onboard

First things first: we need a way to delegate deployment after our travis build is complete. So you’d add the following to your project’s travis file:

git:
  submodules: false
before_install:
  # https://git-scm.com/docs/git-submodule#_options:
  # --remote
  # Instead of using the superproject’s recorded SHA-1 to update the submodule,
  # use the status of the submodule’s remote-tracking (branch.<name>.remote) branch (submodule.<name>.branch).
  # --recursive
  # https://github.com/travis-ci/travis-ci/issues/4099
  - git submodule update --init --remote --recursive
after_success:
- ./.deployment/deploy.sh

Which would pull the git deployment submodule, and delegate the after step to its deploy script.

You also need to add the deployment project as a parent of your pom:

<parent>
    <groupId>io.paradoxical</groupId>
    <artifactId>deployment-base-pom</artifactId>
    <version>1.0</version>
</parent>

This sets up nice things for us like making sure we sign our GPG artifacts, include sources as part of our deployment, and attaches javadocs.

The last thing you need to do is parameterize your artifact version field:

<version>1.0${revision}</version>

The parent pom defines revision and will set it to be either the git tag or -SNAPSHOT depending on context.

But, for those of you with strong maven experience, an alarm may fire: you can’t normally parameterize the version field. To solve that problem, Jake wrote a wonderful maven parameter resolver which lets you whitelist which fields should be pre-processed (resolved) before the pom is deployed. This solves an issue where a deployed maven pom with parameterized values that are only set at build time wouldn’t otherwise have those values captured for deployment. Without that, maven has issues resolving transitive dependencies.

Anyways, the base pom handles a lot of nice things :)

The deploy script

Now let’s break down the after build deploy script. Its job is to take the travis encrypted gpg keys (which are also password secured), decrypt them, and run the right maven release given the git tags.

if [ -n "$TRAVIS_TAG" ]; then
    echo "Deploying release version for tag '${TRAVIS_TAG}'"
    mvn clean deploy --settings "${SCRIPT_DIR}/settings.xml" -DskipTests -P release -Drevision='' $@
    exit $?
elif [ "$TRAVIS_BRANCH" = "master" ]; then
    echo "Deploying snapshot version on branch '${TRAVIS_BRANCH}'"
    mvn clean deploy --settings "${SCRIPT_DIR}/settings.xml" -DskipTests -P snapshot $@
    exit $?
else
    echo "No deployment running for current settings"
    exit 0
fi

It’s worth noting here a few magic things.

  1. The settings.xml file is provided by the submodule and contains fields for the gpg user and the parameterized gpg password, so they don’t need to be duplicated in every repo
  2. Because the deploy script is invoked from the root of the project, even though the deploy script lives in the deployment submodule it resolves paths from the script execution point (not from where the script lives). This is why the script captures its own path and stores it as the $SCRIPT_DIR variable.

Release time!

Now that it’s all set up we can safely merge to master whenever we want to publish a snapshot, and if we want to mark a release as public we just create a git tag for it.

Coproducts and polymorphic functions for safety

I was recently exploring shapeless and a coworker turned me onto the interesting features of coproducts and how they can be used with polymorphic functions.

Frequently when using pattern matching you want to make sure that all cases are exhaustively checked. A non exhaustive pattern match is a runtime exception waiting to happen. As a scala user, I’m all about compile time checking. For classes that I own I can enforce exhaustiveness by creating a sealed trait hierarchy:

sealed trait Base
case class Sub1() extends Base
case class Sub2() extends Base

And if I ever try and match on a Base type I’ll get a compiler warning (that I can fail on) if all the types aren’t matched. This is nice because if I ever add another type, I’ll get a (hopefully) failed build.
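
For example, a match like this is checked for exhaustiveness, and with -Xfatal-warnings a missing case fails the build:

// removing either case (or adding a new subtype of Base without handling it) produces a warning
def describe(b: Base): String = b match {
  case Sub1() => "sub1"
  case Sub2() => "sub2"
}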

But what about the scenario where you don’t own the types?

case class Type1()
case class Type2()
case class Type3()

They’re all completely unrelated. Even worse, how do you create a generic function that accepts an instance of those 3 types but no others? You could always create overloaded methods:

def takesType(t: Type1) = ???
def takesType(t: Type2) = ???
def takesType(t: Type3) = ???

Which works just fine, but what if that type needs to be passed through a few layers of function calls before it’s actually acted on?

def doStuff(t: Type1) = ... takesType(t)
def doStuff(t: Type2) = ... takesType(t)
def doStuff(t: Type3) = ... takesType(t)

Oh boy, this is a mess. We can’t get around this with just generics and type bounds since there is no unified type for these 3 types. And it’s even worse if we add another type. We could use nested eithers like Either[Type1, Either[Type2, Either[Type3, Nothing]]]

Which lets us write just one function, but then we have to match on the subsets. This is kind of gross too since it’s polluted with a bunch of eithers. Turns out, though, that a coproduct is exactly this… a souped up either!

Defining

type Items = Type1 :+: Type2 :+: Type3 :+: CNil

(where CNil is the terminator for a coproduct) we now have a unified type for our collection. We can write functions like:

def doStuff(item: Items) = {
  // whatever
  takesType(item)
}

At some point, you need to lift an instance of Type1 etc into the Items type, and this can be done by calling Coproduct[Items](instance). This call will fail to compile if the type of the instance is not a member of Items. You’re also probably going to want to actually do work with the thing, so you need to unbox this souped up either and do stuff with it.

This is where the shapeless PolyN methods come into play.

import shapeless._

object Worker {
  type Items = Type1 :+: Type2 :+: Type3 :+: CNil

  object thisIsAMethod extends Poly1 {
    // corresponding def for the data type of the coproduct instance
    implicit def invokedOnType1 = at[Type1](data => data.toString)
    implicit def invokedOnType2 = at[Type2](data => data.toString)
    implicit def invokedOnType3 = at[Type3](data => data.toString)
  }

  def takesItem(item: Items): String = {
    // fold requires thisIsAMethod to have a case for every member of Items
    item.fold(thisIsAMethod)
  }
}

class Provider {
  Worker.takesItem(Coproduct[Worker.Items](Type1()))     // ok
  Worker.takesItem(Coproduct[Worker.Items](WrongType())) // fails to compile
}

The object thisIsAMethod creates a bunch of implicit type dependent functions that are defined at all the elements in the coproduct. If we add another option to our coproduct list, we’ll get a compiler error when we try and use the coproduct with the polymorphic function. This accomplishes the same thing as the exhaustiveness check, but it’s an even stronger guarantee since the build will fail.

While it is a lot of hoops to jump through, and can be a little mind bending, I’ve found that coproducts and polymorphic functions are a really nice addition to my scala toolbox. Being able to strongly enforce these kinds of contracts is pretty neat!

Mocking nested objects with mockito

Yes, I know it’s a code smell. But I live in the real world, and sometimes you need to mock nested objects. This is a scenario like:

when(a.b.c.d).thenReturn(e)

The usual pattern here is to create a mock for each object and return the previous mock:

val a = mock[A]
val b = mock[B]
val c = mock[C]
val d = mock[D]

when(a.b).thenReturn(b)
when(b.c).thenReturn(c)
when(c.d).thenReturn(d)

But again, in the real world the signatures are longer, the types are nastier, and it’s never quite so clean. I figured I’d sit down and solve this for myself once and for all and came up with:

import org.junit.runner.RunWith
import org.mockito.Mockito
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}

@RunWith(classOf[JUnitRunner])
class Tests extends FlatSpec with Matchers {
  "Mockito" should "proxy nested objects" in {
    val parent = Mocks.mock[Parent]

    Mockito.when(
      parent.
        mock(_.getChild1).
        mock(_.getChild2).
        mock(_.getChild3).
        value.doWork()
    ).thenReturn(3)

    parent.value.getChild1.getChild2.getChild3.doWork() shouldEqual 3
  }
}

class Child3 {
  def doWork(): Int = 0
}

class Child2 {
  def getChild3: Child3 = new Child3
}

class Child1 {
  def getChild2: Child2 = new Child2
}

class Parent {
  def getChild1: Child1 = new Child1
}

As you can see in the full test, we can create a mock object and reference the call chain via extractor methods.

The actual mocker is really pretty simple, it just looks nasty because of all the lambdas/manifests. All that’s going on here is a way to pass the next object in the chain along and extract it with a method. Then we can create a mock using the manifest and assign that mock to the source object via the lambda.

import org.mockito.Mockito

object Mocks {
  implicit def mock[T](implicit manifest: Manifest[T]) = new RichMockRoot[T]

  class RichMockRoot[T](implicit manifest: Manifest[T]) {
    val value = Mockito.mock[T](manifest.runtimeClass.asInstanceOf[Class[T]])

    def mock[Y](extractor: T => Y)(implicit manifest: Manifest[Y]): RichMock[Y] = {
      new RichMock[T](value, List(value)).mock(extractor)
    }
  }

  class RichMock[T](c: T, prevMocks: List[_]) {
    def mock[Y](extractor: T => Y)(implicit manifest: Manifest[Y]): RichMock[Y] = {
      val m = Mockito.mock[Y](manifest.runtimeClass.asInstanceOf[Class[Y]])

      Mockito.when(extractor(c)).thenReturn(m)

      new RichMock(m, prevMocks ++ List(m))
    }

    def value: T = c

    def mockChain[Y](idx: Int) = prevMocks(idx).asInstanceOf[Y]

    def head[Y] = mockChain[Y](0)
  }
}

The main idea here is just to hide away the whole "make b and have it return c" for you. You can even capture all the intermediate mocks in a list (I called it a mock chain), and expose the first element of the list with head. With a little bit of scala manifest magic you can even get around needing to pass Class objects around and can leverage the generic parameter (boy, feels almost like .NET!).
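
For example, pulling intermediate mocks back out of the chain looks roughly like this (a small sketch using the classes from the test above):

val chain = Mocks.mock[Parent].mock(_.getChild1).mock(_.getChild2)

chain.head[Parent]         // the root Parent mock
chain.mockChain[Child1](1) // the intermediate Child1 mock
chain.value                // the Child2 mock at the end of the chain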

Extracting scala method names from objects with macros

I have a soft spot in me for ASTs ever since I went through the exercise of building my own language. Working in Java I missed the dynamic ability to get compile time information, though I knew it was available as part of the annotation processing pipeline during compilation (which is how lombok works). Scala has something similar in the concept of macros: a way to hook into the compiler, manipulate or inspect the syntax tree, and rewrite or inject whatever you want. It’s a wonderfully elegant system that reminds me of Lisp/Clojure macros.

I ran into a situation (as always) where I really wanted to get the name of a function dynamically. i.e.

class Foo {
   val field: String = "" 
   def method(): Unit = {}
}

val name: String = ??.field // = "field"

In .NET this is pretty easy since at runtime you can create an expression tree which gives you the AST. But I haven’t been in .NET in a while, so off to macros I went!

First off, I found the documentation regarding macros to be lackluster. It’s either rudimentary with trivial examples, or the learning curve was steep and I was too lazy to read through all of it. Usually when I encounter scenarios like this I turn to exploratory programming, where I have a unit test that sets up a basic example and I leverage the debugger and intellij live REPL to poke through what I can and can’t do. Time to get set up.

First, I needed to create a new submodule in my multi module maven project that would contain my macro. The reason is that you can’t use macros in the same compilation unit they are defined in. You can, however, use macros in the macro module’s tests since the compiler compiles test sources separately from regular sources.

That said, debugging macros is harder than normal because you aren’t debugging your running program, you are debugging the actual compiler. I found this blog post which was a life saver, even though it was missing a few minor pieces.

1. Set the main class to scala.tools.nsc.Main
2. Set the VM args to -Dscala.usejavacp=true
3. Set the program arguments to first point to the file containing the macro, then the file to compile that uses the macro:

-cp types.Types macros/src/main/scala/com/devshorts/common/macros/MethodNames.scala config/src/test/scala/config/ConfigProxySpec.scala

Now you can actually debug your macro!

First let me show the test

case class MethodNameTest(field1: Object) {
  def getFoo(arg: Object): Unit = {}
  def getFoo2(arg: Object, arg2: Object): Unit = {}
}

class MethodNamesMacroSpec extends FlatSpec with Matchers {
  "Names macro" should "extract from an function" in {
    methodName[MethodNameTest](_.field1) shouldEqual MethodName("field1")
  }

  it should "extract when the function contains an argument" in {
    methodName[MethodNameTest](_.getFoo(null)) shouldEqual MethodName("getFoo")
  }

  it should "extract when the function contains multiple argument" in {
    methodName[MethodNameTest](_.getFoo2(null, null)) shouldEqual MethodName("getFoo2")
  }

  it should "extract when the method is curried" in {
    methodName[MethodNameTest](m => m.getFoo2 _) shouldEqual MethodName("getFoo2")
  }
}

macro

methodName here is a macro that extracts the method name from a lambda passed in of the parameterized generic type. What’s nice about how scala set up their macros is you provide an alias for your macro such that you can re-use the macro but type it however you want.

object MethodNames {
  implicit def methodName[A](extractor: (A) => Any): MethodName = macro methodNamesMacro[A]

  def methodNamesMacro[A: c.WeakTypeTag](c: Context)(extractor: c.Expr[(A) => Any]): c.Expr[MethodName] = {
     ...
   }
}

I’ve made the methodName function take a generic and a function that uses that generic (even though no actual instance is ever passed in). The nice thing about this is I can re-use the macro typed as another function elsewhere. Imagine I want to pin [A] so people don’t have to type it. I can do exactly that!

case class Configuration (foo: String)

implicit def config(extractor: Configuration => Any): MethodName = macro MethodNames.methodNamesMacro[Configuration]

config(_.foo) == "foo"

At this point its time to build the bulk of the macro. The idea is to inspect parts of the AST and potentially walk it to find the pieces we want. Here’s what I ended up with:

def methodNamesMacro[A: c.WeakTypeTag](c: Context)(extractor: c.Expr[(A) => Any]): c.Expr[MethodName] = {
  import c.universe._

  @tailrec
  def resolveFunctionName(f: Function): String = {
    f.body match {
      // the function name
      case t: Select => t.name.decoded

      case t: Function =>
        resolveFunctionName(t)

      // an application of a function and extracting the name
      case t: Apply if t.fun.isInstanceOf[Select] =>
        t.fun.asInstanceOf[Select].name.decoded

      // curried lambda
      case t: Block if t.expr.isInstanceOf[Function] =>
        val func = t.expr.asInstanceOf[Function]

        resolveFunctionName(func)

      case _ => {
        throw new RuntimeException("Unable to resolve function name for expression: " + f.body)
      }
    }
  }

  val name = resolveFunctionName(extractor.tree.asInstanceOf[Function])

  val literal = c.Expr[String](Literal(Constant(name)))

  reify {
    MethodName(literal.splice)
  }
}

For more details on parts of the AST here is a great resource

In the first case, when we pass in methodName[Config](_.method), it gets mangled into a function with a body of x$1.method. The Select indicates the x$1 instance and selects the method member on it. This is an easy case.

The Block case maps to when we call methodName[Config](c => c.thing _). In this case we have a function, but it’s curried. Here the function body is a block whose inner expression is a function, and the body of that inner function is an Apply.

Apply takes two arguments — a Select or an Ident for the function and a list of arguments

So that makes sense.

The rest is just helper methods to recurse.

The last piece of the puzzle is to create an instance of a string literal and splice it into a new expression returning the MethodName case class that contains the string literal.

All in all a fun afternoon’s worth of code, and now I get semantic string safety. A great use case here can be to type configuration values or other string semantics with a trait. You can get compile time refactoring + type safety. Other use cases are things like database configurations and drivers (the .NET mongo driver uses expression trees to type an object to its underlying mongo collection).

Unit testing DNS failovers

Something that’s come up a few times in my career is the difficulty of validating if and when your code can handle actual DNS changes. A lot of times testing that you have the right JVM settings and that your 3rd party clients can handle it involves mucking with hosts files, nameservers, or stuff like Route53 and waiting around. Then it’s hard to automate and deterministically reproduce. However, you can hook into the DNS resolution in the JVM to control what gets resolved to what, and this way you can tweak the resolution in a test and see what breaks! I found some info at this blog post and cleaned it up a bit for usage in scala.

The magic sauce to pull this off is to make sure you override the default sun.net.spi.nameservice.NameServiceDescriptor. Internally the InetAddress class tries to load an instance of the interface NameServiceDescriptor using the service loader mechanism. The service loader looks for resources at META-INF/services/fully.qualified.classname.to.override and instantiates whatever fully qualified class name is listed in that file.

For example, if we have

cat META-INF/services/sun.net.spi.nameservice.NameServiceDescriptor
io.paradoxical.test.dns.LocalNameServerDescriptor

Then the io.paradoxical.test.dns.LocalNameServerDescriptor will get created. Nice.

What does that class actually look like?

class LocalNameServerDescriptor extends NameServiceDescriptor {
  override def getType: String = "dns"

  override def createNameService(): NameService = {
    new LocalNameServer()
  }

  override def getProviderName: String = LocalNameServer.dnsName
}

The type is of dns and the name service implementation is our own class. The provider name is something we have custom defined as well below:

object LocalNameServer {
  Security.setProperty("networkaddress.cache.ttl", "0")

  protected val cache = new ConcurrentHashMap[String, String]()

  val dnsName = "local-dns"

  def use(): Unit = {
    System.setProperty("sun.net.spi.nameservice.provider.1", s"dns,${dnsName}")
  }

  def put(hostName: String, ip: String) = {
    cache.put(hostName, ip)
  }

  def remove(hostName: String) = {
    cache.remove(hostName)
  }
}

class LocalNameServer extends NameService {

  import LocalNameServer._

  val default = new DNSNameService()

  override def lookupAllHostAddr(name: String): Array[InetAddress] = {
    val ip = cache.get(name)
    if (ip != null && !ip.isEmpty) {
      InetAddress.getAllByName(ip)
    } else {
      default.lookupAllHostAddr(name)
    }
  }

  override def getHostByAddr(bytes: Array[Byte]): String = {
    default.getHostByAddr(bytes)
  }
}

Pretty simple. We have a cache that is stored in a singleton companion object with some helper methods on it, and all we do is delegate looking into the cache. If we can resolve the data in the cache we return it, otherwise just proxy it to the default resolver.

The use method sets a system property that says to use the dns resolver named local-dns as the highest priority nameservice.provider.1 (lower numbers are higher priority).

Now we can write some tests and see if this works!

@RunWith(classOf[JUnitRunner])
class DnsTests extends FlatSpec with Matchers {
  LocalNameServer.use()

  "DNS" should "resolve" in {
    val google = resolve("www.google.com")

    google.getHostAddress shouldNot be("127.0.0.1")
  }

  it should "be overridable" in {
    LocalNameServer.put("www.google.com", "127.0.0.1")

    val google = resolve("www.google.com")

    google.getHostAddress should be("127.0.0.1")

    LocalNameServer.remove("www.google.com")
  }

  it should "be undoable" in {
    LocalNameServer.put("www.google.com", "127.0.0.1")

    val google = resolve("www.google.com")

    google.getHostAddress should be("127.0.0.1")

    LocalNameServer.remove("www.google.com")

    resolve("www.google.com").getHostAddress shouldNot be("127.0.0.1")
  }

  def resolve(name: String) = InetAddress.getByName(name)
}

Happy dns resolving!

Consistent hashing for fun

I think consistent hashing is pretty fascinating. It lets you define a ring of machines that shard out data by a hash value. Imagine that your hash space is 0 -> Int.Max, and you have 2 machines. Well one machine gets all values hashed from 0 -> Int.Max/2 and the other from Int.Max/2 -> Int.Max. Clever. This is one of the major algorithms of distributed systems like cassandra and dynamoDB.

For a good visualization, check out this blog post.

The fun stuff happens when you want to add replication and fault tolerance to your hashing. Now you need to have replicas and manage what happens when machines join and leave. When someone joins, you need to re-partition the space evenly and re-distribute the values that were previously held.

Similarly, when you have a node leave, you need to make sure that whatever it was responsible for in its primary space AND the things it was responsible for as a secondary replica are re-distributed amongst the remaining nodes.

But the beauty of consistent hashing is that the replication basically happens for free! And so does redistribution!

Since my new feature is all in Scala, I figured I’d write something up to see how this might play out in scala.

For the impatient, the full source is here.

First I started with some data types

case class HashValue(value: String) extends AnyRef

case class HashKey(key: Int) extends AnyRef with Ordered[HashKey] {
  override def compare(that: HashKey): Int = key.compare(that.key)
}

object HashKey {
  def safe(key: Int) = new HashKey(Math.abs(key))
}

case class HashRange(minHash: HashKey, maxHash: HashKey) extends Ordered[HashRange] {
  override def compare(that: HashRange): Int = minHash.compare(that.minHash)
}

I chose to wrap the key in a positive space since it made things slightly easier. In reality you want to use md5 or some actual hashing function, but I relied on the hash code here.
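
If you did want a real hash function, a sketch of an md5-based key (folding the first four bytes of the digest into the positive Int space HashKey expects) might look like:

import java.nio.ByteBuffer
import java.security.MessageDigest

object Md5HashKey {
  def apply(value: String): HashKey = {
    val digest = MessageDigest.getInstance("MD5").digest(value.getBytes("UTF-8"))
    // take the first four bytes as an Int; HashKey.safe forces it positive
    HashKey.safe(ByteBuffer.wrap(digest).getInt)
  }
}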

And then a machine to hold values:

import scala.collection.immutable.TreeMap

class Machine[TValue](val id: String) {
  private var map: TreeMap[HashKey, TValue] = new TreeMap[HashKey, TValue]()

  def add(key: HashKey, value: TValue): Unit = {
    map = map + (key -> value)
  }

  def get(hashKey: HashKey): Option[TValue] = {
    map.get(hashKey)
  }

  def getValuesInHashRange(hashRange: HashRange): Seq[(HashKey, TValue)] ={
    map.range(hashRange.minHash, hashRange.maxHash).toSeq
  }

  def keepOnly(hashRanges: Seq[HashRange]): Seq[(HashKey, TValue)] = {
    val keepOnly: TreeMap[HashKey, TValue] =
      hashRanges
      .map(range => map.range(range.minHash, range.maxHash))
      .fold(map.empty) { (tree1, tree2) => tree1 ++ tree2 }

    val dropped = map.filter { case (k, v) => !keepOnly.contains(k) }

    map = keepOnly
    
    dropped.toSeq
  }
}

A machine keeps a sorted tree map of hash values. This lets me really quickly get things within ranges. For example, when we re-partition a machine, it’s no longer responsible for the entire range set that it was before. But it may still be responsible for parts of it. So we want to be able to tell a machine hey, keep ranges 0-5, 12-20, but drop everything else. The tree map lets me do this really nicely.
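
A small usage sketch of keepOnly with the types above:

val machine = new Machine[String]("machine-0")
machine.add(HashKey.safe(3), "a")
machine.add(HashKey.safe(10), "b")

// keep only [0, 5): the entry at key 10 is dropped and handed back for re-insertion elsewhere
val dropped = machine.keepOnly(Seq(HashRange(HashKey.safe(0), HashKey.safe(5))))
// dropped == Seq(HashKey(10) -> "b")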

Now for the fun part, the actual consistent hashing stuff.

Given a set of machines, we need to define how the circular partitions is defined

private def getPartitions(machines: Seq[Machine[TValue]]): Seq[(HashRange, Machine[TValue])] = {
  val replicatedRanges: Seq[HashRange] = Stream.continually(defineRanges(machines.size)).flatten

  val infiniteMachines: Stream[Machine[TValue]] =
       Stream.continually(machines.flatMap(List.fill(replicas)(_))).flatten

  replicatedRanges
  .zip(infiniteMachines)
  .take(machines.size * replicas)
  .toList
}

What we want to make sure is that each node sits on multiple ranges; this gives us the replication factor. To do that I’ve duplicated the machines in the list by the replication factor, and made sure all the lists cycle around indefinitely, so while they are not evenly distributed around the ring (they are clustered) they do provide fault tolerance.

Lets look at what it takes to put a value into the ring:

private def put(hashkey: HashKey, value: TValue): Unit = {
  getReplicas(hashkey).foreach(_.add(hashkey, value))
}

private def getReplicas(hashKey: HashKey): Seq[Machine[TValue]] = {
  partitions
  .filter { case (range, machine) => hashKey >= range.minHash && hashKey < range.maxHash }
  .map(_._2)
}

We need to make sure that for each replica in the ring that sits on a hash range, we insert the value into that machine. That’s pretty easy, though we could improve this later with better lookups.

Lets look at a get

def get(hashKey: TKey): Option[TValue] = {
  val key = HashKey.safe(hashKey.hashCode())

  getReplicas(key)
  .map(_.get(key))
  .collectFirst { case Some(x) => x }
}

Also similar. Go through all the replicas, and find the first one to return a value.

Now lets look how to add a machine into the ring

def addMachine(): Machine[TValue] = {
  id += 1

  val newMachine = new Machine[TValue]("machine-" + id)

  val oldMachines = partitions.map(_._2).distinct

  partitions = getPartitions(Seq(newMachine) ++ oldMachines)

  redistribute(partitions)

  newMachine
}

So we first create a new list of machines, and then ask how to re-partition the ring. Then the keys in the ring need to redistribute themselves so that only the nodes that are responsible for certain ranges contain those keys.

def redistribute(newPartitions: Seq[(HashRange, Machine[TValue])]) = {
  newPartitions.groupBy { case (range, machine) => machine }
  .flatMap { case (machine, ranges) => machine.keepOnly(ranges.map(_._1)) }
  .foreach { case (k, v) => put(k, v) }
}

Redistributing isn’t that complicated either. We group all the nodes in the ring by the machine they are on, then for each machine we tell it to only keep values that are in its assigned ranges. The machine’s keepOnly function takes a list of ranges and will remove and return anything not in those ranges. We can now aggregate all the things that are "emitted" by the machines and re-insert them into the right location.

Removing a machine is really similar

def removeMachine(machine: Machine[TValue]): Unit = {
  val remainingMachines = partitions.filter { case (r, m) => !m.eq(machine) }.map(_._2)

  partitions = getPartitions(remainingMachines.distinct)

  redistribute(partitions)
}

And that’s all there is to it! Now we have a fast, simple consistent hasher.