
This post was originally posted on my company’s engineering blog here: http://engineering.curalate.com/2018/05/16/productionalizing-ecs.html

In January of last year we decided as a company to move towards containerization and began a migration onto AWS ECS. We pushed to move to containers, and off of AMI-based VM deployments, in order to speed up our deployments, simplify our build tooling (since it only has to work on containers), gain the ability to run our production code in a sandbox even locally on our dev machines (something you can't easily do with AMIs), and lower our costs by getting more out of the resources we're already paying for.

However, making ECS production ready was actually quite the challenge. In this post I’ll discuss:

  • Scaling the underlying ECS cluster
  • Upgrading the backing cluster images
  • Monitoring our containers
  • Cleanup of images, container artifacts
  • Remote debugging of our JVM processes

This is a short summary of the issues we encountered and our solutions, which finally made ECS a set-it-and-forget-it system.

Scaling the cluster

The first thing we struggled with was how to scale our cluster. ECS is a container orchestrator, analogous to Kubernetes or Rancher, but you still need a set of EC2 machines to run as the cluster. The machines all need the ECS Docker agent installed on them, and ECS doesn't provide a way to automatically scale and manage the cluster for you. While this has changed recently with the announcement of Fargate, Fargate's pricing makes it cost prohibitive for organizations with a lot of containers.

The general recommendation AWS gives for ECS is to scale based on either the cluster's CPU reservation or its memory reservation. There's no clear way to scale on a combination of the two, since auto scaling rules need to apply to a single CloudWatch metric, and scaling on two independent metrics invites thrashing.

Our first attempt at scaling was to scale on container placement failures. ECS logs a message when containers can't be placed due to constraints (not enough memory or not enough CPU reservation left on the cluster), but there is no way to capture that event programmatically (see this GitHub issue). The goal was to scale on actual pressure rather than preemptively, so that we wouldn't be overpaying for machines in the cluster that aren't heavily used. We had to discard this idea because of that API limitation.

Our second attempt, and one that we have been using now in production, is to use an AWS Lambda function to monitor the memory and CPU reservation of the cluster and emit a compound metric to CloudWatch that we can scale on. We set a compound threshold with the logic of:

  1. If either memory or CPU is above the max threshold, scale up
  2. Else if both memory and CPU are below the min, scale down.
  3. Else stay the same

We represent a scale-up event with a CloudWatch value of 2, a scale-down with a value of 0, and the nominal state with a value of 1.

The code for that is shown below:

package com.curalate.lambdas.ecs_scaling

import com.amazonaws.services.cloudwatch.AmazonCloudWatch
import com.amazonaws.services.cloudwatch.model._
import com.curalate.lambdas.ecs_scaling.ScaleMetric.ScaleMetric
import com.curalate.lambdas.ecs_scaling.config.ClusterScaling
import org.joda.time.DateTime
import scala.collection.JavaConverters._

object ScaleMetric extends Enumeration {
  type ScaleMetric = Value

  val ScaleDown = Value(0)
  val StaySame = Value(1)
  val ScaleUp = Value(2)
}

case class ClusterMetric(
  clusterName: String,
  scaleMetric: ScaleMetric,
  periodSeconds: Int
)

class MetricResolver(clusterName: String, cloudWatch: AmazonCloudWatch) {
  private lazy val now = DateTime.now()
  private lazy val start = now.minusMinutes(3)

  private val dimension = new Dimension().withName("ClusterName").withValue(clusterName)

  val periodSeconds = 60

  protected def getMetric(name: String): Double = {
    val baseRequest = new GetMetricStatisticsRequest().withDimensions(dimension)

    cloudWatch.getMetricStatistics(baseRequest.
      withMetricName(name).
      withNamespace("AWS/ECS").
      withStartTime(start.toDate).
      withEndTime(now.toDate).
      withPeriod(periodSeconds).
      withStatistics(Statistic.Maximum)
    ).getDatapoints.asScala.head.getMaximum
  }

  lazy val currentCpuReservation: Double = getMetric("CPUReservation")

  lazy val currentMemoryReservation: Double = getMetric("MemoryReservation")
}

class ClusterStatus(
  scaling: ClusterScaling,
  metricResolver: MetricResolver
) {

  protected val logger = org.slf4j.LoggerFactory.getLogger(getClass)

  def getCompositeMetric(): ClusterMetric = {
    logger.info(s"CPU: ${metricResolver.currentCpuReservation}, memory: ${metricResolver.currentMemoryReservation}")

    val state =
      if (metricResolver.currentCpuReservation >= scaling.CPU.max || metricResolver.currentMemoryReservation >= scaling.memory.max) {
        ScaleMetric.ScaleUp
      }
      else if (metricResolver.currentCpuReservation <= scaling.CPU.min && metricResolver.currentMemoryReservation <= scaling.memory.min) {
        ScaleMetric.ScaleDown
      } else {
        ScaleMetric.StaySame
      }

    ClusterMetric(scaling.name, state, metricResolver.periodSeconds)
  }
}

class CloudwatchEmitter(cloudWatch: AmazonCloudWatch) {
  def writeMetric(metric: ClusterMetric): Unit = {
    val cluster = new Dimension().withName("ClusterName").withValue(metric.clusterName)

    cloudWatch.putMetricData(new PutMetricDataRequest().
      withMetricData(new MetricDatum().
        withMetricName("ScaleStatus").
        withDimensions(cluster).
        withTimestamp(DateTime.now().toDate).
        withStorageResolution(metric.periodSeconds).
        withValue(metric.scaleMetric.id.toDouble)
      ).withNamespace("Curalate/AutoScaling"))
  }
}
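
For completeness, here's a rough sketch of how these pieces could be wired together in the Lambda entry point. The handler shape and the way clusters are passed in are illustrative assumptions; the real function loads its clusters and thresholds from our configuration system (more on that below).

import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.curalate.lambdas.ecs_scaling.config.ClusterScaling

class ScalingHandler {
  private val cloudWatch = AmazonCloudWatchClientBuilder.defaultClient()
  private val emitter = new CloudwatchEmitter(cloudWatch)

  // Invoked on a schedule; `clusters` would come from our configuration system.
  def handle(clusters: Seq[ClusterScaling]): Unit = {
    clusters.foreach { scaling =>
      val resolver = new MetricResolver(scaling.name, cloudWatch)
      val metric = new ClusterStatus(scaling, resolver).getCompositeMetric()

      emitter.writeMetric(metric)
    }
  }
}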

Wiring our ECS cluster up to autoscale on this metric in our Terraform configuration looks like this:

resource "aws_cloudwatch_metric_alarm" "cluster_scale_status_high_blue" {
  count               = "${var.autoscale_enabled}"
  alarm_name          = "${var.cluster_name}_ScaleStatus_high_blue"
  comparison_operator = "${lookup(var.alarm_high, "comparison_operator")}"
  evaluation_periods  = "${lookup(var.alarm_high, "evaluation_periods")}"
  period              = "${lookup(var.alarm_high, "period")}"
  statistic           = "${lookup(var.alarm_high, "statistic")}"
  threshold           = "${lookup(var.alarm_high, "threshold")}"
  metric_name         = "ScaleStatus"
  namespace           = "Curalate/AutoScaling"

  dimensions {
    ClusterName = "${var.cluster_name}"
  }

  alarm_description = "High cluster resource usage"
  alarm_actions     = ["${aws_autoscaling_policy.scale_up_blue.arn}"]
}

resource "aws_cloudwatch_metric_alarm" "cluster_scale_status_low_blue" {
  count               = "${var.autoscale_enabled}"
  alarm_name          = "${var.cluster_name}_ScaleStatus_low_blue"
  comparison_operator = "${lookup(var.alarm_low, "comparison_operator")}"
  evaluation_periods  = "${lookup(var.alarm_low, "evaluation_periods")}"
  period              = "${lookup(var.alarm_low, "period")}"
  statistic           = "${lookup(var.alarm_low, "statistic")}"
  threshold           = "${lookup(var.alarm_low, "threshold")}"
  metric_name         = "ScaleStatus"
  namespace           = "Curalate/AutoScaling"

  dimensions {
    ClusterName = "${var.cluster_name}"
  }

  alarm_description = "Low cluster resource usage"
  alarm_actions     = ["${aws_autoscaling_policy.scale_down_blue.arn}"]
}

variable "alarm_high" {
  type = "map"

  default = {
    comparison_operator = "GreaterThanThreshold"
    evaluation_periods  = 4
    period              = 60
    statistic           = "Maximum"
    threshold           = 1
  }
}

variable "alarm_low" {
  type = "map"

  default = {
    comparison_operator = "LessThanThreshold"
    evaluation_periods  = 10
    period              = 60
    statistic           = "Maximum"
    threshold           = 1
  }
}

We made the Lambda dynamically configurable by loading its data from our configuration system, which lets us onboard new clusters to monitor and tune the threshold values on the fly.


Host draining and ECS rescheduling

This leads us to another problem. When the ASG scales down in response to a CloudWatch event, it puts instances into DRAINING. However, draining doesn't necessarily mean that the existing services have been re-scheduled on other boxes! It just means that connections are drained from those hosts and that the ECS scheduler now needs to move the containers elsewhere. This is problematic: if you down-scale the 2 hosts that happen to be serving both instances of an HA container, you can end up with that service at 0 instances! To solve this, we wired up a custom ASG lifecycle hook that polls the draining machines and makes sure that their containers are fully stopped, and that the active cluster still contains at least the minimum running instances of each service (where each service defines its minimum acceptable threshold and its minimum allowed running count). For example, if a service can run at 50% capacity and its minimum is set to 20, we need to verify that at least 10 instances are active before we fully allow the box to be removed, giving the ECS scheduler time to move things around.
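
Below is a minimal sketch of what that check can look like, assuming the AWS SDK v1 clients and a hypothetical minAllowed helper derived from each service's minimum acceptable threshold; the real Lambda also batches the describe calls and keeps polling until the cluster is safe.

import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder
import com.amazonaws.services.autoscaling.model.CompleteLifecycleActionRequest
import com.amazonaws.services.ecs.AmazonECSClientBuilder
import com.amazonaws.services.ecs.model.{DescribeServicesRequest, ListServicesRequest}
import scala.collection.JavaConverters._

object DrainCheck {
  private val ecs = AmazonECSClientBuilder.defaultClient()
  private val asg = AmazonAutoScalingClientBuilder.defaultClient()

  // True when every service still has at least its minimum allowed running tasks.
  def clusterHealthy(cluster: String, minAllowed: String => Int): Boolean = {
    val serviceArns = ecs.listServices(new ListServicesRequest().withCluster(cluster)).getServiceArns.asScala

    val services = ecs.describeServices(
      new DescribeServicesRequest().withCluster(cluster).withServices(serviceArns.asJava)
    ).getServices.asScala

    services.forall(s => s.getRunningCount >= minAllowed(s.getServiceName))
  }

  // Once the cluster is safe, let the ASG finish terminating the instance.
  def completeHook(hookName: String, asgName: String, token: String): Unit = {
    asg.completeLifecycleAction(new CompleteLifecycleActionRequest()
      .withLifecycleHookName(hookName)
      .withAutoScalingGroupName(asgName)
      .withLifecycleActionToken(token)
      .withLifecycleActionResult("CONTINUE"))
  }
}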

Cluster upgrades

Solving cluster scaling and draining just introduced the next question: how do we do zero downtime cluster upgrades? Because we now have many services running on the cluster, the blast radius for failure is much higher. If we fail a cluster upgrade we could take many of the services at Curalate down with us.

Our solution, while not fully automated, is beautiful in its simplicity. Leveraging the draining Lambda, we keep all our clusters grouped into ASGs labeled blue and green. To upgrade, we spin up the unused cluster with new backing AMI’s and wait for it to be steady state. Then we tear down the old cluster and rely on the draining Lambda to prevent any race issues with the ECS scheduler.

Each time we need to do a cluster upgrade, either for security updates, base AMI changes, or other infrastructure wide sweeping changes, we do a manual toggle using Terraform to drive the base changes.

For example, our Terraform ECS cluster module in QA looks like this:

module "ecs_cluster_default_bg" {
  source = "github.com/curalate/infra-modules.git//aws-ecs-cluster-blue-green?ref=2018.03.07.20.09.12"

  cluster_name                       = "${aws_ecs_cluster.default.name}"
  availability_zones                 = "${data.terraform_remote_state.remote_env_state.availability_zones}"
  environment                        = "${var.environment}"
  region                             = "${var.region}"
  iam_instance_profile               = "${data.terraform_remote_state.remote_env_state.iam_instance_profile}"
  key_name                           = "${data.terraform_remote_state.remote_env_state.key_name}"
  security_group_ids                 = "${data.terraform_remote_state.remote_env_state.ecs_security_groups}"
  subnet_ids                         = "${data.terraform_remote_state.remote_env_state.public_subnet_ids}"
  drain_hook_notification_target_arn = "${module.drain_hook.notification_target_arn}"
  drain_hook_role_arn                = "${module.drain_hook.role_arn}"
  autoscale_enabled                  = true

  root_volume_size = 50
  team             = "devops"

  blue = {
    image_id      = "ami-5ac76b27"
    instance_type = "c4.2xlarge"

    min_size         = 2
    max_size         = 15
    desired_capacity = 5
  }

  green = {
    image_id      = "ami-c868b6b5"
    instance_type = "c3.2xlarge"

    min_size         = 0
    max_size         = 0
    desired_capacity = 0
  }
}

Monitoring with statsd

Curalate uses Datadog as our graph visualization tool, and we send metrics to Datadog via the dogstatsd agent installed on our boxes. Applications emit UDP events to the dogstatsd agent, which aggregates them and sends them to Datadog over TCP.

In the containerized world we had three options for sending metrics:

  1. Send directly from the app
  2. Deploy all containers with a sidecar of statsd
  3. Proxy messages to the host box and leave dogstatsd on the host

We went with option 3, since option 1 makes it difficult to do sweeping upgrades and option 2 uses extra resources on ECS that we didn't want to spend.

However, we needed a way to deterministically send messages from a Docker container to its host. To do this we leveraged the docker0 bridge network:

# returns x.x.x.1 base ip of the docker0 bridge IP
get_data_dog_host(){
    # extracts the ip address from eth0 of 1.2.3.4 and splits off the last octet (returning 1.2.3)
    BASE_IP=`ifconfig | grep eth0 -A 1 | grep inet | awk '{print $2}' | sed "s/addr://" | cut -d. -f1-3`

    echo "${BASE_IP}.1"
}

We then configure our apps to send their StatsD metrics to this IP.
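
On the application side, this is roughly what pointing a StatsD client at that bridge IP looks like, assuming the java-dogstatsd-client library; our services actually read the host from the datadog.host system property set in the entrypoint script shown later.

import com.timgroup.statsd.NonBlockingStatsDClient

object Metrics {
  // The docker0 bridge IP computed above is handed to the JVM as a system property.
  private val datadogHost = sys.props.getOrElse("datadog.host", "localhost")

  // dogstatsd listens on UDP 8125 on the host.
  val statsd = new NonBlockingStatsDClient("my-service", datadogHost, 8125)
}

// e.g. Metrics.statsd.incrementCounter("requests")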

Cleanup

One thing we chose to do was to volume mount our log folders to the host system for semi-archival reasons. By mounting our application logs to the host, if the container crashed or was removed from Docker, we’d still have a record of what happened.

That said, containers are transient; they come and go all the time. The first question we had was “where do logs go?”. What folder do we mount them to? For us, we chose to mount all container logs in the following schema:

/var/log/curalate/<service-name>/containers/<container-sha>

This way we can correlate the logs for a particular container back to a particular folder, and if we have multiple instances of the same image running on a host the logs don't stomp on each other.

We normally have a log rotator on our AMI boxes that handles long-running log files; however, in our AMI-based deployments machines and clusters are immutable and singular. As we do new deploys, the old logs are removed along with the box, and only one service sits on any one EC2 host.

In the new world the infrastructure is immutable at the container level, not the VM level. The base VM still has a log rotator for all the container logs, but we didn't account for the fact that services start, stop, and deploy hundreds of times a day, leaving hundreds of rotated log files in stale folders.

After the first disk-space alert, though, we added the following cron entries:

ubuntu@ip-172-17-50-242:~$ crontab -l
# Chef Name: container-log-prune
*/10 * * * * /opt/curalate/docker/prune.rb
# Chef Name: volume-and-image-prune
0 0 * * * docker images -q | xargs docker rmi && docker system prune -f

We have two things happening here. The first is a Ruby script that checks for the running containers and then deletes, from the recursive log glob, the folders of any container IDs that are no longer active; per the crontab above, it runs every 10 minutes.

#!/usr/bin/env ruby

require 'set'
require 'fileutils'
require 'English'

containers = `docker ps --format '{{.ID}}'`.split("\n").to_set

unless $CHILD_STATUS.success?
  puts 'Failed to query docker'
  exit 1
end

dirs = Dir.glob('/var/log/**/containers/*')

to_delete = dirs.reject do |d|
  (containers.include? File.basename(d))
end

to_delete.each do |d|
  puts "Deleting #{d}"

  FileUtils.rm_rf d
end

The second entry is straightforward: it removes unused images and runs docker system prune to clean up old volume overlay data and any other dangling artifacts. We only run this daily because we want to reuse the images already downloaded on a box to speed up autoscaling; we're OK with taking a once-daily hit at midnight to re-download image base layers if a scaling event requires it.

JMXMP

JMX is a critical tool in our toolbox here at Curalate, as nearly all of our services and applications are written in Scala on the JVM. In our AMI deployments we can singularly control the ports that are open, and they are deterministic: if we open port 5555, it's always open on that box. However, when many services run on the same host we need to leverage Docker's dynamic port mapping, which makes it harder to know which port maps to what.

Normally this isn’t really an issue, as services that do need to expose ports to either each other or the public are routed through an ALB that manages that for us. But JMX is a different beast. JMX, in its ultimate wisdom, requires a 2 port handshake in order to connect. What this means is that the port you connect to on JMX is not the ultimate port you communicate over in JMX. When you make a JMX connection to the connection port it replies back with the communication port and you then communicate on that.

But in the world of dynamic port mappings, we can find the first port from the dynamic mapping, but there is no way for us to know the second port. This is because the container itself has no information about what its port mapping is, for all it knows its port is what it says it is!

Thankfully there is a solution using an extension to JMX called JMXMP. With some research from this blog post we rolled our own JMXMP Java agent:

package com.curalate.agents.jmxmp;

import javax.management.MBeanServer;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
import java.lang.instrument.Instrumentation;
import java.lang.management.ManagementFactory;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class Agent {
    public static void premain(String agentArgs, Instrumentation inst) {
        Boolean enableLogging = Boolean.valueOf(System.getProperty("javax.management.remote.jmx.enable_logging", "false"));

        Boolean bindEth0 = Boolean.valueOf(System.getProperty("javax.management.remote.jmx.bind_eth0", "true"));

        try {
            Map<String, String> jmxEnvironment = new HashMap<String, String>();

            jmxEnvironment.put("jmx.remote.server.address.wildcard", "false");

            final String defaultHostAddress = (bindEth0 ? getEth0() : getLocalHost()).replace("/","");

            JMXServiceURL jmxUrl = new JMXServiceURL(System.getProperty("javax.management.remote.jmxmp.url", "service:jmx:jmxmp://" + defaultHostAddress + ":5555"));

            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

            JMXConnectorServer jmxRemoteServer = JMXConnectorServerFactory.newJMXConnectorServer(jmxUrl, jmxEnvironment, mbs);

            if (enableLogging) {
                System.out.println("Starting jmxmp agent on '" + jmxUrl + "'");
            }

            jmxRemoteServer.start();
        }
        catch (Throwable e) {
            if (enableLogging) {
                e.printStackTrace();
            }
        }
    }

    private static String getEth0() {
        try {
            return Collections.list(NetworkInterface.getByName("eth0").getInetAddresses())
                              .stream()
                              .filter(x -> !x.isLoopbackAddress() && x instanceof Inet4Address)
                              .findFirst()
                              .map(Object::toString)
                              .orElse("127.0.0.1");
        }
        catch (Exception e) {
            return "127.0.0.1";
        }
    }

    private static String getLocalHost() {
        try {
            return InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            return "127.0.0.1";
        }
    }
}

We bundle this agent into all our service startups:

exec java -agentpath:/usr/local/lib/libheapster.so -javaagent:agents/jmxmp-agent.jar $JVM_OPTS $JVM_ARGS -jar

JMXMP does basically the same thing as JMX, except it only requires one port to be open. By standardizing on port 5555, we can look up the 5555 port mapping in ECS, connect to it via JMXMP, and get all our "favorite" JMX goodies (if you're doing JMX you're already in a bad place).
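
As an illustration, looking up which host port Docker assigned to a task's 5555 container port is a single call against the ECS API; here's a sketch using the AWS SDK v1 (cluster name and task ARN are inputs you'd already have):

import com.amazonaws.services.ecs.AmazonECSClientBuilder
import com.amazonaws.services.ecs.model.DescribeTasksRequest
import scala.collection.JavaConverters._

object JmxmpPort {
  private val ecs = AmazonECSClientBuilder.defaultClient()

  // Returns the dynamically assigned host port bound to container port 5555, if any.
  def hostPortFor(cluster: String, taskArn: String): Option[Int] = {
    ecs.describeTasks(new DescribeTasksRequest().withCluster(cluster).withTasks(taskArn))
      .getTasks.asScala
      .flatMap(_.getContainers.asScala)
      .flatMap(_.getNetworkBindings.asScala)
      .find(_.getContainerPort.intValue == 5555)
      .map(_.getHostPort.intValue)
  }
}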

For full reference, all our Dockerized Java apps share the main entrypoint below, which Docker executes. It gives us sane default JVM settings while still letting us override any of them via the JVM_ARGS env var (which we can set during our Terraform deployments):

#!/usr/bin/env bash

HOST_IP="localhost"

# Entrypoint for service start
main() {
    set_host_ip

    DATADOG_HOST=`get_data_dog_host`

    # locate the fat jar
    BIN_JAR=`ls /app/bin/*.jar | head`

    LOG_PATH="/var/log/${HOSTNAME}"

    mkdir -p ${LOG_PATH}
    mkdir -p /heap_dumps

    JVM_OPTS="""
        -server \
        -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/heap_dumps \
        -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled \
        -Xmx512m -Xms512m \
        -XX:+ScavengeBeforeFullGC -XX:+CMSScavengeBeforeRemark \
        -Dsun.net.inetaddr.ttl=5 \
        -Dcom.sun.management.jmxremote.port=1045 \
        -Dcom.sun.management.jmxremote.authenticate=false \
        -Dcom.sun.management.jmxremote.ssl=false \
        -Dcontainer.id=${HOSTNAME} \
        -Dhostname=${HOST_IP} \
        -Dlog.service.output=${LOG_PATH}/service.log \
        -Dlog.access.output=${LOG_PATH}/access.log \
        -Dloggly.enabled=${LOGGLY_ENABLED} \
        -Ddatadog.host=${DATADOG_HOST} \
        -Ddatadog.defaultTags=application:${CLOUD_APP}
    """

    exec java -agentpath:/usr/local/lib/libheapster.so -javaagent:agents/jmxmp-agent.jar $JVM_OPTS $JVM_ARGS -jar ${BIN_JAR} "$@"
}

# Set the host IP variable of the EC2 host instance
# by querying the EC2 metadata api
# if there is no response after the timeout we'll default to localhost
set_host_ip () {
    if [ "200" == "$(/usr/bin/curl --connect-timeout 2 --max-time 2 -s -o /dev/null -w "%{http_code}" 169.254.169.254/latest/meta-data/local-ipv4)" ]
    then
        HOST_IP=$(curl 169.254.169.254/latest/meta-data/local-ipv4)
    else
        HOST_IP="$(hostname)"

        if [ "${HOST_IP}" = "${HOSTNAME}" ]
        then
            HOST_IP="localhost"
        fi
    fi
}

# returns x.x.x.1 base ip of the docker0 bridge IP
get_data_dog_host(){
    # extracts the ip address from eth0 of 1.2.3.4 and splits off the last octet (returning 1.2.3)
    BASE_IP=`ifconfig | grep eth0 -A 1 | grep inet | awk '{print $2}' | sed "s/addr://" | cut -d. -f1-3`

    echo "${BASE_IP}.1"
}

# execute main
main "$@"

Choosing how to group a cluster

One thing we wrestled with was how to choose where a service should go. For the most part we have a default cluster, made up of c4.2xls, that everyone is allowed to deploy to.

I wanted to call out that choosing which service goes on which cluster, and which machine types make up a cluster, can be tricky. For our GPU-based services the choice is obvious: they go onto a cluster with GPU acceleration. For the other clusters we tried both smaller machines with fewer containers and larger machines with more containers. We found that we preferred fewer, larger machines: most of our services don't run at full capacity, so they get the benefit of extra IO and memory without overloading the host. With smaller boxes we had less headroom and it was harder to pack services with varying memory/CPU reservation needs.

That said, we've also chosen to segment some high-priority applications onto their own clusters: services that under no circumstances can fail, that require more than average resources (IO or otherwise), or that are particularly unstable. While we lose the cost savings of bin-packing services onto those clusters, we still get the fast deploy/rollback/scaling properties of containers, so we consider it a net win.

Conclusion

ECS was really easy to get started on, but as with any production system there are always gotchas. Overall we're really pleased with the experience, even though it wasn't pain-free. In the end, we can now deploy in seconds, roll back in seconds, and still enjoy a pseudo-immutable infrastructure that is simple to reason about and locally reproducible!


Debugging “Maximum String literal length exceeded” with scala

Today I ran into a fascinating bug. We use ficus as a HOCON auto-parser for Scala. It works great, since hand-parsing configurations into strongly typed case classes is annoying. Ficus works by using a macro to invoke implicitly-in-scope Reader[T] instances for data types, recursively building the nested parser.

I went to create a test for a new custom field I added to our config:

class ProductConfigTests extends FlatSpec {
  "Configs" should "be valid in QA" in {
    assert(ConfigLoader.verify(ProductsConfig, Environment.QA).isSuccess)
  }
}

Our config verifier just invokes the hocon parser and makes sure it doesn’t throw an error. ProductsConfig has a lot of fields to it, and I recently added a new one. Suddenly the test broke with the following error:

[error] Error while emitting com/services/products/service/tests/ConfigTests
[error] Maximum String literal length exceeded
[error] one error found
[error] (server/test:compileIncremental) Compilation failed
[error] Total time: 359 s, completed May 10, 2018 4:56:02 PM
> test:compile
[info] Compiling 36 Scala sources to /Users/antonkropp/src/products/server/target/scala-2.12/test-classes...
java.lang.IllegalArgumentException: Maximum String literal length exceeded
	at scala.tools.asm.ByteVector.putUTF8(ByteVector.java:213)
	at scala.tools.asm.ClassWriter.newUTF8(ClassWriter.java:1114)
	at scala.tools.asm.ClassWriter.newString(ClassWriter.java:1582)
	at scala.tools.asm.ClassWriter.newConstItem(ClassWriter.java:1064)
	at scala.tools.asm.MethodWriter.visitLdcInsn(MethodWriter.java:1187)
	at scala.tools.asm.tree.LdcInsnNode.accept(LdcInsnNode.java:71)
	at scala.tools.asm.tree.InsnList.accept(InsnList.java:162)
	at scala.tools.asm.tree.MethodNode.accept(MethodNode.java:820)
	at scala.tools.asm.tree.MethodNode.accept(MethodNode.java:730)

Wat?

I fired up sbt -jvm-debug 5005 and attached to the compiler.

In the debugger I could definitely see that some sort of class was being written with a large constant string. But why? I'd never seen this before.

I went to another service that has a test exactly like this for its config and used CFR to decompile the generated class files:

antonkropp at combaticus in ~/src/curalate/queue-batcher/server/target/scala-2.12/test-classes/com/curalate/services/queuebatcher/service/tests (devx/minimize-io-calls)
$ ls
total 1152
-rw-r--r--  1 antonkropp  staff   47987 May 10 11:20 BatchTrackerTests.class
-rw-r--r--  1 antonkropp  staff  145293 May 10 13:34 BitGroupTests.class
-rw-r--r--  1 antonkropp  staff    3112 May  9 13:23 ConfigTests$$anon$1$$anon$2$$anon$3.class
-rw-r--r--  1 antonkropp  staff    3553 May  9 13:23 ConfigTests$$anon$1$$anon$2$$anon$4$$anon$5.class
-rw-r--r--  1 antonkropp  staff    5190 May  9 13:23 ConfigTests$$anon$1$$anon$2$$anon$4.class
-rw-r--r--  1 antonkropp  staff    3627 May  9 13:23 ConfigTests$$anon$1$$anon$2$$anon$6.class
-rw-r--r--  1 antonkropp  staff    3906 May  9 13:23 ConfigTests$$anon$1$$anon$2.class
-rw-r--r--  1 antonkropp  staff    4904 May  9 13:23 ConfigTests$$anon$1$$anon$7.class
-rw-r--r--  1 antonkropp  staff    4598 May  9 13:23 ConfigTests$$anon$1$$anon$8.class
-rw-r--r--  1 antonkropp  staff    5063 May  9 13:23 ConfigTests$$anon$1.class
-rw-r--r--  1 antonkropp  staff    3125 May  9 13:23 ConfigTests$$anon$17$$anon$18$$anon$19.class
-rw-r--r--  1 antonkropp  staff    3573 May  9 13:23 ConfigTests$$anon$17$$anon$18$$anon$20$$anon$21.class
-rw-r--r--  1 antonkropp  staff    5213 May  9 13:23 ConfigTests$$anon$17$$anon$18$$anon$20.class
-rw-r--r--  1 antonkropp  staff    3640 May  9 13:23 ConfigTests$$anon$17$$anon$18$$anon$22.class
-rw-r--r--  1 antonkropp  staff    3924 May  9 13:23 ConfigTests$$anon$17$$anon$18.class
-rw-r--r--  1 antonkropp  staff    4914 May  9 13:23 ConfigTests$$anon$17$$anon$23.class
-rw-r--r--  1 antonkropp  staff    4606 May  9 13:23 ConfigTests$$anon$17$$anon$24.class
-rw-r--r--  1 antonkropp  staff    5073 May  9 13:23 ConfigTests$$anon$17.class
-rw-r--r--  1 antonkropp  staff    3119 May  9 13:23 ConfigTests$$anon$9$$anon$10$$anon$11.class
-rw-r--r--  1 antonkropp  staff    3566 May  9 13:23 ConfigTests$$anon$9$$anon$10$$anon$12$$anon$13.class
-rw-r--r--  1 antonkropp  staff    5205 May  9 13:23 ConfigTests$$anon$9$$anon$10$$anon$12.class
-rw-r--r--  1 antonkropp  staff    3634 May  9 13:23 ConfigTests$$anon$9$$anon$10$$anon$14.class
-rw-r--r--  1 antonkropp  staff    3915 May  9 13:23 ConfigTests$$anon$9$$anon$10.class
-rw-r--r--  1 antonkropp  staff    4909 May  9 13:23 ConfigTests$$anon$9$$anon$15.class
-rw-r--r--  1 antonkropp  staff    4601 May  9 13:23 ConfigTests$$anon$9$$anon$16.class
-rw-r--r--  1 antonkropp  staff    5066 May  9 13:23 ConfigTests$$anon$9.class
-rw-r--r--  1 antonkropp  staff   87180 May  9 13:23 ConfigTests.class
-rw-r--r--  1 antonkropp  staff   69451 May  9 13:23 DbTests.class
-rw-r--r--  1 antonkropp  staff   12985 May  9 13:23 MysqlTests.class
-rw-r--r--  1 antonkropp  staff   68418 May 10 12:40 Tests.class
drwxr-xr-x  4 antonkropp  staff     128 May  9 13:23 db
drwxr-xr-x  9 antonkropp  staff     288 May  9 13:23 modules

$ java -jar ~/tools/cfr.jar ConfigTests.class

A thousand lines into the decompiled output, I could see a giant string literal: something was compiling a large string representation of the configuration parser into the class file.

I checked the ficus source code and it's not the culprit, so it must be something in the test.

Turns out assert is a macro from scalatest:

def assert(condition: Boolean)(implicit prettifier: Prettifier, pos: source.Position): Assertion = macro AssertionsMacro.assert

Where the macro

def assert(context: Context)(condition: context.Expr[Boolean])(prettifier: context.Expr[Prettifier], pos: context.Expr[source.Position]): context.Expr[Assertion] =
    new BooleanMacro[context.type](context, "assertionsHelper").genMacro[Assertion](condition, "macroAssert", context.literal(""), prettifier, pos)

is looking for an implicit Position.

Position comes from scalactic, which ships with scalatest:

case class Position(fileName: String, filePathname: String, lineNumber: Int)

/**
 * Companion object for <code>Position</code> that defines an implicit
 * method that uses a macro to grab the enclosing position.
 */
object Position {

  import scala.language.experimental.macros

  /**
   * Implicit method, implemented with a macro, that returns the enclosing
   * source position where it is invoked.
   *
   * @return the enclosing source position
   */
  implicit def here: Position = macro PositionMacro.genPosition
}

And here we can see that the macro expansion of the ficus config parser is being captured by the Position macro and compiled into the assert statement as a giant string!

Changing the test to be

class ProductConfigTests extends FlatSpec {
  "Configs" should "be valid in QA" in {
    validate(ConfigLoader.verify(ProductsConfig, Environment.QA).isSuccess)
  }

  /**
   * This validate function needs to exist because this bug is amazing.
   *
   * `assert` is a macro from scalatest that automatically compiles the contextual source tree
   * into the assert, so that you can get line number and metadata context if the line fails.
   *
   * The ficus macro expander for ProductConfig is larger than 65k characters, which is normally fine
   * for code, however since scalatest tries to compile this > 65k anonymous class tree as a _string_
   * it breaks the java compiler!
   *
   * By breaking the function scope and having the macro create a closure around the _validate_ block
   * it no longer violates the 65k static string constraint
   */
  private def validate(block: => Boolean): Unit = {
    assert(block)
  }
}

now makes the test pass. What a day.

AETR: an open source workflow engine

For the past several years I've been thinking about the idea of an open source workflow execution engine. Something like AWS Simple Workflow, but simpler: no need to upload Python, or JavaScript, or whatever. Just call an API with a callback URL, and when the API completes its step it calls back to the coordinator with a payload. The coordinator then sends that payload to the next step in the workflow, and so on.

This kind of simplified workflow process is really common and I keep running into it at the different places I've worked. For example, my company ingests client catalogs to augment imagery with SKU numbers and other metadata. That ingestion process is fragmented and asynchronous: there's an ingestion step, then a normalization step, then a processing step, then an indexing step, and so on. In the happy case everything is hooked together with a queue pipeline, where once ingestion is done it publishes a message to the normalizer, etc. But what happens when you want to take the principal components of this pipeline and build an ad hoc pipeline? We don't necessarily want the ingestor to always write to the normalizer. It'd be great to be able to compose these steps into different step trees that own their own flow.

This is what AETR lets you do.

How it works

The primary building blocks in AETR are

  1. A step tree
  2. A run tree

A step tree is literally a tree structure that represents a sequence of steps. Leaf nodes in the tree are all API actions, and parent nodes are either sequential or parallel parents. This means you can have trees like this:

Sequential
 |- Sequential
    |- API1
    |- API2
 |- Parallel
    |- API3
    |- API4

In this tree the root is sequential, which means its child nodes must run… sequentially. The first child is also a sequential parent, so the ordering of this node is the execution of API1 followed by API2 when API1 completes. When that branch completes, the next branch can execute. That branch is parallel, so both API3 and API4 execute concurrently. When both complete, the final root node is complete!
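
To make that concrete, a step tree can be modeled as a small ADT along these lines (the type names here are illustrative, not AETR's actual ones):

sealed trait StepTree {
  def id: StepTreeId
  def children: Seq[StepTree]
}

// Children run one after another, each feeding its output forward.
case class SequentialParent(id: StepTreeId, children: Seq[StepTree]) extends StepTree

// Children run concurrently; their outputs are reduced into one result.
case class ParallelParent(id: StepTreeId, children: Seq[StepTree]) extends StepTree

// A leaf: call an API and wait for it to call back with a payload.
case class ApiAction(id: StepTreeId, callbackUrl: String) extends StepTree {
  val children: Seq[StepTree] = Nil
}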

In the nomenclature of AETR, when you go to run a step tree it becomes a run tree. A run tree has the same shape as its step tree but includes information such as state, timing, inputs/outputs, etc. For example:

case class Run(
  id: RunInstanceId,
  var children: Seq[Run],
  rootId: RootId,
  repr: StepTree,
  executedAt: Option[Instant] = None,
  completedAt: Option[Instant] = None,
  version: Version = Version(1),
  createdAt: Instant = Instant.now(),
  updatedAt: Instant = Instant.now(),
  var parent: Option[Run] = None,
  var state: RunState = RunState.Pending,
  var input: Option[ResultData] = None,
  var output: Option[ResultData] = None
)

DB layer

Run trees are stored in a Postgres DB and are easy to reconstitute from the storage layer. Since every row contains the root, we can get all the nodes for a run tree in one DB call and then rebuild the graph in memory from the parent/child links.

Step trees are a bit more complicated to rebuild, since step trees can point to other step trees. To rebuild a step tree there's a step tree table, which contains each individual step node as a row in the db, and a step_children table, which relates a parent to its ordered set of children. We need a children link instead of a parent link because step trees can be modified to link to other trees and can be re-used in many composable steps. This means that there's no single clear parent of a tree, since the action of API1 can be re-used in many different trees.

Here’s an example of rebuilding a step tree:

def getStep(stepTreeId: StepTreeId): Future[StepTree] = {
  val idQuery = sql"""
                   WITH RECURSIVE getChild(kids) AS (
                     SELECT ${stepTreeId}
                     UNION ALL
                     SELECT child_id FROM step_children
                     JOIN getChild ON kids = step_children.id
                   )
                   SELECT * FROM getChild""".as[StepTreeId]

  val nodesQuery = for {
    ids <- idQuery
    treeNodes <- steps.query.filter(_.id inSet ids).result
    treeChildren <- children.query.filter(_.id inSet ids).result
  } yield {
    (treeChildren, treeNodes)
  }

  provider.withDB(nodesQuery.withPinnedSession).map {
    case (children, nodes) =>
      val allSteps = composer.reconstitute(nodes, children)

      allSteps.find(_.id == stepTreeId).get
  }
}

We use a recursive CTE in Postgres to find all the children starting at a given tree id, then slurp those children's identities and rebuild the graph in memory.

Storing the children in a separate table also has the advantage that parents are child-aware. Why does this matter? Well, AETR wouldn't be as useful as it is if all it did was strictly call APIs. We need a way to transform payloads between calls, and a way to reduce parallel calls into a single output, so that nodes can be composed. This matters because if API1 returns some JSON shape and API2 requires a different JSON shape as its input, hooking API1 -> API2 directly would never work. There needs to be a mapper. But mapping functions are only relevant to their relative placement in the graph: if we re-hook API1 -> API3, it may need a different mapping function. To that end, you can't store mappers directly on the step nodes themselves; they have to live on the child relationship.

On top of that we have the concept of reducers in AETR. Parallel parents can take the result set of all their children and reduce the payloads into one result.

Let's look at a concrete example:


Here’s a root tree that does things in parallel and has some sequential sub nodes.

If we look at one of the parallel parents we can see how to reduce the data:

In much the same way, we can see how to map data between nodes for sequential parents.

Mappers and reducers are executed in a sandboxed Nashorn engine.
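
As a rough illustration of the idea (AETR's actual sandboxing is more involved), evaluating a user-supplied JavaScript mapper with the JDK's built-in Nashorn engine looks something like this:

import javax.script.{Invocable, ScriptEngineManager}

object MapperSandbox {
  private val engine = new ScriptEngineManager().getEngineByName("nashorn")

  // A mapper takes the previous step's output and reshapes it for the next step's input.
  def runMapper(mapperSource: String, input: String): String = {
    engine.eval(mapperSource)

    engine.asInstanceOf[Invocable].invokeFunction("map", input).toString
  }
}

// e.g. runMapper("""function map(data) { return JSON.parse(data).resultId.toString(); }""", """{"resultId": 123}""")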

Concurrency

It’s important in AETR to make sure that as we work on these trees that concurrent access doesn’t introduce race conditions. AETR internally supports some optimistic locking on the trees as well as atomic version updates to prevent any concurrency issues.

Example

Let's take a look at a full flow!

In this example we run the full tree and we can see the inputs and outputs of the data as they are mapped, and finally reduced. When the entire tree is complete the root node of the tree contains the final data. In this way the root is the final result and can be used to programmatically poll the AETR api.

Give AETR a shot and please feel free to leave feedback here or in the GitHub issues!

Chaos monkey for docker

I work at a mostly AWS shop, and while we still have services on raw EC2, nearly all of our new development is on Amazon ECS in Docker. I like Docker because it provides a unified unit of operation (a container) that makes it easy to build shared tooling regardless of language or application. It also lets you reproduce your applications locally in the same environment they run in remotely, and containers start fast and deploy fast.

However, many services run on shared ECS nodes in a cluster, and while things like Chaos Monkey may run around turning whole nodes off, it'd be nice to have something with a little less impact during working hours that still stresses our recovery and alerting.

This is actually pretty easy with a little Docker container we call The Beast. All The Beast does is run on an ECS scheduled event every 15-30 minutes from 10am to 3pm PST (we have teams on both the east and west coasts) and kill a random container on whatever cluster node it lands on. It doesn't do a lot of damage, but it does test your fault tolerance.

Here’s The Beast:

#!/usr/bin/env ruby

require 'json'
require 'pp'

class Hash
  def extract_subhash(*extract)
    h2 = self.select{|key, value| extract.include?(key) }
    self.delete_if {|key, value| extract.include?(key) }
    h2
  end
end

puts "UNLEASH THE BEAST!"

ignore_image_regex = ENV["IGNORED_REGEX"]

raw = "[#{`docker ps --format '{{json .}}'`.lines.join(',')}]"

running_services = JSON.parse(raw).map { |val| val.extract_subhash("ID", "Image")}

puts running_services

puts "Ignoring regex #{ignore_image_regex}"

if ignore_image_regex && ignore_image_regex.length > 0
  running_services.delete_if {|value|
    /#{ignore_image_regex}/ === value["Image"]
  }
end

if !running_services || running_services.length == 0
  puts "No services to kill"

  Process.exit(0)
end

puts "Bag of services to kill: "

to_kill = running_services.sample

puts "Killing #{pp to_kill}"

`docker kill #{to_kill["ID"]}`

prng = Random.new

quips = [
    "Dont fear the reaper",
    "BEAST MODE",
    "You been rubby'd",
    "Pager doody"
]

puts "#{quips[prng.rand(0..quips.length-1)]}"

The Beast supports a regex of ignored images, so critical images like the ecs_agent (and The Beast itself) can be skipped. The same mechanism can be used to temporarily exempt services and the like.

We deploy The Beast with Terraform; the general task definition looks like:

[
  {
    "name": "the-beast",
    "image": "${image}:${version}",
    "cpu": 10,
    "memory": 50,
    "essential": true,
    "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "${log_group}",
          "awslogs-region": "${region}",
          "awslogs-stream-prefix": "the-beast"
        }
    },
    "environment": [
        {
          "name": "IGNORED_REGEX", "value": ".*ecs_agent.*|.*the-beast.*"
        }
    ],
    "mountPoints": [
        { "sourceVolume": "docker-socket", "containerPath": "/var/run/docker.sock", "readOnly": true }
    ]
  }
]

And the Terraform:

resource "aws_ecs_task_definition" "beast_rule" {
  family = "beast-service"
  container_definitions = "${data.template_file.task_definition.rendered}"

  volume {
    name = "docker-socket"
    host_path = "/var/run/docker.sock"
  }
}

data "template_file" "task_definition" {
  template = "${file("${path.module}/files/task-definition.tpl")}"

  vars {
    version = "${var.beast-service["version"]}"
    region = "${var.region}"
    image = "${data.terraform_remote_state.remote_env_state.docker_namespace}/the-beast"
    log_group = "${var.log-group}"
  }
}

resource "aws_cloudwatch_event_target" "beast_scheduled_job_target" {
  target_id = "${aws_ecs_task_definition.beast_rule.family}"
  rule = "${aws_cloudwatch_event_rule.beast_scheduled_job.name}"
  arn = "${data.aws_ecs_cluster.default_cluster.id}"
  role_arn = "${data.aws_iam_role.ecs_service_role.arn}"
  ecs_target {
    task_count = 1
    task_definition_arn = "${aws_ecs_task_definition.beast_rule.arn}"
  }
}

resource "aws_cloudwatch_event_rule" "beast_scheduled_job" {
  name = "${aws_ecs_task_definition.beast_rule.family}"
  description = "Beast kills a container every 30 minutes from 10AM to 3PM PST Mon-Thu"
  schedule_expression = "cron(0/30 18-23 ? * MON-THU *)"
  is_enabled = false
}

resource "aws_cloudwatch_log_group" "beast_log_group" {
  name = "${var.log-group}"
}

We log to CloudWatch so we can correlate back whether a service was killed by The Beast. It's important to note that you need to mount the Docker socket for The Beast to work, since it drives Docker directly. A sample Dockerfile looks like:

FROM ubuntu:xenial

RUN apt-get update && apt-get install -y ruby-full docker.io build-essential

RUN gem install json

ADD beast.rb /app/beast.rb

RUN chmod +x /app/beast.rb

ENTRYPOINT "/app/beast.rb"

It’s bare bones, but it works, and the stupid quips at the end always make me chuckle.

Functors in scala

A coworker of mine and I frequently talk about higher kinded types and category theory, and lament the lack of unified types in Scala: namely functors. A functor is a fancy name for a thing that can be mapped over. Wanting to abstract over something mappable comes up more often than you'd think: I don't necessarily care that it's an Option, or a List, or whatever. I just care that it has a map.

We’re not the only ones who want this. Cats, Shapeless, Scalaz, all have implementations of functor. The downside there is that usually these definitions tend to leak throughout your ecosystem. I’ve written before about ecosystem and library management, and it’s an important thing to think about when working at a company of 50+ people. You need to think long and hard about putting dependencies on things. Sometimes you can, if those libraries have good versioning or back-compat stories, or if they expose lightweight API’s with heavyweight bindings that you can separate out.

Oftentimes these libraries aren't well suited for large-scale use, so you're forced to replicate, denormalize, or otherwise hide away how those pieces come into play.

In any case, this post isn't about that. I just wanted to know how the hell those libraries do the magic.

Let me lay out the final product first and we’ll break it down:

import scala.concurrent.{ExecutionContext, Future}

trait Functor[F[_]] {
  def map[A, B](f: F[A])(m: A => B): F[B]
}

object Functor {
  implicit class FunctorOps[F[_], A](f: F[A])(implicit functor: Functor[F]) {
    def map[B](m: A => B): F[B] = {
      functor.map(f)(m)
    }
  }

  implicit def iterableFunctor[T[X] <: Traversable[X]] = new Functor[T] {
    override def map[A, B](f: T[A])(m: A => B) = {
      f.map(m).asInstanceOf[T[B]]
    }
  }

  implicit def optionFunctor = new Functor[Option] {
    override def map[A, B](f: Option[A])(m: A => B) = {
      f.map(m)
    }
  }

  implicit def futureFunctor(implicit executionContext: ExecutionContext) = new Functor[Future] {
    override def map[A, B](f: Future[A])(m: A => B) = {
      f.map(m)
    }
  }
}

And no code is complete without a test…

class Tests extends FlatSpec with Matchers {

  import com.curalate.typelevel.Functor
  import com.curalate.typelevel.Functor._

  private def testMaps[T[_] : Functor](functor: T[Int]): T[Int] = {
    functor.map(x => x + 1)
  }

  "A test" should "run" in {
    testMaps(List(1)) shouldEqual List(2)

    testMaps(Some(1): Option[Int]) shouldEqual Some(2)

    testMaps(None: Option[Int]) shouldEqual None

    testMaps(Set(1)) shouldEqual Set(2)

    Await.result(testMaps(Future.successful(1)), Duration.Inf) shouldEqual 2
  }
}

How did we get here? First, if you look at the definition of functor again:

trait Functor[F[_]] {
  def map[A, B](f: F[A])(m: A => B): F[B]
}

We’re saying that

  1. Given a type F that contains some other unknown type (i.e. F is a box, like List, or Set)
  2. Define a map function from A to B and give me back a type of F of B

The nuanced part here is that the map takes an instance of F[A]. We need this to get all the types to be happy, since we have to specify somewhere that F[A] and A => B are paired together.

Let's make a functor for List, since that one is pretty easy:

object Functor {
  implicit lazy val listFunctor = new Functor[List] {
    override def map[A, B](f: List[A])(m: A => B) = {
      f.map(m)
    }
  }
}

Now we can get an instance of Functor for List.

We could use it like this now:

def listMapper(f: Functor[List])(l: List[Int])  = {
  f.map(l)(_ + 1)
}

But that sort of sucks. I don't want to have to know I have a List; that defeats the purpose of a functor!

What if we do

def intMapper[T[_]](f: Functor[T])(l: T[Int])  = {
  f.map(l)(_ + 1)
}

Kind of better. Now I have a higher kinded type that doesn’t care about what the box is. But I still need to somehow get an instance of a functor to do my mapping.

This is where the ops class comes in:

implicit class FunctorOps[F[_], A](f: F[A])(implicit functor: Functor[F]) {
  def map[B](m: A => B): F[B] = {
    functor.map(f)(m)
  }
}

This guy says: given a container, and a functor for that container, here is a helpful map function. It gives us an extension method on F[A] that adds map. You may wonder, don't all the things we're mapping over already have a map function? The answer is yes, but the compiler doesn't know that, since we're dealing only with generics here!

Now, we can import our functor ops class and finally get that last bit to work:

class Tests extends FlatSpec with Matchers {

  import com.curalate.typelevel.Functor
  import com.curalate.typelevel.Functor._

  private def testMaps[T[_] : Functor](functor: T[Int]): T[Int] = {
    functor.map(x => x + 1)
  }

  "A test" should "run" in {
    testMaps(List(1)) shouldEqual List(2)

    testMaps(Some(1): Option[Int]) shouldEqual Some(2)

    testMaps(None: Option[Int]) shouldEqual None

    testMaps(Set(1)) shouldEqual Set(2)

    Await.result(testMaps(Future.successful(1)), Duration.Inf) shouldEqual 2
  }
}

Pulling it all together, we're asking for a type T that is a box of anything and that has an implicit Functor[T] typeclass instance. We want to use the map method on the functor of T, and that map method is available because we leverage the implicit FunctorOps.

It helps to think of functor not as an interface that a thing implements, but as a typeclass/extension of a thing. I.e. in order to get a map, you have to wrap something.
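
To drive the point home, here's a toy example: a made-up Box container of our own gets a map without any change to Box itself, simply by providing a Functor instance for it.

import com.curalate.typelevel.Functor
import com.curalate.typelevel.Functor._

object BoxExample {
  case class Box[A](value: A)

  implicit val boxFunctor: Functor[Box] = new Functor[Box] {
    override def map[A, B](f: Box[A])(m: A => B): Box[B] = Box(m(f.value))
  }

  // With the FunctorOps import in scope: Box(1).map(_ + 1) == Box(2)
}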

Anyways, big thanks to Christian for helping me out.

Tracing High Volume Services

This post was originally posted at engineering.curalate.com

We like to think that building a service ecosystem is like stacking building blocks. You start with a function in your code. That function is hosted in a class. That class in a service. That service is hosted in a cluster. That cluster in a region. That region in a data center, etc. At each level there’s a myriad of challenges.

From the start, developers tend to use things like logging and metrics to debug their systems, but a certain class of problems crops up when you need to debug across services. From a debugging perspective, you'd like to have a higher projection of the view of the system: a linearized view of what requests are doing. I.e. you want to be able to see, at the granularity of a single request, that service A called service B and service C called service D.

Cross Service Logging

The simplest solution is to require that every service-to-service call carries some sort of trace identifier. Each incoming request into the system, whether from a public API, a client-side request, or even an async daemon-invoked timer/schedule, generates a trace. This trace then gets propagated through the entire system, and if you include it in all your log statements you can correlate cross-service calls.

How is this accomplished at Curalate? For the most part we use Finagle based services and the Twitter ecosystem has done a good job of providing the concept of a thread local TraceId and automatically propagating it to all other twitter-* components (yet another reason we like Finatra!).

All of our service clients automatically pull this thread local trace id out and populate a known HTTP header field that services then pick up and re-assume. For Finagle based clients this is auto-magick’d for you. For other clients that we use, like OkHttp, we had to add custom interceptors that pulled the trace from the thread local and set it on the request.
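
The OkHttp interceptor is in the spirit of the sketch below, a simplified version that assumes finagle-core's Trace API and the Zipkin B3 header names shown in the example request that follows:

import com.twitter.finagle.tracing.Trace
import okhttp3.{Interceptor, Response}

class TracingInterceptor extends Interceptor {
  override def intercept(chain: Interceptor.Chain): Response = {
    // Pull the current trace off the Finagle thread local, if one exists.
    val request = Trace.idOption match {
      case Some(traceId) =>
        chain.request().newBuilder()
          .addHeader("X-B3-TraceId", traceId.traceId.toString)
          .addHeader("X-B3-SpanId", traceId.spanId.toString)
          .build()
      case None => chain.request()
    }

    chain.proceed(request)
  }
}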

Here is an example of the header being sent automatically as part of Zipkin based headers (which we re-use as our internal trace identifiers):

[Image: finagle_trace_id]

Notice the X-B3-TraceId header. When a service receives this request, it re-assumes the trace id and sets its SLF4J MDC field traceId to that value. We can then reference the trace id in our logback.xml configuration, as in our STDOUT log configuration below:

<appender name="STDOUT-COLOR" class="ch.qos.logback.core.ConsoleAppender">
    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
        <level>TRACE</level>
    </filter>
    <encoder>
        <pattern>%yellow(%d) [%magenta(%X{traceId})] [%thread] %highlight(%-5level) %cyan(%logger{36}) %marker - %msg%n</pattern>
    </encoder>
</appender>

And we can also send the trace id as a structured JSON field to Loggly.

Let’s look at an example from our own logs:

[Image: tid_example]

What we’re seeing here is a system called media-api made a query to a system called networkinformationsvc. The underlying request carried a correlating trace id across the service boundaries and both systems logged to Loggly with the json.tid (transaction id) field populated. Now we can query our logs and get a linear time based view of what’s happening.

Thread local tracing

The trick here is to make sure that this trace id, pinned to a thread local by the initiating request, properly moves from thread to thread as you make async calls. We don't want anyone to ever have to remember to set the trace; it should just flow gracefully from thread to thread implicitly.

To make sure that traces hop properly between threads, we had to enforce that everybody uses an ExecutionContext that captures the caller's thread locals before executing. This is critical: otherwise you can make an async call, the trace id gets dropped, and bye-bye go the logs! It's hyper important in async Scala code to always take an execution context as a parameter and never pin one. Thankfully, we can make any execution context safe by wrapping it in a delegate:

import com.twitter.util.Local
import scala.concurrent.ExecutionContext

/**
 * Wrapper around an existing ExecutionContext that makes it propagate MDC information.
 */
class PropagatingExecutionContextWrapper(wrapped: ExecutionContext)
  extends ExecutionContext { self =>

   override def prepare(): ExecutionContext = new ExecutionContext {
     // Save the call-site state
     private val context = Local.save()

     def execute(r: Runnable): Unit = self.execute(new Runnable {
       def run(): Unit = {
         // re-assume the captured call site thread locals
         Local.let(context) {
           r.run()
         }
       }
     })

     def reportFailure(t: Throwable): Unit = self.reportFailure(t)
   }

  override def execute(r: Runnable): Unit = wrapped.execute(r)

  override def reportFailure(t: Throwable): Unit = wrapped.reportFailure(t)
}

class TwitterExecutionContextProvider extends ExecutionContextProvider {
  /**
   * Safely wrap any execution context into one that properly passes context
   *
   * @param executionContext
   * @return
   */
  override def of(executionContext: ExecutionContext) = new PropagatingExecutionContextWrapper(executionContext)
}

We’ve taken this trace wrapping concept and applied to all kinds of executors like ExecutorService, and ScheduledExecutorService. Technically we don’t really want to expose the internals of how we wrap traces, so we load an ExecutionContextProvider via a java service loading mechanism and provide an API contract so that people can wrap executors without caring how they are wrapped:

/**
 * A provider that loads from the java service mechanism
 */
object ExecutionContextProvider {
  lazy val provider: ExecutionContextProvider = {
    Option(ServiceLoader.load(classOf[ExecutionContextProvider])).
      map(_.asScala).
      getOrElse(Nil).
      headOption.
      getOrElse(throw new MissingExecutionContextException)
  }
}

/**
 * Marker interfaces to provide contexts with custom logic. This
 * forces users to make sure to use the execution context providers that support request tracing
 * and maybe other tooling
 */
trait ProvidedExecutionContext extends ExecutionContext

/**
 * A context provider contract
 */
trait ExecutionContextProvider {
  def of(context: ExecutionContext): ProvidedExecutionContext

  ...
}

From a caller's perspective, they now do:

implicit val execContext = ExecutionContextProvider.provider.of(scala.concurrent.ExecutionContext.Implicits.global)

In this example, that wraps the default Scala global context.
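
As a toy illustration (the logger setup here is arbitrary): with the wrapped context in scope, the thread locals captured at the call site are restored inside the async callback, so a log line inside the Future carries the same traceId as its caller.

import org.slf4j.LoggerFactory
import scala.concurrent.Future

object TraceExample {
  private val logger = LoggerFactory.getLogger(getClass)

  implicit val execContext = ExecutionContextProvider.provider.of(scala.concurrent.ExecutionContext.Implicits.global)

  def doWork(): Future[Unit] = Future {
    // Runs on another thread, but the caller's trace locals (and MDC) come along for the ride.
    logger.info("still tagged with the caller's traceId")
  }
}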

Service to Service dependency and performance tracing

Well that’s great! We have a way to safely and easily pass trace id’s, and we’ve tooled through our clients to all pass this trace id automatically, but this only gives us logging information. We’d really like to be able to leverage the trace information to get more interesting statistics such as service to service dependencies, performance across service hops, etc. Correlated logs is just the beginning of what we can do.

Zipkin is an open source tool that we've discussed here before, so we won't go too much into it; needless to say, Zipkin hinges on having proper trace identifiers. It samples incoming requests to determine whether they should be traced (i.e. sent to Zipkin). By default, we have all our services send 0.1% of their requests to Zipkin to minimize the impact on the service.

Let’s look at an example:

[Image: zipkin]

In this Zipkin trace we can see that a batch call made a call to Dynamo. The whole call took 6 milliseconds, 4 of which were spent calling Dynamo. We've tooled all our external client dependencies with Zipkin trace information automatically, using Java dynamic proxies, so that as we upgrade our external deps we get tracing on new functions as well.

If we dig further into the trace:

zipkin_w_trace

We can now see (highlighted) the trace ID, and can search our logs for entries related to this trace.

Finding needles in the haystack

We have a way to correlate logs, and get sampled performance and dependency information between services via Zipkin. What we still can’t do yet is trace an individual piece of data flowing through high volume queues and streams.

Some of our services at Curalate process 5 to 10 thousand items a second. It’s just not fiscally prudent to log all that information to Loggly or emit unique metrics to our metrics system (DataDog). Still, we want to know at the event level where things are in the system, where they passed through, where they got dropped etc. We want to answer the question of

Where is identifier XYZ.123 in the system and where did it go and come from?

This is difficult to answer with the current tools we’ve discussed.

To solve this problem we have one more system in play. This is our high volume auditing system that lets us write and filter audit events at a large scale (100k req/s+). The basic architecture here is we have services write audit events via an Audit API which are funneled to Kinesis Firehose. The firehose stream buffers data for either 5 minutes or 128 MB (whichever comes first). When the buffer limit is reached, firehose dumps newline separated JSON in a flat file into S3. We have a lambda function that waits for S3 create events on the bucket, reads the JSON, then transforms the JSON events into Parquet, an efficient columnar storage format. The Parquet file is written back into S3 into a new folder with the naming scheme of

year=YYYY/month=MM/day=DD/hour=HH/minute=mm/<uuid>.parquet

Where the minutes are grouped into 5 minute intervals. This partition is then added to Athena, a managed query engine built on PrestoDB that lets you query large datasets in S3.
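As a rough illustration (not the actual lambda code), building an S3 key in that scheme from a timestamp could look like:

import java.util.UUID
import org.joda.time.DateTime

// Sketch: bucket minutes into 5 minute groups and build the partition path
def partitionKey(now: DateTime): String = {
  val minuteBucket = (now.getMinuteOfHour / 5) * 5

  f"year=${now.getYear}%04d/month=${now.getMonthOfYear}%02d/day=${now.getDayOfMonth}%02d/" +
    f"hour=${now.getHourOfDay}%02d/minute=$minuteBucket%02d/${UUID.randomUUID()}.parquet"
}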

auditing_arch

What does this have to do with trace id’s? Each event emitted comes with a trace id that we can use to query back to logs or Zipkin or other correlating identifiers. This means that even if services aren’t logging to Loggly due to volume restrictions, we can still see how events trace through the system. Let’s look at an example where we find a specific network identifier from Instagram and see when it was data mined and when we added semantic image tags to it (via our vision APIs):

SELECT minute, app, message, timestamp, context
FROM curalateauditevents."audit_events"
WHERE context['network_id'] = '1584258444344170009_249075471' and context['network']='instagram'
and day=18 and hour=22
order by timestamp desc
limit 100

This is the Athena query. We’ve included the specific network ID and network we are looking for, as well as a limited partition scope.

athena_query

Notice the two highlights.

Starting at the second highlight, there is a message that we augmented the piece of data. In our particular pipeline we only augment data under specific circumstances (not every image is analyzed), so it was important to see that some images were dropped and this one was augmented. Now we can definitively say “yes, item ABC was augmented but item DEF was not and here is why”. Awesome.

Moving upwards, the first highlight is how much data was scanned. This particular partition we looked through has 100MB of data, but we only searched through 2MB to find what we wanted (this is due to the optimization of Parquet). Athena is priced by how much data you scan at a cost of $5 per terabyte. So this query was pretty much free at a cost of $0.000004. The total set of files across all the partitions for the past week is roughly 21GB spanning about 3.5B records. So even if we queried all the data, we’d only pay $.04. In fact, the biggest cost here isn’t in storage or query or lambda, it’s in firehose! Firehose charges you $0.029 per GB transferred. At this rate we pay 60 cents a week. The boss is going to be ok with that.

However, there are still some issues here. Remember the target scale is upwards of 100k req/s. At that scale we’re dealing with a LOT of data through Kinesis Firehose. That’s a lot of data into S3, a lot of IO reads to transform to Parquet, and a lot of opportunities to accidentally scan through tons of data in our athena partitions with poorly written queries that loop over repeated data (even though we limit partitions to a 2 week TTL). We also now have issues of rate limiting with Kinesis Firehose.

On top of that, some services just pump so much repeated data that it’s not worth seeing it all the time. To that end we need some sort of way to do live filters on the streams. What we’ve done to solve this problem is leverage dynamically invoked Nashorn javascript filters. We load up filters from a known remote location at an interval of 30 seconds, and if a service is marked for filtering (i.e. it has a really high load and needs to be filtered) then it’ll run all of its audit events through the filter before they actually get sent to the downstream firehose. If an event fails the filter it’s discarded. If it passes, the event is annotated with which filter name it passed through and sent through the stream.

Filters are just YML files for us:

name: "Filter name"
expiration: <Optional DateTime. Epoch or string datetime of ISO formats parseable by JODA>
js: |
    function filter(event) {
        // javascript that returns a boolean
    }

And an example filter may look like

name: "anton_client_filter"
js: |
    function filter(event) {
      var client = event.context.get("client_id")

      return client != null && client == "3136"
    }

In this filter only events that are marked with the client id of my client will pass through. Some systems don’t need to be filtered so all their events pass through anyway.
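Under the hood, evaluating one of these filters is plain javax.script plumbing. A simplified sketch (the class and event shape here are illustrative, not our production loader):

import javax.script.{Invocable, ScriptEngineManager}

// Illustrative event shape: a bean-style getter lets the javascript call event.context.get(...)
class AuditEvent(context: java.util.Map[String, String]) {
  def getContext: java.util.Map[String, String] = context
}

// Evaluate the filter's javascript once per filter, then re-invoke just filter(event) per event
class JsFilter(js: String) {
  private val engine = new ScriptEngineManager().getEngineByName("nashorn")
  engine.eval(js) // defines function filter(event) in this engine's scope

  def passes(event: AuditEvent): Boolean =
    engine.asInstanceOf[Invocable].invokeFunction("filter", event) == java.lang.Boolean.TRUE
}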

Now we can write queries like

SELECT minute, app, message, timestamp, context
FROM curalateauditevents."audit_events"
WHERE contains(trace_names, 'anton_client_filter')
and day=18 and hour=22
limit 100

To get events that were tagged with my filter in the current partition. From there, we now can do other exploratory queries to find related data (either by trace id or by other identifiers related to the data we care about).

Let’s look at some graphs that show how dramatic this filtering can be

filtering

Here the purple line is one of our data mining ingestion endpoints. It’s pumping a lot of data to firehose, most of which is repeated over time and so isn’t super useful to get all the input from. The moment the graph drops is when the yml file was uploaded with a filter to add filtering to the service. The blue line is a downstream service that gets data after debouncing and other processing. Given its load is a lot less we don’t care so much that it is sending all its data downstream. You can see the purple line slow to a trickle later on when the filter kicks in and data starts matching it.

Caveats with Nashorn

Building the system out, there were a few interesting caveats to using Nashorn in a high volume pipeline like this.

The first was that subtle differences in javascript can have massive performance impacts. Let’s look at some examples and benchmark them.

function filter(event) {
  var anton = {
    "136742": true,
    "153353": true
  }

  var mineable = event.context.get("mineable_id")

  return mineable != null && anton[mineable]
}

The JMH benchmark results for running this code are:

[info] FiltersBenchmark.testInvoke  thrpt   20     1027.409 ±      29.922  ops/s
[info] FiltersBenchmark.testInvoke   avgt   20  1484234.075 ± 1783689.007  ns/op

What?? Only around 1,000 ops/second (that’s the score column; the ± figure is just the error margin).

Let’s make some adjustments to the filter, given our internal system loads the javascript into an isolated scope per filter and then re-invokes just the function filter each time (letting us safely create global objects and pay heavy prices for things once):

var anton = {
  "136742": true,
  "153353": true
}

function filter(event) {
  var mineable = event.context.get("mineable_id")

  return mineable != null && anton[mineable]
}
[info] FiltersBenchmark.testInvoke  thrpt   20  7391161.402 ± 206020.703  ops/s
[info] FiltersBenchmark.testInvoke   avgt   20    14879.890 ±   8087.179  ns/op

Ah, much better! Roughly 7.4 million ops/sec.

If we use java constructs:

function filter(event) {
  var anton = new java.util.HashSet();
  anton.add("136742")
  anton.add("153353")

  var mineable = event.context.get("mineable_id")

  return mineable != null && anton.contains(mineable)
}
[info] FiltersBenchmark.testInvoke  thrpt   20  5662799.317 ± 301113.837  ops/s
[info] FiltersBenchmark.testInvoke   avgt   20    41963.710 ±  11349.277  ns/op

Also very fast at roughly 5.7 million ops/sec, though interestingly a touch slower than the hoisted object literal.

Something is clearly up with the anonymous object creation in Nashorn. Needless to say, benchmarking is important, especially when these filters are going to be dynamically injected into every single service we have. We need them to be performant, sandboxed, and safe to fail.

For that we make sure everything runs in its own engine scope, in a separate execution context isolated from the main running code, and is fired off asynchronously so it doesn’t block the main calling thread. This is also where we have monitoring and alerting for when someone uploads a non-performant filter, so we can investigate and mitigate quickly.

For example, the discovery of the poorly performing json object came from this alert:

high_cpu

Conclusion

Tracing is hard, and it’s incredibly difficult to tool through after the fact if you start to build service architectures without this in mind from the get go. Tooling trace identifiers through the system from the beginning sets you up for success in building more interesting debugging infrastructure that isn’t always possible otherwise. When building larger service ecosystems it’s important to keep in mind how to inspect things at varying granularity levels. Sometimes building custom tools to help inspect the systems is worth the effort, especially if they help debug complicated escalations or data inconsistencies.

From Thrift to Finatra

Originally posted on the curalate engineering blog

There are a million and one ways to do (micro-)services, each with a million and one pitfalls. At Curalate, we’ve been on a long journey of splitting out our monolith into composable and simple services. It’s never easy, as there are a lot of advantages to having a monolith. Things like refactoring, code-reuse, deployment, versioning, rollbacks, are all atomic in a monolith. But there are a lot of disadvantages as well. Monoliths encourage poor factoring, bugs in one part of the codebase force rollbacks/changes of the entire application, reasoning about the application in general becomes difficult, build times are slow, transient build errors increase, etc.

To that end, our first foray into services was built on top of the Twitter Finagle stack. If you go to the page and can’t figure out what exactly Finagle does, I don’t blame you. The documentation is lackluster and in and of itself quite low-level. Finagle defines a service as a function that transforms a request into a response, and composes services with filters that manipulate requests/responses themselves. It’s a clean abstraction, given that this is basically what all web service frameworks do.

Thrift

Finagle by itself isn’t super opinionated. It gives you building blocks to build services (service discovery, circuit breaking, monitoring/metrics, varying protocols, etc) but doesn’t give you much else. Our first set of services built on Finagle used Thrift over HTTP. Thrift, similar to protobuf, is an intermediate declarative language that creates RPC style services. For example:

namespace java tutorial
namespace py tutorial

typedef i32 int // We can use typedef to get pretty names for the types we are using
service MultiplicationService
{
        int multiply(1:int n1, 2:int n2),
}

Will create an RPC service called MultiplicationService that takes 2 parameters. Our implementation at Curalate hosted Thrift over HTTP (serializing Thrift as JSON) since all our services are web based behind ELB’s in AWS.

We have a lot of services at Curalate that use Thrift, but we’ve found a few shortcomings:

Model Reuse

Thrift forces you to use primitives when defining service contracts, which makes it difficult to share lightweight models (with potentially useful utilities) to consumers. We’ve ended up doing a lot of mapping between generated Thrift types and shared model types. Curalate’s backend services are all written in Scala, so we don’t have the same issues that a company like Facebook (who invented Thrift) may have with varying languages needing easy access to RPC.

Requiring a client

Many times you want to be able to interact with a service without needing access to a client. Needing a client has made developers get used to cloning service repositories, building the entire service, then entering a Scala REPL in order to interact with a service. As our service surface area expands, it’s not always feasible to expect one developer to build another developer’s service (conflicting java versions, missing SBT/Maven dependencies or settings, etc). The client requirement has led to services taking heavyweight dependencies on other services and leaking dependencies. While Thrift doesn’t force you to do this, it has been a side effect of the extra love and care it takes to generate a Thrift client properly, either by distributing Thrift files in a jar or otherwise.

Over the wire inspection

With Thrift-over-HTTP, inspecting requests is difficult. This is due to the fact that these services use Thrift serialization, which unlike JSON, isn’t human-readable.

Because Thrift over HTTP is all POSTs to /, tracing access and investigating ELB logs becomes a jumbled mess of trying to correlate times and IPs to other parts of our logging infrastructure. The POST issue is frustrating because it makes it impossible for us to do any semantic smart caching, such as inserting caches at the serving layer for retrieval calls. In a pure HTTP world, we could insert a cache for heavily used GETs, given that a GET is idempotent.

RPC API design

Regardless of Thrift, RPC encourages poorly unified APIs with lots of specific endpoints that don’t always jibe. We have many services with method topologies that are poorly composable. A well designed API, and cluster of APIs, should gently guide you to getting the data you need. In an ideal world, if you get an ID in a payload response for a data object, there should be an endpoint to get more information about that ID. However, in the RPC world we end up with a batch call here, a specific RPC call there, sometimes requiring stitching several calls together to get data that should have been a simple domain level call.

Internal vs External service writing

We have a lot of public REST APIs, and they are written using the Lift framework (some of our oldest code). Developers moving between internal and external services have to shift paradigms and move from writing REST with JSON to RPC with Thrift.

Overall Thrift is a great piece of technology, but after using it for a year we found that it’s not necessarily for us. All of these things have prompted a shift to writing REST style services.

Finatra

Finatra is an HTTP API framework built on top of Finagle. Because it’s still Finagle, we haven’t lost any of our operational knowledge of the underlying framework, but instead we can now write lightweight HTTP API’s with JSON.

With Finatra, all our new services have Swagger automatically enabled, so API exploration is simple. And since it’s just plain JSON, using Postman to debug and inspect APIs is now possible (as well as viewing requests in Charles or other proxies).

With REST we can still distribute lightweight clients, or more importantly, if there are dependency conflicts a service consumer can very quickly roll an HTTP client to a service. Our ELB logs now make sense, our new APIs are unified in their verbiage (GET vs POST vs PUT vs DELETE), and if we want to write RPC for a particular service we still can.

There are a few other things we like about Finatra. For those developers coming from a background of writing HTTP services, Finatra feels familiar with the concept of controllers, filters, unified test-bed for spinning up build verification tests (local in memory servers), dependency injection (via Guice) baked in, sane serialization using Jackson, etc. It’s hard to do the wrong thing given that it builds strong production level opinions onto Finagle. And thankfully those opinions are ones we share at Curalate!
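For a feel of what that looks like, a bare-bones Finatra controller is roughly this (an illustrative example, not one of our production services):

import com.twitter.finagle.http.Request
import com.twitter.finatra.http.Controller

// A minimal controller: plain JSON over HTTP using the verb/route DSL
class PingController extends Controller {
  get("/ping") { request: Request =>
    response.ok.json(Map("status" -> "ok"))
  }
}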

We’re not in bad company — Twitter, Duolingo, and others are using Finatra in production.

The HTTP driver pattern

Yet another SOA blog post, this time about calling services. I’ve seen a lot of posts, articles, even books, on how to write services, but not much on a good way to call them. It may seem trivial: isn’t calling a service just a matter of making a web request to one? Yes, it is, but in a larger organization it’s not always so trivial.

Distributing fat clients

The problem I ran into was that the service stack in use at my organization provided a feature rich client as an artifact of a service’s build. It had retries, metrics, tracing with zipkin, etc. But it also pulled in things like finagle, netty, and jackson, and each service might be distributing slightly different versions of all of these dependencies. When you start to consume 3, 4, 5 or more clients in your own service, suddenly you’ve gotten into an intractable mess of dependencies. Sometimes there’s no actual way to resolve them all without forcing upgrades in other services! That… sucks. It violates the idea of services in that my service is now coupled to your service.

You don’t want to force service owners to write clients for each service they want to call. That’d be a big waste of time and duplicated effort. If your organization is mono-lingual (i.e. all java/scala/whatever) then it’s still worth providing a feature rich client that has the sane things built in: retries, metrics, tracing, fast fail, serialization, etc. But you don’t want services leaking all the nuts and bolts to each other.

One solution is to auto generate clients server side. This is akin to what WCF does, or projects like swagger, or thrift for RPC. The downside here is that the generated code is usually pretty nasty, and sometimes it’s hard to plug into it to augment the clients with custom tracing, correlation tracking, etc. Other times the API itself might need a few nicety helper methods that you don’t want to expose in the raw API. But in the auto generated world, you can’t do this.

There are other projects like Retrofit that look like they solve the problem since your client is just an interface and its only dependency is OkHttp. But retrofit isn’t scala friendly (None’s need custom support, default arguments in methods are not properly intercepted, etc). You’re also bound to the back-compat story of retrofit/okhttp, assuming that they can do things like make sure older versions live side by side together.

In practice, I found that retrofit (even with scala’s issues) didn’t work well in a distributed services environment where everyone was at wildly different versions of things.

Abstracting HTTP

However, taking the idea from retrofit, we can abstract away http calls with an http driver. Http really isn’t that complicated, especially for how it’s used in conjunction with service to service calls:

import scala.concurrent.{ExecutionContext, Future}

case class ApiRequest(
  path: String,
  queryParams: Seq[(String, Option[String])] = Nil,
  headers: Seq[(String, Option[String])] = Nil,
  options: Option[RequestOptions] = None
) 

case class RequestOptions(
  contentType: Option[String],
  characterSet: String = "utf-8"
)

/**
 * A response with a body
 *
 * @param data     The deserialized data
 * @param response The raw http response
 * @tparam T The type to deserialize
 */
case class BodyResponse[T](data: T, response: RawResponse)

/**
 * A raw response that contains code, the body and headers
 *
 * @param code
 * @param body
 * @param headers
 */
case class RawResponse(code: Int, body: String, headers: Map[String, List[String]])

/**
 * An http error that all drivers should throw on non 2xx
 *
 * @param code  The code
 * @param body  An optional body
 * @param error The inner exception (may be driver specific)
 */
case class HttpError(code: Int, body: Option[String], error: Exception)
  extends Exception(s"Error ${code}, body: ${body}", error)

/**
 * Marker trait indicating an http client
 */
trait HttpClient

/**
 * The simplest HTTP Driver. This is used to abstract libraries that call out over the wire.
 *
 * Anyone can create a driver as long as it implements this interface
 */
trait HttpDriver {
  val serializer: HttpSerializer

  def get[TRes: Manifest](
    request: ApiRequest
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def post[TReq: Manifest, TRes: Manifest](
    request: ApiRequest,
    body: Option[TReq]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def put[TReq: Manifest, TRes: Manifest](
    request: ApiRequest,
    body: Option[TReq]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def patch[TReq: Manifest, TRes: Manifest](
    request: ApiRequest,
    body: Option[TReq]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def custom[TReq: Manifest, TRes: Manifest](
    method: Methods,
    request: ApiRequest,
    body: Option[TReq]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def delete[TRes: Manifest](
    request: ApiRequest
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]

  def bytesRaw[TRes: Manifest](
    method: Methods,
    request: ApiRequest,
    body: Option[Array[Byte]]
  )(implicit executionContext: ExecutionContext): Future[BodyResponse[TRes]]
}

Service owners who want to distribute a client can create clients that have no dependencies (other than the driver definition). Platform maintainers, like myself, can be diligent about making sure the driver interface never breaks, or if it does, that it’s broken in a new namespace such that different versions can peacefully co-exist in the same process.

An example client can now look like

class ServiceClient(driver: HttpDriver) {
  def ping()(implicit executionContext: ExecutionContext): Future[Unit] = {
    driver.get[Unit](ApiRequest("/health")).map(_.data)
  }
}

But we still need to provide an implementation of a driver. This is where we can decouple things and provide drivers that are properly tooled with all the fatness we want (netty/finagle/zipkin tracing/monitoring/etc), and service owners can bind their clients to whatever driver they want. Those provided implementations can live in their own shared library that only services bind to (not service clients! i.e. they are terminal endpoints in the dependency graph).
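Wiring it up at the service edge might then look something like this sketch (OkHttpDriver here is a hypothetical platform-provided implementation; the JacksonHttpSerializer is shown further down):

// The service binds a concrete, fully tooled driver; the client only ever sees the HttpDriver trait
val driver: HttpDriver = new OkHttpDriver(new JacksonHttpSerializer())

val serviceClient = new ServiceClient(driver)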

There are a few advantages here:

  • Clients can be distributed at multiple scala versions without dependency conflicts
  • It’s much simpler to version manage and back-compat an interface/trait than it is an entire lib
  • Default drivers that do the right thing can be provided by the service framework, and back compat doesn’t need to be taken into account there since the only consumer is the service (it never leaks).
  • Drivers are simple to use, so if someone needs to roll their own client it’s really simple to do

Custom errors

We can do some other cool stuff now too, given we’ve abstracted away how to call http code. Another common issue with clients is dealing with meaningful errors that aren’t just the basic http 5xx/4xx codes. For example, if you throw a 409 conflict you may want the client to actually receive a WidgetInIncorrectState exception for some calls, and in other calls maybe a FooBarInUse error that contains more semantic information. Basically overloading what a 409 means for a particular call/query. One way of doing this is with a discriminator in the error body:

HTTP 409 response:
{
   "code": "WidgetInIncorrectState",
   "widgetName: "foo",
   "widgetSize": 1234
}

Given we don’t want client code pulling in a json library to do json parsing, the driver needs to support context aware deserialization.

To do that, I’ve exposed a MultiType object that defines

  • Given a path into the json object, which field defines the discriminator
  • Given a discriminator, which type to deserialize to
  • Which http error code to apply all this to

And it looks like:

/**
 * A type representing deserialization of multiple types.
 *
 * @param discriminatorField The field that represents the textual "key" of what the subtype is. Nested fields can be located using
 *                           json path format of / delimited. I.e /foo/bar
 * @param pathTypes          The lookup of the result of the discriminatorField to the subtype mapper
 * @param defaultType        An optional fallback subtype to use when the discriminator value has no mapping
 * @tparam T The supertype of all the subtypes
 */
case class MultiType[T](
  discriminatorField: String,
  pathTypes: Map[String, SubType[_ <: T]],
  defaultType: Option[SubType[_ <: T]] = None
)

/**
 * Represents a subtype as part of a multitype mapping
 *
 * @param path The optional json sub path (slash delimited) to deserialize the type as.
 * @tparam T The type to deserialize
 */
case class SubType[T: Manifest](path: Option[String] = None) {
  val clazz = manifest[T].runtimeClass.asInstanceOf[Class[T]]
}

Using this in a client looks like:

class ServiceClient(driver: HttpDriver) {
  val errorMappers = MultiType[ApiException](discriminatorField = "code", Map(
    "invalidData" -> SubType[InvalidDataException]()
  ))

  def ping()(implicit executionContext: ExecutionContext): Future[Unit] = {
    driver.get[Unit](ApiRequest("/health")).map(_.data).failWithOnCode(500, errorMappers)
  }
}

This is saying that when I get the value invalidData in the json field code of an http 500 error response, the client should actually throw an InvalidDataException.

How does this work? Well, just like the http driver, we’ve abstracted the serializer, and that’s all plugged in by the service consumer:

case class DiscriminatorDoesntExistException(msg: String) extends Exception(msg)

object JacksonHttpSerializer {
  implicit def jacksonToHttpSerializer(jacksonSerializer: JacksonSerializer): HttpSerializer = {
    new JacksonHttpSerializer(jacksonSerializer)
  }
}

class JacksonHttpSerializer(jackson: JacksonSerializer = new JacksonSerializer()) extends HttpSerializer {
  override def fromDiscriminator[SuperType](multiType: MultiType[SuperType])(str: String): SuperType = {
    val tree = jackson.objectMapper.readTree(str)

    val node = tree.at(addPrefix(multiType.discriminatorField, "/"))

    val subType = multiType.pathTypes.get(node.textValue()).orElse(multiType.defaultType).getOrElse {
      throw DiscriminatorDoesntExistException(s"Discriminator ${multiType.discriminatorField} does not exist")
    }

    val treeToDeserialize = subType.path.map(m => tree.at(addPrefix(m, "/"))).getOrElse(tree)

    jackson.objectMapper.treeToValue(treeToDeserialize, subType.clazz)
  }

  override def toString[T](data: T): String = {
    jackson.toJson(data)
  }

  override def fromString[T: Manifest](str: String): T = {
    jackson.fromJson(str)
  }

  private def addPrefix(s: String, p: String) = {
    p + s.stripPrefix(p)
  }
}

Inherent issues

While there are a lot of goodies in abstracting serialization and http calling into a library API provided with implementations (drivers), it does handicap the clients a little bit. Things like doing custom manipulation of the raw response, any sort of business logic, or adding other libraries are really frowned upon. I’d argue this is a good thing, and that this should all be handled at the service level, since a client is always a nice to have and not a requirement.

Conclusion

The ultimate goal in SOA is separation. But 100% separation should not mean copy-pasting things, reinventing the wheel, or not sharing any code. It just means you need to build the proper lightweight abstractions to help keep strong barriers between services without creating a distributed monolith.

With the http driver abstraction pattern it’s now easy to provide drivers that use finagle-http under the hood, or okhttp, or apache http, etc. Client writers can share their model and client code with helpful utilities without leaking dependencies. And most importantly, service owners can update dependencies and move to new scala versions without fearing that their dependencies are going to cause runtime or compile time issues against pulled in clients, all while still iterating quickly and safely.

Bit packing Pacman

Haven’t posted in a while, since I’ve been heads down building a lot of cool tooling at work (blog posts coming), but I had a chance to mess around a bit with something that came up in an interview question this week.

I frequently ask candidates a high level design question to build PacMan. Games like pacman are fun because on the surface they are very simple, but if you don’t structure your entities and their interactions correctly the design falls apart.

At some point during the interview we had scaled the question up such that there was now a problem of knowing, at a particular point in the game, what was nearby. For example, if the board is 100,000 x 100,000 (10 billion elements), how efficiently can we determine if there is a nugget/wall next to us? One option is to store all of these entities in a 2d array and just access the neighbors. However, if the entity is any non-trivial object, then we now have at minimum 16 bytes per cell. That means we’re storing 160 gigs just to access the board. Probably not something we can realistically do on commodity hardware.

Given we’re answering only a “is something there or not” question, one option is to bit pack the answer. In this sense you can leverage that each bit represents a coordinate in your grid. For example in a 2D grid

0 1
2 3

These positions could be represented by the binary value at that bit:

0 = 0b0001
1 = 0b0010
2 = 0b0100
3 = 0b1000

If we do that, and we store a list of longs (64 bits, 8 bytes) then to store 10 billion elements we need:

private val maxBits = maxX.toLong * maxY
private val requiredLongs = (maxBits / 64 + 1).toInt

Which ends up being 156,250,001 longs, which in turn is about 1.25 GB (be careful to do that math with longs; with 32-bit ints the 10 billion bit count silently overflows). That’s… a big savings. Considering that in the trivial form we stored 10,000,000,000 16-byte objects (160 GB), this is roughly a 128x reduction in space.
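The arithmetic, spelled out:

val bits  = 100000L * 100000L // 10,000,000,000 cells, one bit each
val longs = bits / 64 + 1     // 156,250,001 longs
val bytes = longs * 8         // 1,250,000,008 bytes, ~1.25 GB vs 160 GB of objects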

Now, one thing the candidate brought up (which is a great point) is that this makes working with the values much more difficult. The answer here is to provide a higher level API that hides away the hard bits.

I figured today I’d sit down and do just that. We need to be able to do a few things:

  1. Find out how many longs to store
  2. Find out given a coordinate which long it belongs to
  3. In that long toggle the bit representing the coordinate if we want to set/unset it
class TwoDBinPacker(maxX: Int, maxY: Int) {
  // do the sizing math with Longs: a 100,000 x 100,000 board is 10 billion bits,
  // which overflows a 32-bit Int
  private val maxBits = maxX.toLong * maxY
  private val requiredLongs = (maxBits / 64 + 1).toInt
  private val longArray = new Array[Long](requiredLongs)

  def get(x: Int, y: Int): Boolean = {
    longAtPosition(x, y).value == 1
  }

  def set(x: Int, y: Int, value: Boolean): Unit = {
    val p = longAtPosition(x, y)

    longArray(p.index) = p.set(value)
  }

  private def longAtPosition(x: Int, y: Int): BitValue = {
    val flattenedPosition = y.toLong * maxX + x

    val longAtPosition = (flattenedPosition / 64).toInt

    val bitAtPosition = (flattenedPosition % 64).toInt

    BitValue(longAtPosition, longArray(longAtPosition), bitAtPosition)
  }
}

With the helper class of a BitValue looking like:

case class BitValue(index: Int, container: Long, bitNumber: Int) {
  val value = (container >> bitNumber) & 1

  def set(boolean: Boolean): Long = {
    if (boolean) {
      // use a Long mask: an Int literal's shift wraps at 32 bits
      val maskAt = 1L << bitNumber

      container | maskAt
    } else {
      val maskAt = ~(1L << bitNumber)

      container & maskAt
    }
  }
}

At this point we can drive a scalatest:

"Bit packer" should "pack large sets (10 billion!)" in {
  val packer = new TwoDBinPacker(100000, 100000)

  packer.set(0, 0, true)
  packer.set(200, 400, true)

  assert(packer.get(0, 0))
  assert(packer.get(200, 400))
  assert(!packer.get(99999, 88888))
}

And this test runs in 80ms.

Now, this is a pretty naive way of doing things, since we are potentially storing tons of unused longs. A smarter way would be to use a sparse set with skip lists, such that as you use a long you create it and mark it used, and things before and after it (up to the next used long) are marker blocks that can span large ranges. I.e.

{EmptyBlock}[long, long, long]{EmptyBlock}[long]

This way you don’t have to store things you don’t actually set.
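A quick sketch of the sparse idea, using a simple map keyed by block index rather than the skip-list blocks described above:

import scala.collection.mutable

// Only materialize a 64-bit block once a bit inside it is actually set
class SparseBitSet {
  private val blocks = mutable.Map.empty[Long, Long]

  def set(bit: Long): Unit = {
    val block = bit / 64
    blocks(block) = blocks.getOrElse(block, 0L) | (1L << (bit % 64))
  }

  def get(bit: Long): Boolean =
    ((blocks.getOrElse(bit / 64, 0L) >> (bit % 64)) & 1L) == 1L
}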

Anyways, a fun little set of code to write. Full source available on my github

Strongly typed http headers in finatra

When building service architectures one thing you need to solve is how to pass context between services. This is usually stuff like request id’s and other tracing information (maybe you use zipkin) between service calls. This means that if you set request id FooBar123 on an entrypoint to service A, if service A calls service B it should know that the request id is still FooBar123. The bigger challenge is usually making sure that all thread locals keep this around (and across futures/execution contexts), but before you attempt that you need to get it into the system in the first place.

I’m working in finatra these days, and I love this framework. It’s got all the things I loved from dropwizard but in a scala-first way. Today’s challenge was that I wanted to be able to pass request http headers around between services in a typesafe way that could be used in thread local request contexts. Basically I want to send

X-Magic-Header someValue

And be able to resolve that into a MagicHeader(value: T) class.

The first attempt is easy, just parse header values into case classes:

case class MagicHeader(value: String)

But the question I have is: how do I enforce that the header string X-Magic-Header is directly correlated to the case class MagicHeader?

object MagicHeader { 
   val key = "X-Magic-Header"
}

case class MagicHeader(value: String)

Maybe, but still, when someone sends the value out, they can make a mistake:

setRequestHeader("X-mag1c-whatevzer" -> magicHeader.value)

That sucks, I don’t want that. I want it strictly paired. I’m looking for what is in essence a case class that has 2 fields: key, value, but where the key is fixed. How do I do that?

I like to start with how I want to use something, and then work backwards to how to make that happen. Given that, let’s say we want an API kind of like:

object Experimental {
  val key = "Experimental"

  type Value = String
}

And I’d like to be able to do something like

val experimentKey = Experimental("experiment abc")
(experimentKey.key -> experimentKey.value) shouldEqual
         ("Experimental" -> "experiment abc")

I know this means I need an apply method somewhere, and I know that I want a tuple of (key, value). I also know that because I have a path dependent type for the second value, I can do something with that.

Maybe I can fake an apply method to be like

trait ContextKey {
  val key: String

  /**
   * The custom type of this key
   */
  type Value

  /**
   * A tuple of (String, Value)
   */
  type Key = Product2[String, Value]

  def apply(data: Value): Key = new Key {
    override def _1: String = key

    override def _2: Value = data

    // Product2 extends Equals, so an anonymous instance must also supply canEqual
    override def canEqual(that: Any): Boolean = that.isInstanceOf[Product2[_, _]]
  }
}

And update my object to be

object Experimental extends ContextKey {
  val key = "Experimental"

  override type Value = String
}

Now my object has a mixin of an apply method that creates an anonymous tuple of type (String, Value). You can create instances of Experimental but you can’t ever set the key name itself! However, I can still access the pinned key because the anonymous tuple has it!
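In other words (a quick illustrative usage):

val header = Experimental("experiment abc")

header._1 // "Experimental" -- the pinned key; callers never type the raw string
header._2 // "experiment abc"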

But in the case that I wanted, I wanted to use these as http header values. Which means I need to be able to parse a string into a type of ContextKey#Value which is path dependent on the object type.

We can do that by adding now a few extra methods on the ContextKey trait:

trait ContextKeyType[T] extends Product2[String, T] {
  def unparse: String
}

trait ContextKey {
  self =>
  val key: String

  /**
   * The custom type of this key
   */
  type Value

  /**
   * A tuple of (String, Value)
   */
  type Key = ContextKeyType[Value]

  /**
   * Utility to allow the container to provide a mapping from Value => String
   *
   * @param r
   * @return
   */
  def parse(r: String): Value

  def unparse(v: Value): String

  def apply(data: Value): Key = new Key {
    override def _1: String = key

    override def _2: Value = data

    /**
     * Allow a mapping of Value => String
     *
     * @return
     */
    override def unparse: String = self.unparse(data)

    override def equals(obj: scala.Any): Boolean = {
      canEqual(obj)
    }

    override def canEqual(that: Any): Boolean = {
      that != null &&
      that.isInstanceOf[ContextKeyType[_]] &&
      that.asInstanceOf[ContextKeyType[_]]._1 == key &&
      that.asInstanceOf[ContextKeyType[_]]._2 == data
    }
  }
}

This introduces parse and unparse methods which convert values to and from strings. An http header object can now define how to convert its value:

object Experimental extends ContextKey {
  val key = "Experimental"
  override type Value = String

  override def parse(value: String): String = value

  override def unparse(value: String): String = value
}

So, if we want to send JSON in a header, or a long/int/uuid, we can now parse and unparse that value pre and post wire.
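For example, a header carrying a numeric value could be defined like this (X-Request-Priority is a made-up header for illustration):

object RequestPriority extends ContextKey {
  val key = "X-Request-Priority"
  override type Value = Int

  // parse/unparse move between the wire format (string) and the typed value
  override def parse(value: String): Int = value.toInt

  override def unparse(value: Int): String = value.toString
}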

Now let’s add a utility to convert a Map[String, String], which could represent an http header map, into a set of strongly typed context values:

object ContextValue {
  def find[T <: ContextKey](search: T, map: Map[String, String]): Option[T#Value] = {
    map.collectFirst {
      case (key, value) if search.key == key => search.parse(value)
    }
  }
}

Back in finatra land, let’s add an http filter:

case class CurrentRequestContext(
  experimentId: Option[Experimental.Value],
)

object RequestContext {
  private val requestType = Request.Schema.newField[CurrentRequestContext]

  implicit class RequestContextSyntax(request: Request) {
    def context: CurrentRequestContext = request.ctx(requestType)
  }

  private[filters] def set(request: Request): Unit = {
    val data = CurrentRequestContext(
      experimentId = ContextValue.find(Experimental, request.headerMap)
    )

    request.ctx.update(requestType, data)
  }
}

/**
 * Set the remote context from requests 
 */
class RemoteContextFilter extends SimpleFilter[Request, Response] {
  override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
    RequestContext.set(request)

    service(request)
  }
}

From here on out, we can provide a set of strongly typed values that are basically case classes with hidden keys.
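For example, a caller can set the header using the pinned key, and the receiving service reads it back through the typed context. A quick illustrative sketch, assuming the classes above are in scope:

import com.twitter.finagle.http.Request
import RequestContext.RequestContextSyntax

// Outgoing: the key travels with the value, so there's no raw header string to typo
def tagRequest(request: Request): Unit = {
  val experiment = Experimental("experiment abc")
  request.headerMap.set(experiment._1, experiment.unparse)
}

// Incoming: RemoteContextFilter has already populated the typed context
def readExperiment(request: Request): Option[Experimental.Value] =
  request.context.experimentId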