Showing posts with label Java. Show all posts

Thursday, August 9, 2018

Crosswalk Pattern

Intro

CompletableFuture is great. It is so great that it deserves more attention. It simplified parallel execution immensely and, more importantly, no third-party libraries are necessary. But working with it requires some finesse. Today I'll show how to use CompletableFuture with lockable resources. This post continues the series of pattern ideas.

Java's ReentrantReadWriteLock

This very useful class has been around since Java 5. If you have a resource that can be read or written to, you can wrap it with a ReentrantReadWriteLock. When a client wants to write, it needs to acquire the write lock. If a client wants to read, it needs to acquire a read lock. Yes, exactly: the write lock and a read lock. Many read locks can be held at the same time (as many as 65535), but only one write lock can be held at any time. No read lock can be acquired by another thread while the write lock is held. The write lock also cannot be acquired while at least one read lock is held. These are the rules of the game. More information can be found in the javadocs of ReentrantReadWriteLock.
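
To make these rules concrete, here is a small sketch. The class and method names are mine and purely illustrative. One thread holds a lock and a second thread checks with tryLock() whether it could get another one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;

// Hypothetical helper, not part of the JDK.
class ReadWriteRulesDemo {

    // Holds `held` on the calling thread and reports whether a second
    // thread is able to acquire `wanted` at that moment.
    static boolean otherThreadCanAcquire(Lock held, Lock wanted) {
        ExecutorService other = Executors.newSingleThreadExecutor();
        held.lock();
        try {
            return CompletableFuture.supplyAsync(() -> {
                boolean acquired = wanted.tryLock();
                if (acquired) {
                    wanted.unlock(); // release on the thread that acquired it
                }
                return acquired;
            }, other).join();
        } finally {
            held.unlock();
            other.shutdown();
        }
    }
}
```

With the readLock() and writeLock() of one ReentrantReadWriteLock, only the combination "read held, read wanted" returns true. The other three combinations return false.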

Motivation

So what is the problem with CompletableFuture? The problem is that in Java a lock is held by a thread. With ReentrantLock and ReentrantReadWriteLock one cannot call lock.lock() in one thread and lock.unlock() in another: unlock() throws IllegalMonitorStateException if the calling thread does not hold the lock. This issue becomes particularly important if certain tasks must be executed in parallel and some resource must remain unchanged while the tasks are being executed. These tasks need to take either a read lock or the write lock of the resource and unlock it after all is done.
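
A quick way to see this for yourself is the sketch below. The class name is made up for illustration. It takes the write lock on one thread and tries to release it on another.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the JDK.
class CrossThreadUnlockDemo {

    // Locks the write lock on the calling thread, then tries to unlock it
    // on another thread. Returns the simple name of the exception the other
    // thread saw, or "none" if the unlock went through.
    static String unlockFromOtherThread() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        ExecutorService other = Executors.newSingleThreadExecutor();
        rw.writeLock().lock();
        try {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    rw.writeLock().unlock(); // this thread is not the owner
                    return "none";
                } catch (IllegalMonitorStateException e) {
                    return e.getClass().getSimpleName();
                }
            }, other).join();
        } finally {
            rw.writeLock().unlock(); // the owning thread can still unlock
            other.shutdown();
        }
    }
}
```

The method returns "IllegalMonitorStateException", which is exactly why lock and unlock have to happen on the same thread.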

Applicability

Use this pattern if you need to ensure that some object or objects do not change during some tasks, long-running or not. A necessary requirement is that this object or these objects are protected by a Java Lock, possibly a ReentrantReadWriteLock.

Structure

The solution involves using an additional single-threaded executor. It is assumed we can get a Runnable to lock all the locks and a Runnable to unlock all the locks. These Runnables must be executed in the single-threaded executor (STE from here on). Here are the steps, in text form, that must be run in the STE:
  1. Run the locking Runnable with CompletableFuture.runAsync(runnable, STE)
  2. Check that all locks have been locked (with a LockWrapper for example)
  3. Get the aggregate CompletableFuture for the tasks which are run in a different executor. You can do it in a runnable but it must be done in the STE. The tasks cannot start before the locks have been locked.
  4. Call get(), get(timeout, unit) or join() on the aggregate CompletableFuture.
  5. Run the unlocking Runnable
In this case it makes more sense to show the structure of execution, not the structure of classes. Here it is:
This single-threaded executor on the diagram looks like a crosswalk, doesn't it?

Participants

Single-threaded executor - the executor that obtains the locks and releases them after all tasks have completed execution
Tasks Executor - the executor that executes the tasks in parallel
Locks - the locks that must be locked to ensure some object or objects do not change while the tasks are being executed

Consequences

  1. Can run long-running tasks in a separate executor and ensure some object or objects remain unchanged during the execution
  2. Can use Java's locks and ReentrantReadWriteLock. 

Sample code

public CompletableFuture<A> execute() {
    CompletableFuture<Void> lockFT = CompletableFuture.completedFuture(null);
    if (lockRunnable != null) {
        // keep the returned stage so that a failure in the lock step reaches the stages below
        lockFT = lockFT.thenRunAsync(lockRunnable, ste);
    }
    CompletableFuture<Void> execFT = lockFT.thenRunAsync(() ->
        {
            if (<all locks locked>) {
                CompletableFuture<T> tasksAggr = <get future>;
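                // wait until all tasks have completed execution; join() is used because
                // get() throws checked exceptions that a Runnable lambda cannot propagate
                tasksAggr.join();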
            } else {
                <report this>
            }
        },
    ste);
    CompletableFuture<Void> unlockFT;
    if (unlockRunnable != null) {
        // execute unlock even if the previous stage completed exceptionally
        unlockFT = execFT.whenCompleteAsync((Void x, Throwable t) -> unlockRunnable.run(), ste);
    } else {
        unlockFT = execFT;
    }
    return unlockFT.thenApplyAsync(<return value>, ste);
}
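
For readers who want something they can run, here is a minimal self-contained variant of the sample above. Everything in it is an assumption made for the demo: the class name, the word-length tasks, the pool size of 4 and the choice of a read lock. It is a sketch of the idea, not the implementation used in a real project.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

// Hypothetical demo class.
class CrosswalkDemo {

    // Sums the lengths of the words in parallel while a read lock is held
    // on behalf of the whole operation.
    static int totalLength(List<String> words) {
        Lock readLock = new ReentrantReadWriteLock().readLock();
        ExecutorService ste = Executors.newSingleThreadExecutor(); // the crosswalk
        ExecutorService workers = Executors.newFixedThreadPool(4); // tasks executor
        try {
            // step 1: lock on the STE thread
            CompletableFuture<Void> lockFT = CompletableFuture.runAsync(readLock::lock, ste);
            // steps 3 and 4: still on the STE thread, start the tasks and wait for them
            CompletableFuture<Integer> execFT = lockFT.thenApplyAsync(ignored -> {
                List<CompletableFuture<Integer>> tasks = words.stream()
                        .map(w -> CompletableFuture.supplyAsync(w::length, workers))
                        .collect(Collectors.toList());
                return tasks.stream().mapToInt(CompletableFuture::join).sum();
            }, ste);
            // step 5: unlock on the same STE thread, even if a task failed
            CompletableFuture<Integer> unlockFT =
                    execFT.whenCompleteAsync((sum, error) -> readLock.unlock(), ste);
            return unlockFT.join();
        } finally {
            ste.shutdown();
            workers.shutdown();
        }
    }
}
```

The tasks themselves run in the worker pool, but lock() and unlock() both run on the single STE thread, so the lock never sees a foreign thread.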

Final Thoughts

In this simple pattern it is not the structure that is important but the threads that execute it. By correctly handling a single-threaded executor and combining it with another, possibly multi-threaded, one we can use Java's ReentrantReadWriteLock with multiple threads. This single-threaded executor serves as a crosswalk that the operations must use to cross the road. If some operations veer off this crosswalk, this scheme will fail.

Friday, July 27, 2018

Distribution System Pattern

Intro

Hi all! This blog post continues the "new pattern ideas" series. In these posts I suggest new patterns that utilize the latest technology available in computer programming. The well-known patterns from the Gang of Four book are all great and are widely used. But they were created a long time ago, before concurrency utilities, futures and similar tools were part of everyday programming. I call them "new pattern ideas" because at this moment they are not actually patterns, only ideas. I'm sure some programmers have used similar structures in their programs to solve similar problems.

Patterns help us in many ways. For those who don't know how to solve a problem, they may show the way. For those who have already solved a problem, they help explain the solution to others. In my opinion the best programs can be represented as a group of patterns linked together.

Motivation

The picture above shows the motivation for this pattern. You have a number of tasks which can be executed in parallel. The tasks must be distributed to workers, and the results the workers produce add up to one final result. All the tools for this task are already available in Java: CompletableFuture, ExecutorService. This pattern brings some clarity to how to use these tools.

Applicability

Use this pattern when you have a big task that can be decomposed into many small tasks that can be executed in parallel. Execution of one small task creates one small result. All the small results add up to produce one final result. 

Structure

Tasks

The tasks should be organized hierarchically like this:
Leaves contain the Task objects that do the actual work. The main idea is that the task execution must return a result, and CompletableFuture uses Supplier in its supplyAsync methods. Consequently it is easier to use task objects implementing Supplier. All the leaves have one execute method that returns a CompletableFuture obtained by passing the task object to CompletableFuture.supplyAsync (possibly also passing its own executor).
Folders allow for grouping of tasks and ordered execution. For example, one group of tasks may be executed before another. It is important to remember, though, that all the tasks return results of one type, or at least of types derived from one common type. Folders call execute on all of their children and return a CompletableFuture that is the result of CompletableFuture.allOf(...). If necessary, folders may contain other folders.
The root is used for administrative purposes. It may contain the executor service for the tasks and also is the main entry point for the distribution system. It should have something like an execute method that begins the whole execution process and calls execute on all folders. 
Here is the call hierarchy:

execute always returns a CompletableFuture. 
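
The hierarchy described above could look roughly like this. It is only a sketch: the TaskNode interface, the add methods and the constructor shapes are my assumptions for the example, and result handling is left out here on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

// Hypothetical common shape of leaves and folders.
interface TaskNode {
    CompletableFuture<?> execute();
}

// Leaf: hands its Supplier task to supplyAsync.
class TaskLeaf<R> implements TaskNode {
    private final Supplier<R> task;
    private final ExecutorService es;

    TaskLeaf(Supplier<R> task, ExecutorService es) {
        this.task = task;
        this.es = es;
    }

    @Override
    public CompletableFuture<R> execute() {
        return CompletableFuture.supplyAsync(task, es);
    }
}

// Folder: executes all children (leaves or other folders) and completes
// when all of them have completed.
class TaskFolder implements TaskNode {
    private final List<TaskNode> children = new ArrayList<>();

    TaskFolder add(TaskNode child) {
        children.add(child);
        return this;
    }

    @Override
    public CompletableFuture<Void> execute() {
        CompletableFuture<?>[] futures = children.stream()
                .map(TaskNode::execute)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }
}

// Root: entry point that owns the shared executor and starts all folders.
class TaskRoot {
    private final ExecutorService es;
    private final List<TaskFolder> folders = new ArrayList<>();

    TaskRoot(ExecutorService es) {
        this.es = es;
    }

    ExecutorService executor() {
        return es;
    }

    TaskRoot add(TaskFolder folder) {
        folders.add(folder);
        return this;
    }

    CompletableFuture<Void> execute() {
        CompletableFuture<?>[] futures = folders.stream()
                .map(TaskFolder::execute)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }
}
```

In this sketch the root starts all folders at once. If one folder must finish before the next one starts, the root could chain the folder futures instead of combining them with allOf.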

Accumulating Results

Accumulating results is a very important part of this pattern. The idea is to create an interface like this:

public interface IDistribAccumulator<R, F, E> {

    public void addResult(R newResult);

    public List<R> getAllResults();

    public F getResult();

    public void addError(E newError);

    public List<E> getErrors();
}

We'll call this object the accumulator. Here R is the type of the task result, F the type of the final result of this operation, and E the type of an error. The leaf adds some handling to the task by means of CompletableFuture.handle. If the task completed without errors, it adds the result to the accumulator. If an exception was thrown, it adds an error to the accumulator. Here is sample code for the execute method of the Leaf:

public CompletableFuture<R> execute() {
    CompletableFuture<R> execFuture = CompletableFuture.supplyAsync(task, es);
    return execFuture.handle((R arg0, Throwable t) -> {
        if (arg0 != null) {
            accumulator.addResult(arg0);
        }
        if (t != null) {
            accumulator.addError(new <Throwable wrapper>(t));
        }
        // on failure arg0 is null and the returned future still completes normally
        return arg0;
    });
}

Tasks remain decoupled from the leaves and the work is done. 

Users of this distribution system pattern will implement this interface and may provide a way to get the final result from the individual results when all the tasks have completed execution. 

In progress status

Another common issue is how to report the status to the user while the work is in progress. Some tasks have completed execution, some are in progress, some are waiting in the queue, and the UI must show the current status. The solution is to use the accumulator. It can return the tasks that are already completed and even the results for them. It is of course desirable that the accumulator returns an unmodifiable list from getAllResults to avoid any issues.
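
Here is one way such an accumulator might look. The interface is repeated (without the public modifiers) only so that the sketch compiles on its own. SumAccumulator, its integer results and its String errors are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Copy of the interface from this post, repeated to keep the sketch self-contained.
interface IDistribAccumulator<R, F, E> {
    void addResult(R newResult);
    List<R> getAllResults();
    F getResult();
    void addError(E newError);
    List<E> getErrors();
}

// Hypothetical accumulator: task results are integers, the final result is their sum.
class SumAccumulator implements IDistribAccumulator<Integer, Integer, String> {
    private final List<Integer> results = new ArrayList<>();
    private final List<String> errors = new ArrayList<>();

    @Override
    public synchronized void addResult(Integer newResult) {
        results.add(newResult);
    }

    @Override
    public synchronized List<Integer> getAllResults() {
        // unmodifiable snapshot: the UI sees the results so far and cannot change them
        return Collections.unmodifiableList(new ArrayList<>(results));
    }

    @Override
    public synchronized Integer getResult() {
        int sum = 0;
        for (int r : results) {
            sum += r;
        }
        return sum;
    }

    @Override
    public synchronized void addError(String newError) {
        errors.add(newError);
    }

    @Override
    public synchronized List<String> getErrors() {
        return Collections.unmodifiableList(new ArrayList<>(errors));
    }
}
```

Because getAllResults returns a copy, a progress view can iterate over it safely while worker threads keep adding results.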


How to create the structure

It makes sense to create a Builder to create the distribution system. The instance of the Builder is created with some parameters and then the Builder uses these parameters to create first the root, then the folders and then the tasks. 

Structure mapped to motivation


Participants

TaskRoot - the entry point to the distribution system. Contains shared objects and begins the execution.
TaskFolder - contains individual tasks. Used for grouping tasks.
TaskLeaf - contains the actual task object. Executes it, adds the result or the error to the accumulator, returns the CompletableFuture.
Task - the actual piece of work. All tasks are independent (at least within a folder) and can be run in parallel.
Accumulator - accumulates the results from individual tasks and returns the final result.

Consequences

  1. Clear structure. After Java more or less streamlined the process of parallel execution by adding ExecutorService, CompletableFuture and other facilities, a lot of people have begun using these facilities. But in order to successfully implement a solution, a clear structure is necessary. This pattern offers such a clear structure.
  2. Reduced coupling. The structure to execute the tasks and get results is decoupled from the actual tasks. The objects that must be implemented are the tasks themselves, the accumulator and the builder.
  3. Code reuse. The main structure can be written once and used multiple times by supplying the task objects, the accumulator and the builder.

Implementation Considerations

This pattern is a concurrent pattern. Consequently it is very important to synchronize all the methods that can be accessed by multiple threads. Mostly these are the methods in the accumulator. It may be best if all of them are synchronized. 
The parts that need to be changed in any concrete implementation are the tasks, the accumulator and the builder. All the other parts of the system are fairly general and can be reused.

Final Thoughts

There is an actual implementation of this pattern and it is working quite well. Maybe in one of the next posts I'll talk about it.

Friday, July 20, 2018

Assembly Line Pattern

Intro

Hi all!
Important: This is my first attempt at a decent blog post about programming. Please go easy on me if I miss something or write something that doesn't look good.

So to the point. Java's CompletableFuture has been available for quite some time and it is a very useful tool indeed. But most well-known patterns from the famous "Design Patterns" book were created a looong time ago when creating more than one thread was rare. The pattern that I called "Assembly Line" will use both some well-known patterns and CompletableFuture to solve one common task.

Motivation

The user pressed a button and the program must execute some actions in response. The actions are executed sequentially slowly building up the end result. Like this:
It seems like an easy task - create some functions and do it! Nothing can be easier, it is simple single-threaded programming! Unfortunately it is not so easy. What if you have several similar tasks like this and in each task the steps are a little different? One step is removed, one is added. For example, for some actions you do not need to show the user a dialog, while for others additional processing of the parameters is required. That means a lot of code must be written several times. What if one or two of those steps represent a long-running operation? A special subsystem is called for to create and process such requests.

Applicability

Use this pattern when the process of handling user requests consists of similar steps with a little variation between the requests. The steps are executed sequentially; no parallel execution is necessary.

Structure

Simple CompletableFuture

To describe the structure we'll start with the simplest form possible:
This diagram clearly shows the idea but it is very unwieldy. Usually the sequential tasks are not running in isolation. They need to get some information and add some information to the final result. 

Context

We need a context:
This is much better but here is the next hurdle. We need to reuse as much code as possible and that means the code for individual stages (we'll call them stages from now on) must be separated from each other and added as necessary. That means we also need a builder for the stages.

Builder

We pass some parameters to the builder and the builder creates a chain of stages. It is not the same as the Chain of Responsibility pattern because all the stages are always executed. This is more like filters in a servlet.
Now that we have a builder we can take a look at the base class for all stages.

Stage Base Class

The idea is to use "then" methods of CompletableFuture to chain executions and return a CompletableFuture that is executed after all the stages have completed execution. It is best to use an Executor to run the stages.
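
Putting the context, the builder and the stage base class together, a minimal sketch might look like this. All the names here (Context.steps, chainAfter, NamedStage, the showDialog flag) are invented for the example and are not taken from a real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical context: here it only records which stages have run.
class Context {
    final List<String> steps = new CopyOnWriteArrayList<>();
}

// Hypothetical stage base class: subclasses implement process(),
// the base class does the chaining with a "then" method.
abstract class Stage {
    protected abstract void process(Context ctx);

    final CompletableFuture<Context> chainAfter(CompletableFuture<Context> previous,
                                                Executor executor) {
        return previous.thenApplyAsync(ctx -> {
            process(ctx);
            return ctx;
        }, executor);
    }
}

// Trivial concrete stage used for the demo.
class NamedStage extends Stage {
    private final String name;

    NamedStage(String name) {
        this.name = name;
    }

    @Override
    protected void process(Context ctx) {
        ctx.steps.add(name);
    }
}

// Builder: picks the stages for a request; the dialog stage is optional.
class StageBuilder {
    static List<Stage> build(boolean showDialog) {
        List<Stage> stages = new ArrayList<>();
        stages.add(new NamedStage("read parameters"));
        if (showDialog) {
            stages.add(new NamedStage("show dialog"));
        }
        stages.add(new NamedStage("execute"));
        return stages;
    }
}

// Entry point: chains all stages and returns the future of the final context.
class AssemblyLine {
    static CompletableFuture<Context> handle(boolean showDialog, Executor executor) {
        CompletableFuture<Context> result = CompletableFuture.completedFuture(new Context());
        for (Stage stage : StageBuilder.build(showDialog)) {
            result = stage.chainAfter(result, executor);
        }
        return result;
    }
}
```

A stage that is itself asynchronous and returns a CompletableFuture would be chained with thenComposeAsync instead of thenApplyAsync, but the rest of the structure could stay the same.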

Complete Picture

The complete picture looks a little more complicated than the one we started with:
Everything is in place. The final CompletableFuture from the System Entry point returns the context object.

Participants

Assembly Line system entry point - accepts the requests and begins processing. Creates and executes the chain of stages.
Context - stores the information the stages need to execute. Stages may read information from the context and write information to the context.
Stage - one piece of work that is executed as part of a chain. Could be a long-running task.
StageBuilder - creates the chain of stages based on the user request

Consequences

  1. As usual - reduced coupling. More classes that know nothing about each other :-). Seriously, all the code that is used to handle user requests can be divided into small, more or less independent classes/methods. Then the builder will create the correct chain for every user request. No code duplication, loose coupling through the context object.
  2. Ability to execute long-running tasks as part of the chain. This is the main reason to use CompletableFuture in this pattern. In fact some stages may even use their own thread pools within themselves as long as they return a CompletableFuture.
  3. This structure is easy to understand and extend. It is easy to add more stages if necessary - just add the necessary classes and modify/add the builder. In fact the ability to understand should not be underestimated here. This simple model allows us to break a lot of sequential code into simpler pieces and combine them as necessary.

Final Thoughts

The assembly line system has existed for more than 100 years. It must be very simple for humans to understand and use. Why not use it in programming as well?