Gerrit's Blog

16.03.2015

Hystrix collapser

In my last article I introduced Hystrix to keep control of your (remote) interfaces. It helps to make your connections resilient against timeouts or errors on the wire, and it also helps you to cope with a broken connection by introducing circuit breakers with fallback results. While a circuit is open, the requests from your system to the failing one drop to nearly zero, which gives the other system a chance to recover. Let's explore some more features of Hystrix.

Collapse it

One common problem when working with remote systems (e.g. databases, web services) is the time needed for each request round trip. If you make several hundred calls, these round trip times add up and can have a significant impact on your overall performance.

The solution Hystrix offers is the HystrixCollapser. A collapser collects requests over a time window (or up to a maximum batch size) and hands them to a HystrixCommand that supports querying the data in one batch.
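To make the idea concrete, here is a minimal plain-Java sketch of request collapsing over a time window. It is not how Hystrix implements HystrixCollapser: the class TimeWindowCollapser and its methods are hypothetical, there is no maximum batch size, and the sketch assumes the batch function returns its results in the same order as its arguments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical illustration of request collapsing, NOT Hystrix's implementation:
// single requests arriving within one time window are executed as one batch call.
public class TimeWindowCollapser<A, R> implements AutoCloseable {

    // Assumption: returns one result per argument, in argument order
    private final Function<List<A>, List<R>> batchFunction;
    private final long windowMillis;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    private List<A> pendingArgs = new ArrayList<>();
    private List<CompletableFuture<R>> pendingFutures = new ArrayList<>();

    public TimeWindowCollapser(Function<List<A>, List<R>> batchFunction, long windowMillis) {
        this.batchFunction = batchFunction;
        this.windowMillis = windowMillis;
    }

    // Queues a single request; the future completes once its batch has run.
    public synchronized CompletableFuture<R> submit(A argument) {
        CompletableFuture<R> future = new CompletableFuture<>();
        if (pendingArgs.isEmpty()) {
            // First request of a new window: schedule the batch execution
            timer.schedule(this::flush, windowMillis, TimeUnit.MILLISECONDS);
        }
        pendingArgs.add(argument);
        pendingFutures.add(future);
        return future;
    }

    private void flush() {
        List<A> args;
        List<CompletableFuture<R>> futures;
        synchronized (this) {
            // Take the collected requests; later submits start a fresh window
            args = pendingArgs;
            futures = pendingFutures;
            pendingArgs = new ArrayList<>();
            pendingFutures = new ArrayList<>();
        }
        try {
            List<R> results = batchFunction.apply(args); // one round trip for the whole batch
            for (int i = 0; i < futures.size(); i++) {
                futures.get(i).complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // Fail every request of this batch that has not been completed yet
            futures.forEach(f -> f.completeExceptionally(e));
        }
    }

    @Override
    public void close() {
        timer.shutdown();
    }
}
```

Three calls to submit issued within the window end up in a single invocation of the batch function; Hystrix's collapser follows the same principle, with its own timer, batch size limit and scoping rules.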

As an example, we will load some external blog posts, so we have a BlogPostRepository

import java.util.ArrayList;
import java.util.List;

// Stub repository: a real implementation would fetch all ids
// from the remote system in a single round trip.
public class BlogPostRepository {
    public List<BlogPost> loadAll(List<Integer> ids) {
        List<BlogPost> blogPosts = new ArrayList<>(ids.size());
        for (Integer ignored : ids) {
            BlogPost blogPost = new BlogPost();
            blogPosts.add(blogPost);
        }
        return blogPosts;
    }

    public BlogPost load(Integer id) {
        return new BlogPost();
    }
}

and a BlogPostService

import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class DefaultBlogPostService implements BlogPostService {

    private final BlogPostRepository blogPostRepository;

    public DefaultBlogPostService(BlogPostRepository blogPostRepository) {
        this.blogPostRepository = blogPostRepository;
    }

    @Override
    public List<Future<BlogPost>> getBlogPosts(List<Integer> ids) {
        // One collapser per id; queue() returns immediately,
        // so all requests can end up in the same batch
        return ids.stream()
                .map(id -> new BlogPostCollapsedCommand(blogPostRepository, id).queue())
                .collect(Collectors.toList());
    }
}

The classes are pretty simplistic, but you should get the idea. For every requested id, the service creates a new collapser to retrieve the matching blog post. You might notice two things here. First, we call queue instead of execute, so the request is queued instead of explicitly waiting for its result. With execute, every call would block until its result arrives, so each collapser would execute on its own: no batching benefit, just the extra overhead of creating collapser and command instances. Second, the return type of queue is a Future. This makes a lot of sense, since we don't want to block our main application while we are batch fetching some remote data.
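On the caller side, the futures only block once you actually need the data. As a sketch, a small helper could collect the results of getBlogPosts under one overall deadline. FutureResults and awaitAll are hypothetical names, not part of Hystrix or the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a list of futures with one shared deadline.
public final class FutureResults {

    private FutureResults() {
    }

    // Returns the results in the order of the given futures, or throws
    // TimeoutException if the overall deadline passes first.
    public static <T> List<T> awaitAll(List<? extends Future<T>> futures, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<T> results = new ArrayList<>(futures.size());
        for (Future<T> future : futures) {
            // Each get() only waits for whatever time is left of the shared deadline
            long remaining = deadline - System.nanoTime();
            results.add(future.get(Math.max(0, remaining), TimeUnit.NANOSECONDS));
        }
        return results;
    }
}
```

Usage would look like `List<BlogPost> posts = FutureResults.awaitAll(blogPostService.getBlogPosts(ids), 2, TimeUnit.SECONDS);`. Keep in mind that Hystrix collapsers are request-scoped by default, so the queue() calls are expected to run inside an initialized HystrixRequestContext (HystrixRequestContext.initializeContext()).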

Here is the code for the collapser:

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Type parameters: batch return type, single response type, single request argument type
public class BlogPostCollapsedCommand extends HystrixCollapser<List<BlogPost>, BlogPost, Integer> {

    private final BlogPostRepository blogPostRepository;
    private final Integer entryId;

    public BlogPostCollapsedCommand(BlogPostRepository blogPostRepository, Integer entryId) {
        // Collect requests for 1000 ms before executing them as one batch
        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("blogPostCollapser"))
                .andCollapserPropertiesDefaults(
                        HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(1000)));
        this.blogPostRepository = blogPostRepository;
        this.entryId = entryId;
    }

    @Override
    public Integer getRequestArgument() {
        return entryId;
    }

    @Override
    protected HystrixCommand<List<BlogPost>> createCommand(
            Collection<CollapsedRequest<BlogPost, Integer>> collapsedRequests) {
        // Gather the arguments of all collected requests into one batch
        final List<Integer> entryIds = collapsedRequests.stream()
                .map(CollapsedRequest::getArgument)
                .collect(Collectors.toList());

        return new BlogPostBatchCommand(blogPostRepository, entryIds);
    }

    @Override
    protected void mapResponseToRequests(List<BlogPost> batchResponse,
            Collection<CollapsedRequest<BlogPost, Integer>> collapsedRequests) {
        // Assumes the batch returns its results in the same order as the requests
        int count = 0;
        for (CollapsedRequest<BlogPost, Integer> collapsedRequest : collapsedRequests) {
            final BlogPost blogPost = batchResponse.get(count++);
            collapsedRequest.setResponse(blogPost);
        }
    }
}

This is the most minimal example I can come up with that actually does something when called ;)

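Let's start with the types you have to define. HystrixCollapser takes three type parameters: the return type of the batch command (List<BlogPost>), the response type of a single request (BlogPost) and the argument type of a single request (Integer).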

The constructor sets the timer delay, i.e. how long the collapser waits to collect calls before combining them into a single batch request (1000 ms in this example; the Hystrix default is 10 ms). The getRequestArgument method simply returns the argument of this single request. The next two methods are the heart of the collapser. createCommand gathers the arguments of all collected requests and passes them to a command capable of doing a batch request (such as calling the repository above). mapResponseToRequests is called after the batch command has returned, and maps the single results back to the originally submitted requests. As you might see, in this example I don't really care about the ordering of the result list: I simply assume that the n-th result belongs to the n-th request. In a real-world application you would not rely on positions in a List but use e.g. a Map to link each request to its response correctly.
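A key-based mapping could look roughly like the following plain-Java sketch. PendingRequest is a hypothetical stand-in for Hystrix's CollapsedRequest, and the key extractor assumes the result objects carry their id (the BlogPost class in this article does not show one, so e.g. a getId() getter would be an assumption). In the real collapser, the loop would call setResponse, or setException for missing entries, on each CollapsedRequest instead of completing a future.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: link each request to its response by key instead of by position.
public final class ResponseMapper {

    private ResponseMapper() {
    }

    // Stand-in for a collapsed request: one argument plus a slot for its response
    public static final class PendingRequest<A, R> {
        public final A argument;
        public final CompletableFuture<R> response = new CompletableFuture<>();

        public PendingRequest(A argument) {
            this.argument = argument;
        }
    }

    // The batch may return results in any order or omit unknown ids.
    public static <A, R> void mapByKey(List<R> batchResponse, Function<R, A> keyOf,
            Collection<PendingRequest<A, R>> requests) {
        Map<A, R> byKey = new HashMap<>();
        for (R result : batchResponse) {
            byKey.put(keyOf.apply(result), result);
        }
        for (PendingRequest<A, R> request : requests) {
            R result = byKey.get(request.argument);
            if (result != null) {
                request.response.complete(result);
            } else {
                // Fail the single request instead of handing it someone else's result
                request.response.completeExceptionally(
                        new IllegalStateException("No response for " + request.argument));
            }
        }
    }
}
```

With this approach a reordered or incomplete batch response no longer mixes up results, and a missing entry only fails the request it belongs to.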

Take a look at the sample code on GitHub.