Tagged: akka

Converting akka scala futures to java futures

Back in akka land! I’m using the ask pattern to get results back from actors, since I have a requirement to block and get a result (I can’t wait for an actor to push one at a later date). That’s fine, but converting from scala futures to java completable futures is a pain. I also (as mentioned in another post) want to make sure that my async responses capture and set the MDC for proper logging.

My final usage should look something like:

private <Response, Request> Future<Response> askActorForResponseAsync(Request source) {
    final FiniteDuration askTimeout = new FiniteDuration(config.getAskForResultTimeout().toMillis(), TimeUnit.MILLISECONDS);

    final Timeout timeout = new Timeout(askTimeout);

    final scala.concurrent.Future<Object> ask = Patterns.ask(master.getActor(), new PersistableMessageContext(source), timeout);

    return FutureConverter.fromScalaFuture(ask)
                          .executeOn(actorSystem.dispatcher())
                          .thenApply(i -> (Response) i);
}

The idea is to translate a scala future into a java CompletableFuture by registering a callback on the scala side that completes the java promise.
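The callback-to-promise idea can be sketched without akka or scala on the classpath. This is not the FutureConverter from the post (that code is cut off below). `CallbackFuture` here is a hypothetical stand-in for any callback-style future such as `scala.concurrent.Future`, and `toCompletable` is a name made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class CallbackBridge {

    /** Hypothetical stand-in for a callback-style future (assumption, not an akka/scala type). */
    public interface CallbackFuture<T> {
        // Invokes the callback once with (value, null) on success or (null, error) on failure.
        void onComplete(BiConsumer<T, Throwable> callback);
    }

    /** Completes a java promise from the callback, covering both success and failure. */
    public static <T> CompletableFuture<T> toCompletable(CallbackFuture<T> source) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        source.onComplete((value, error) -> {
            if (error != null) {
                promise.completeExceptionally(error);
            } else {
                promise.complete(value);
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        // An untyped reply, similar in spirit to what the ask pattern hands back.
        CallbackFuture<Object> reply = callback -> callback.accept("pong", null);
        CompletableFuture<String> typed = toCompletable(reply).thenApply(o -> (String) o);
        System.out.println(typed.join());
    }
}
```

The cast in `thenApply` mirrors the `(Response) i` cast in the usage above: the untyped result is only narrowed once it is already inside the java future.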

Next up, the future converter:

public class FutureConverter {
    public static <T> …

Adding MDC logging to akka

As I’ve mentioned before, I’m working heavily on a project that leverages akka. I am really enjoying the message passing model and so far things are great, but tying in an MDC for the SLF4J logging context proved complicated. I had played with the custom executor model described here but hadn’t attempted an akka custom dispatcher.

I was thinking that a custom dispatcher would work great to pass along the MDC since then you’d never have to think about it, but unfortunately I couldn’t get it to work. Akka kept failing to instantiate the dispatcher. I was also worried about configuration data and possible tuning that you might lose giving akka your own dispatcher configurator.

Given that, I wasn’t quite sure what to do. What I ended up with took a little extra work but turned out well: an augmented dispatcher/subscriber model. Basically for …
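The details of that model are cut off in this excerpt, so what follows is only a generic sketch of the underlying idea: capture the logging context on the calling thread and restore it on whichever thread runs the work. To keep it runnable without SLF4J, a `ThreadLocal` map stands in for the MDC. With the real library the capture and restore steps would presumably use `MDC.getCopyOfContextMap()` and `MDC.setContextMap(...)`. The names `ContextPropagation` and `withContext` are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagation {

    // Stand-in for org.slf4j.MDC (assumption) so the sketch has no external dependencies.
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    public static void put(String key, String value) { CONTEXT.get().put(key, value); }

    public static String get(String key) { return CONTEXT.get().get(key); }

    /** Copies the caller's context now and installs it around the task when it runs. */
    public static Runnable withContext(Runnable task) {
        Map<String, String> captured = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                CONTEXT.set(previous); // avoid leaking context into pooled threads
            }
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        put("requestId", "abc-123");
        CompletableFuture.runAsync(withContext(() ->
                System.out.println("requestId on worker: " + get("requestId"))), pool).join();
        pool.shutdown();
    }
}
```

Restoring the previous context in the `finally` block matters on shared thread pools, where a worker would otherwise keep logging with a stale context from an earlier task.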

Simplifying class matching with java 8

I’m knee deep in akka these days and it’s a great queueing framework, but unfortunately I’m stuck using java rather than scala (business decisions, not mine!), so pattern matching on incoming untyped events can be kind of nasty.

You frequently see stuff like this in receive methods:

public void onReceive(Object message) {
    if (message instanceof Something) {
        // handle Something
    } else if (message instanceof SomethingElse) {
        // handle SomethingElse
    }
    // ... and so on for every other message type
}

And while that technically works, I really hate it because it promotes a monolithic function doing too much work. It also encourages less disciplined devs to put logic into the if block. While this is fine for a few checks, what happens when you need to dispatch 10 or 20 different types? It’s not uncommon in actor based systems to have lots of small message types.

Also, because akka gives you your object as a type erased Object you can’t use normal dispatching …
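The rest of the post is cut off here, so the following is not the author's solution, just one possible java 8 shape for the problem: register one small handler per message class and let a lookup replace the instanceof chain. `TypeMatcher`, `on`, and `match` are hypothetical names chosen for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

public class TypeMatcher {
    // Insertion order is kept so that earlier registrations win when several types match.
    private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

    /** Registers a typed handler; the cast is safe because match() checks isInstance first. */
    public <T> TypeMatcher on(Class<T> type, Consumer<? super T> handler) {
        handlers.put(type, message -> handler.accept(type.cast(message)));
        return this;
    }

    /** Runs the first handler whose type matches the message; returns false if none does. */
    public boolean match(Object message) {
        for (Map.Entry<Class<?>, Consumer<Object>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(message)) {
                entry.getValue().accept(message);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        TypeMatcher matcher = new TypeMatcher()
                .on(String.class, s -> System.out.println("string of length " + s.length()))
                .on(Integer.class, i -> System.out.println("integer " + i));
        matcher.match("hello");
        matcher.match(42);
        System.out.println("handled a Double? " + matcher.match(3.14));
    }
}
```

Inside an actor, `onReceive` could then shrink to a single `matcher.match(message)` call, with a false return value routed to whatever the actor does for unhandled messages.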

Auto scaling akka routers

I’m working on a project where I need to multiplex many requests through a finite set of open sockets. For example, I have 200 messages, but I can only have at most 10 sockets open. To accomplish this I’ve wrapped the sockets in akka actors and am using an akka routing mechanism to “share” the 10 open sockets through a round-robin queue.

This works out great, since now the consumers (which are RabbitMQ listeners) just post messages to a facade on the resource, and akka will route the request and do the appropriate work for me.

However, I wanted to know of a clean way to be able to add more resources (or remove them). Say at runtime I am asked to add 10 more open connections, or that suddenly we need to scale down to 5 connections. I’d like the router to be able to manage that for …
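To make the requirement concrete, here is a plain java sketch of a round-robin pool whose members can be added or removed at runtime. It deliberately does not use akka's routing API, and it is not the approach from the full post. `ResizableRoundRobin` and its methods are hypothetical names, and the strings stand in for socket-owning actors.

```java
import java.util.ArrayList;
import java.util.List;

public class ResizableRoundRobin<T> {
    private final List<T> routees = new ArrayList<>();
    private int cursor = 0;

    public synchronized void add(T routee) { routees.add(routee); }

    public synchronized boolean remove(T routee) { return routees.remove(routee); }

    public synchronized int size() { return routees.size(); }

    /** Returns the next routee in rotation; the pool may have grown or shrunk since the last call. */
    public synchronized T next() {
        if (routees.isEmpty()) {
            throw new IllegalStateException("no routees available");
        }
        cursor = cursor % routees.size(); // re-wrap in case the pool shrank
        return routees.get(cursor++);
    }

    public static void main(String[] args) {
        ResizableRoundRobin<String> sockets = new ResizableRoundRobin<>();
        sockets.add("socket-1");
        sockets.add("socket-2");
        System.out.println(sockets.next() + ", " + sockets.next());
        sockets.add("socket-3");    // scale up at runtime
        sockets.remove("socket-1"); // scale down at runtime
        System.out.println(sockets.size() + " sockets left, next is " + sockets.next());
    }
}
```

Re-wrapping the cursor on every call keeps the rotation valid after a removal, at the cost of the rotation order shifting slightly whenever the pool changes size.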

RxJava Observables and Akka actors

I was playing with both akka and rxjava and came across the following post that described how to map rxjava observables from messages posted to akka actors.

Since my team works in java, I decided to try mapping the concept to java directly, but found that there was an issue. When I tried to have multiple subscribers listen on the stream I’d get an exception since more than one subscriber would send the “subscribe” message and try to modify the akka receive context.

I also wanted to make it easier to extend the actors to be able to process a piece of work, and then resubmit it for consumption by the observable.

The subscribe command messages

First, let me show the commands we can send to the actors. This is just mapping the scala union type that the original blog post had. The @Data annotation is part of Project Lombok.