Converting Akka Scala futures to Java futures

Back in Akka land! I’m using the ask pattern to get results back from actors, since I have a requirement to block and get a result (I can’t wait for an actor to push one at a later date). That’s fine, but converting from Scala futures to Java completable futures is a pain. I also (as mentioned in another post) want to make sure that my async responses capture and set the MDC for proper logging.

My final usage should look something like:

private <Response, Request> Future<Response> askActorForResponseAsync(Request source) {
    final FiniteDuration askTimeout = new FiniteDuration(config.getAskForResultTimeout().toMillis(), TimeUnit.MILLISECONDS);

    final Timeout timeout = new Timeout(askTimeout);

    final scala.concurrent.Future<Object> ask = Patterns.ask(master.getActor(), new PersistableMessageContext(source), timeout);

    return FutureConverter.fromScalaFuture(ask)
                          .executeOn(actorSystem.dispatcher())
                          .thenApply(i -> (Response) i);
}
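Since the requirement is to block for the result, the caller still has to wait on the returned future somewhere. Here is a minimal sketch of that waiting step using only the JDK. The class name `BlockingAsk`, the method `awaitResult`, and the choice to wrap failures in `IllegalStateException` are all assumptions for illustration, not part of the code in this post. The timeout here is the caller's own wait limit, separate from the ask timeout passed to `Patterns.ask`.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: blocks on a java.util.concurrent.Future with an upper bound.
public class BlockingAsk {

    public static <T> T awaitResult(Future<T> future, Duration timeout) {
        try {
            // Wait at most `timeout` for the future to complete.
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reply", e);
        } catch (ExecutionException e) {
            // The future completed exceptionally; surface the original cause.
            throw new IllegalStateException("ask failed", e.getCause());
        } catch (TimeoutException e) {
            // Nobody is going to read the result any more, so cancel it.
            future.cancel(true);
            throw new IllegalStateException("no reply within " + timeout, e);
        }
    }
}
```

With something like this in place, a blocking caller could write `awaitResult(askActorForResponseAsync(request), Duration.ofSeconds(5))`, where the five seconds is only a placeholder value.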

The idea is to register a completion callback on the Scala future and use it to complete a Java CompletableFuture, which plays the role of a promise.
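To show the shape of that translation without any Akka or Scala dependencies, here is a small JDK-only sketch. `CallbackSource` is a hypothetical stand-in for anything that reports completion through a callback (as `scala.concurrent.Future.onComplete` does), and `CallbackBridge` is likewise an invented name. The real classes used in this post follow further down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class CallbackBridge {

    /** Hypothetical stand-in for a callback-style future. */
    public interface CallbackSource<T> {
        // Exactly one of (failure, success) is expected to be non-null.
        void onComplete(BiConsumer<Throwable, T> callback);
    }

    public static <T> CompletableFuture<T> toCompletableFuture(CallbackSource<T> source) {
        // The CompletableFuture is the "promise" that we complete by hand.
        final CompletableFuture<T> promise = new CompletableFuture<>();

        source.onComplete((failure, success) -> {
            if (failure != null) {
                promise.completeExceptionally(failure);
            } else {
                promise.complete(success);
            }
        });

        return promise;
    }
}
```

The callback only ever touches the promise, so the caller can chain `thenApply` and friends on the returned future without knowing where the result came from.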

Next up, the future converter:

public class FutureConverter {
    public static <T> FromScalaFuture<T> fromScalaFuture(scala.concurrent.Future<T> future) {
        return new FromScalaFuture<>(future);
    }
}

This is just an entry point into a new class that gives you a nice fluent interface for supplying the execution context.

Next, a class whose job is to register an Akka callback on the Scala future and complete a CompletableFuture from it.

import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;

public class FromScalaFuture<T> {

    private final Future<T> future;

    public FromScalaFuture(Future<T> future) {
        this.future = future;
    }

    public CompletableFuture<T> executeOn(ExecutionContext context) {
        final CompletableFuture<T> completableFuture = new CompletableFuture<>();

        final AkkaOnCompleteCallback<T> completer = AkkaCompletionConverter.<T>createCompleter((failure, success) -> {
            if (failure != null) {
                completableFuture.completeExceptionally(failure);
            }
            else {
                completableFuture.complete(success);
            }
        });

        future.onComplete(completer.toScalaCallback(), context);

        return completableFuture;
    }
}

And finally, another guy whose job is to translate Java functions into Akka callbacks:

import akka.dispatch.OnComplete;

@FunctionalInterface 
public interface AkkaOnCompleteCallback<T> {
    OnComplete<T> toScalaCallback();
}

import akka.dispatch.OnComplete;
import org.slf4j.MDC;

import java.util.Map;
import java.util.function.BiConsumer;


public class AkkaCompletionConverter {
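    /**
     * Handles closing over the MDC context map of the thread that creates the callback and
     * applying it to the thread that completes the future, restoring that thread's own
     * context afterwards.
     *
     * @param callback invoked with (failure, success); failure is null on success
     * @return a factory that builds the Akka OnComplete callback when toScalaCallback() is called
     */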
    public static <T> AkkaOnCompleteCallback<T> createCompleter(BiConsumer<Throwable, T> callback) {
        return () -> {

            final Map<String, String> oldContextMap = MDC.getCopyOfContextMap();

            return new OnComplete<T>() {
                @Override public void onComplete(final Throwable failure, final T success) throws Throwable {
                    // capture the current thread's context map
                    final Map<String, String> currentThreadsContext = MDC.getCopyOfContextMap();

                    // set the closed over context map, or clear if the caller had none
                    if (oldContextMap != null) {
                        MDC.setContextMap(oldContextMap);
                    }
                    else {
                        MDC.clear();
                    }

                    try {
                        callback.accept(failure, success);
                    }
                    finally {
                        // restore the current thread's previous context map, even if the callback throws
                        if (currentThreadsContext != null) {
                            MDC.setContextMap(currentThreadsContext);
                        }
                        else {
                            MDC.clear();
                        }
                    }
                }
            };
        };
    }
}
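The capture-and-restore dance is easier to see in isolation. Below is a JDK-only sketch of the same idea, where a plain `ThreadLocal` map stands in for `org.slf4j.MDC`. Every name here (`ContextPropagation`, `withCallerContext`, `demo`) is hypothetical and exists only for illustration. The wrapper snapshots the context on the thread that creates it, applies that snapshot on whichever thread runs the callback, and puts the worker thread's own context back afterwards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class ContextPropagation {

    // Hypothetical stand-in for org.slf4j.MDC: a per-thread key/value map.
    private static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

    public static void put(String key, String value) {
        Map<String, String> map = CONTEXT.get();
        if (map == null) {
            map = new HashMap<>();
            CONTEXT.set(map);
        }
        map.put(key, value);
    }

    public static String get(String key) {
        final Map<String, String> map = CONTEXT.get();
        return map == null ? null : map.get(key);
    }

    private static Map<String, String> snapshot() {
        final Map<String, String> map = CONTEXT.get();
        return map == null ? null : new HashMap<>(map);
    }

    private static void restore(Map<String, String> snapshot) {
        if (snapshot == null) {
            CONTEXT.remove();
        } else {
            CONTEXT.set(new HashMap<>(snapshot));
        }
    }

    /** Wraps a callback so that it runs with the context of the thread that called this method. */
    public static <T> BiConsumer<Throwable, T> withCallerContext(BiConsumer<Throwable, T> callback) {
        final Map<String, String> callerContext = snapshot();
        return (failure, success) -> {
            final Map<String, String> workerContext = snapshot();
            restore(callerContext);
            try {
                callback.accept(failure, success);
            } finally {
                // Put the worker thread's own context back, even if the callback throws.
                restore(workerContext);
            }
        };
    }

    /** Returns the value the callback saw for "requestId" when run on another thread. */
    public static String demo() {
        put("requestId", "abc");
        final CompletableFuture<String> seen = new CompletableFuture<>();
        final BiConsumer<Throwable, String> wrapped =
            withCallerContext((failure, success) -> seen.complete(get("requestId")));
        // Run the wrapped callback on a different thread than the one that set the context.
        CompletableFuture.runAsync(() -> wrapped.accept(null, "ok")).join();
        return seen.join();
    }
}
```

The `try`/`finally` matters because dispatcher threads are reused: if the callback throws and the old context is never restored, the next unrelated task on that thread would log with someone else's context.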

2 comments

  1. Alex

    Hey – this looks really cool and exactly what I’m after … my brain is too addled (And possibly not good enough in the first place) to follow the missing steps – ie what AkkaOnCompleteCallback and what .toScalaCallback() look like, any clues you can give me? Thanks!

    • Anton Kropp

      Alex, my bad. I forgot to post an interface. toScalaCallback() just executes the lambda that backs the AkkaOnCompleteCallback.

      I’m on travel now but I’ll post the missing pieces next week.
