Thursday, August 9, 2018

Crosswalk Pattern

Intro

CompletableFuture is great. It is so great that it definitely deserves more attention. It simplified parallel execution immensely and what is even more important no third party libraries are necessary. But working with it requires some finesse. Today I'll show how to use CompletableFuture with lockable resources. This post continues the series of pattern ideas.

Java's ReentrantReadWriteLock

This very useful class has been around since Java 5. If you have a resource that can be read or written to you can wrap it with a ReentrantReadWriteLock. When a client wants to write it needs to acquire the write lock. If a client wants to read it needs to acquire a read lock. Yes exactly the write lock and a read lock. Many read locks can be held at the same time (as many as 65535). But only one write lock can be held at any time. No read lock can be acquired while the write lock is held (by different threads). The write lock also cannot be acquired while at least one read lock is held. These are the rules of the game. More information can be found in the javadocs of ReentrantReadWriteLock.

Motivation

So what is the problem with CompletableFuture? The problem is in Java a lock is held by a thread. One cannot perform lock.lock() in one thread and lock.unlock() in another. It won't work. This issue becomes particularly important if certain tasks must be executed in parallel and some resource must remain unchanged while the tasks are being executed. These tasks need to take either a read lock or the write lock of the resource and unlock it after all is done.

Applicability

Use this pattern if you need to ensure that some object or objects do not change during some long-running or not so long-running tasks. A necessary requirement is that these object or object are protected by a Java Lock possibly ReentrantReadWriteLock.

Structure

The solution involves using an additional single-threaded executor. It is assumed we can get a Runnable to lock all the locks and a Runnable to unlock all the locks. These Runnables must be executed in the single-threaded executor (STE later on). Here are the steps that must be run in the STE in the text form:
  1. Run the locking Runnable with CompletableFuture.runAsync(runnable, STE)
  2. Check that all locks have been locked (with a LockWrapper for example)
  3. Get the aggregate CompletableFuture for the tasks which are run in a different executor. You can do it in a runnable but it must be done in the STE. The tasks cannot start before the locks have been locked.
  4. Call get() or get (timeout, timeunit) on the aggregate CompletableFuture.
  5. Run the unlocking Runnable
In this case it makes more sense to show the structure of execution not the structure of classes. Here it is:
This single-threaded executor on the diagram looks like a crosswalk, isn't it?

Participants

Single-threaded executor - the executor that obtains the locks and releases them after all tasks have completed execution
Tasks Executor - the executor that executes the tasks in parallel
Locks - the locks that must be locked to ensure some object or objects do not change while the tasks are being executed

Consequences

  1. Can run long-running tasks in a separate executor and ensure some object or objects remain unchanged during the execution
  2. Can use Java's locks and ReentrantReadWriteLock. 

Sample code

public CompletableFuture<A> execute() {
    CompletableFuture<Void> lockFT = CompletableFuture.completedFuture(null);
    if (lockRunnable != null) {
        lockFT.thenRunAsync(lockRunnable, ste);
    }
    CompletableFuture<Void> execFT = lockFT.thenRunAsync(() ->
        {
            if (<all locks locked>) {
                CompletableFuture<T> tasksAggr = <get future>;
                // wait until all tasks have completed execution
                tasksAggr.get();
            } else {
                <report this>
            }
        },
    ste);
    CompletableFuture<Void> unlockFT;
    if (unlockRunnable != null) {
        // execute unlock even if the previous stage completed exceptionally
        unlockFT = execFT.whenCompleteAsync((Void x, Throwable t) -> unlockRunnable.run(), ste);
    } else {
        unlockFT = execFT;
    }
    return unlockFT.thenApplyAsync(<return value>, ste);
}

Final Thoughts

In this simple pattern it is not the structure that is important but the threads that execute it. By correctly handling a single-threaded executor and combining it with another possibly multi-threaded one we can use Java's ReentrantReadWriteLock with multiple threads. This single-threaded executor serves as a crosswalk that the operations must use to cross the road. If some operations veer off this crosswalk this scheme will fail.

No comments:

Post a Comment