Adding MDC logging to akka

As I’ve mentioned before, I’m working heavily on a project that uses akka. I’m really enjoying the message-passing model, and so far things are great, but tying in an MDC for the SLF4J logging context proved complicated. I had played with the custom executor model described here but hadn’t attempted a custom akka dispatcher.

I was thinking that a custom dispatcher would work great for passing along the MDC, since then you’d never have to think about it, but unfortunately I couldn’t get it to work. Akka kept failing to instantiate the dispatcher. I was also worried about the configuration data and tuning options you might lose by giving akka your own dispatcher configurator.

Given that, I wasn’t quite sure what to do. What I ended up with was a little extra work but turned out well. I went with an augmented dispatcher/subscriber model. Basically, every event that I send out gets wrapped in a PersistableMessageContext, which captures the fields of the MDC that I care about. Every actor then subclasses a custom logging base class that pops open the wrapper, sets the MDC, and gives the actor the underlying message.

For my project we’re tracking everything with what we call a CorrelationId, which is just a UUID.

The message wrapper

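import java.util.UUID;

import lombok.Data;
import org.slf4j.MDC;

@Data
public class PersistableMessageContext implements CorrelationIdGetter, CorrelationIdSetter {
    private final Object source;

    private UUID correlationId;

    public PersistableMessageContext(Object source){
        this.source = source;

        // Capture the sender's correlation ID at construction time, on the sending thread.
        final String s = MDC.get(FilterAttributes.CORR_ID);

        if (s != null) {
            try {
                setCorrelationId(UUID.fromString(s));
            }
            catch (IllegalArgumentException ex) {
                // Not a valid UUID: leave correlationId unset rather than fail the send.
            }
        }
    }
}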

This is the message that I want to pass around. Because it carries the correlation ID of its source, it can later be used to set the context when it’s being consumed.
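To make the capture step concrete, here is a minimal JDK-only sketch of the same idea. It is not the real wrapper: CONTEXT is a hypothetical ThreadLocal standing in for the SLF4J MDC, and Envelope is a simplified stand-in for PersistableMessageContext. It only illustrates why the ID has to be read in the constructor, on the sending thread, since the consuming thread has no context of its own.

```java
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CaptureSketch {
    // Hypothetical stand-in for the SLF4J MDC: holds one value per thread.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Simplified wrapper: snapshots the sender's context in its constructor.
    static final class Envelope {
        final Object source;
        final String correlationId;

        Envelope(Object source) {
            this.source = source;
            this.correlationId = CONTEXT.get(); // read on the sending thread
        }
    }

    // Returns {id set by the sender, context seen on the worker, id carried by the envelope}.
    static String[] demo() {
        final String id = UUID.randomUUID().toString();
        final ExecutorService worker = Executors.newSingleThreadExecutor();
        CONTEXT.set(id);
        try {
            final Envelope envelope = new Envelope("payload");
            final Callable<String> readContext = () -> CONTEXT.get();
            final Callable<String> readEnvelope = () -> envelope.correlationId;
            final String seenOnWorker = worker.submit(readContext).get();
            final String carried = worker.submit(readEnvelope).get();
            return new String[] {id, seenOnWorker, carried};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            worker.shutdown();
        }
    }
}
```

The worker thread sees no context of its own, while the envelope still carries the sender's ID across the thread boundary.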

The actor base

I now have all my actors subclass this class:

public abstract class LoggableActor extends UntypedActor {
    @Override public void onReceive(final Object message) throws Exception {
        boolean wasSet = false;

        if (message instanceof CorrelationIdGetter) {
            final UUID correlationId = ((CorrelationIdGetter) message).getCorrelationId();

            if (correlationId != null) {
                MDC.put(FilterAttributes.CORR_ID, correlationId.toString());

                wasSet = true;
            }
        }

        try {
            if (message instanceof PersistableMessageContext) {
                onReceiveImpl(((PersistableMessageContext) message).getSource());
            }
            else {
                onReceiveImpl(message);
            }
        }
        finally {
            // Clear the ID even if the handler throws, so it cannot leak into the next message on this thread.
            if (wasSet) {
                MDC.remove(FilterAttributes.CORR_ID);
            }
        }
    }

    public abstract void onReceiveImpl(final Object message) throws Exception;
}

This lets me pass in anything that implements CorrelationIdGetter, and if it also happens to be a wrapped message, pop out the inner message.
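The receive side can be tried out without akka as well. The sketch below is an illustration under assumptions, not the actor itself: CONTEXT is again a hypothetical ThreadLocal standing in for the MDC, Envelope stands in for the wrapper, and handle plays the role of onReceiveImpl. One detail worth considering is clearing the context in a finally block, so that a handler that throws does not leave its ID behind for the next message processed on the same thread.

```java
import java.util.ArrayList;
import java.util.List;

class ReceiveSketch {
    // Hypothetical stand-in for the SLF4J MDC: holds one value per thread.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Simplified stand-in for the message wrapper.
    static final class Envelope {
        final Object source;
        final String correlationId;

        Envelope(Object source, String correlationId) {
            this.source = source;
            this.correlationId = correlationId;
        }
    }

    // Records each payload together with the context visible while it was handled.
    static final List<String> seen = new ArrayList<>();

    // Plays the role of onReceiveImpl; fails on purpose for the payload "boom".
    static void handle(Object payload) {
        seen.add(payload + "@" + CONTEXT.get());
        if ("boom".equals(payload)) {
            throw new IllegalStateException("handler failed");
        }
    }

    // Unwraps the message, sets the context, and always clears it afterwards.
    static void receive(Object message) {
        boolean wasSet = false;
        Object payload = message;

        if (message instanceof Envelope) {
            final Envelope envelope = (Envelope) message;
            payload = envelope.source;
            if (envelope.correlationId != null) {
                CONTEXT.set(envelope.correlationId);
                wasSet = true;
            }
        }

        try {
            handle(payload);
        } finally {
            if (wasSet) {
                CONTEXT.remove();
            }
        }
    }
}
```

Unwrapped messages pass straight through with no context set, which is the failure mode to watch for when a sender forgets to wrap.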

Sending out messages

Now the big issue here is making sure that we are consistent in publishing messages. This means that anything sent through routers, broadcasts, etc. has to go out wrapped in a PersistableMessageContext. To make that easier I created a few augmented akka publisher classes. Below is a class with static methods (to make it easy to import) that wrap an actor ref or a router.

import akka.actor.ActorRef;
import akka.routing.Broadcast;
import akka.routing.Router;

/**
 * Utility to provide context propagation on akka messages
 */
public class AkkaAugmenter {

    public static AkkaAugmentedActor wrap(ActorRef src) {
        return new AkkaAugmentedActor(){
            @Override public void tell(final Object msg, final ActorRef sender) {
                final PersistableMessageContext persistableMessageContext = new PersistableMessageContext(msg);

                src.tell(persistableMessageContext, sender);
            }

            @Override public ActorRef getActor() {
                return src;
            }
        };
    }

    public static AkkaAugmentedRouter wrap(Router src) {
       return new AkkaAugmentedRouter() {
           @Override public void route(final Object msg, final ActorRef sender) {
               final PersistableMessageContext persistableMessageContext = new PersistableMessageContext(msg);

               src.route(persistableMessageContext, sender);
           }

           @Override public void broadcast(final Object msg, final ActorRef sender) {
               final PersistableMessageContext persistableMessageContext = new PersistableMessageContext(msg);

               src.route(new Broadcast(persistableMessageContext), sender);
           }

           @Override public Router getRouter() {
               return src;
           }
       };
    }
}

The augmented actor:

public interface AkkaAugmentedActor {
    void tell(Object msg, ActorRef sender);

    ActorRef getActor();
}

And the augmented router:

public interface AkkaAugmentedRouter {
    void route(Object msg, ActorRef sender);

    void broadcast(Object msg, ActorRef sender);

    Router getRouter();
}

Conclusion

And now all I need to do is wrap a default actor or router given to me by akka. From there on out, all messages sent through the wrapper are automatically wrapped and my MDC is properly propagated. While I would have liked not to rely on convention this way, at least it is simple once you’ve made the right types.
