JavaBlog.fr / Java.lu DEVELOPMENT,Java Java: Multi tasks, Multi threading, Synchronization, Semaphore, Mutex, Barrier

Java: Multi tasks, Multi threading, Synchronization, Semaphore, Mutex, Barrier

Hi,

This post concerns the multi-tasking or multi-threading which is a programming technique to take advantage of the use of tasks (threads). We will see in this tutorial the practical implementation of a multi-tasking.

Firstly, to define what the multi-tasking, I propose several examples within an application:

  • the GUI can start a thread to load a image while it continues to process events generated by user actions,
  • a server application that waits for connection requests from clients, can initiate a process / thread to handle requests for multiple clients simultaneously,
  • the multiplication of two matrices (m, p) and (p, n) may be carried out m * n parallel threads,

Below, some goals of multi-tasking:

  • improve performance by distributing the different tasks on different processors,
  • enjoy the break of a task (waiting for I / O or a user action) to do something else,
  • faster response to user actions by rejecting a long and non-interactive task in another thread (example: corrections of spelling in a text editor),

However, there are also problems with multi-tasking:

  • the difficulties of writing a multi-threaded program,
  • the debugging of a program that uses the multi-tasking,

1) Notion of Process
Each process has an distinct address space (memory space where variables and objects are stored). Communication between processes can be achieved by:

  • Signals,
  • Tubes (pipes),
  • Shared Memory,
  • Sockets,
  • …,

The java.lang.Runtime class:

  • Allows the controls on the current process: Runtime.getRuntime(),
  • Methods to know the runtime environment: total / free / MaxMemory(), availableProcessors(),
  • Process Control: gc(), exit(), halt(),
  • Creation of other processes: exec(),
  • …,

The java.lang.Process class:

  • Allows the controls on a child process,
  • Creation of children processes: Runtime.getRuntime().exec(″command″),
  • Control: waitFor(), exitValue(), destroy(), getInputStream(), getOutputStream(), getErrorStream(),
  • …,

2) Notion of Thread
A thread allows run multiple portions of code simultaneously in the same process i.e. sharing the same memory space.
The Java environment is multi-threaded:

  • Creating Threads,
  • Synchronization of threads that share data,
  • …,

3) Creating and launching Threads
In Java, a task is actually a class that implements the Runnable interface. This class must have a constructor and a “run” method, which will be called during the actual execution of the task:

01public class MyTask implements Runnable{
02    private String threadName; // name of thread
03     
04    // Constructor
05    public MyTask (String name){
06        this.threadName = name; // set name of thread
07    }
08 
09    public void run(){ 
10        // instructions
11    }
12}

The launch of a task will look like this:

  • Creating an instance of the class are ExecutorService via the static method newFixedThreadPool of Executors class in taking as parameter the number of tasks required,
  • Instantiate the class “MyTask” with a identifier parameter,
  • Launch this task thanks to the execute method of ExecutorService class,
  • Once the instructions of the task were performed, one calls the shutdown method,

4) A first example
The following class MyTask implements Runnable, so that each MyTask object can execute concurrently. A variable sleepTime stores a random integer value chosen when the MyTask constructor executes. Each thread running a MyTask object sleeps for the amount of time specified by the corresponding MyTask object’s sleepTime (for a certain number of milliseconds), then outputs its name.

MyTask.java

01package ho.tests.thread.synchro.test1.order1.launch;
02 
03import java.util.Random;
04 
05/**
06 * Mytask Runnable example
07 * @author Huseyin OZVEREN
08 */
09public class MyTask implements Runnable {
10 
11    private int sleepTime; // random sleep time for thread  
12    private String threadName; // name of thread  
13    private static Random generator = new Random();
14     
15    public MyTask(String name) {
16        // set name of thread
17        this.threadName = name;
18        // random sleep time between 0 and 5 seconds
19        this.sleepTime = generator.nextInt( 5000 );
20    }
21 
22    public void run() {
23        try{
24            // put thread to sleep for sleepTime amount of time
25            System.out.printf( "%s starting and going to sleep for %d milliseconds.\n", this.threadName, this.sleepTime );
26            // put thread to sleep
27            Thread.sleep( this.sleepTime );  
28        } catch ( InterruptedException exception )  { 
29            exception.printStackTrace();
30        }
31        // print thread name
32        System.out.printf( "%s done sleeping\n", this.threadName ); 
33    }      
34}

ApplicationLaunch.java

01package ho.tests.thread.synchro.test1.order1.launch;
02 
03import java.util.concurrent.ExecutorService;
04import java.util.concurrent.Executors;
05 
06/**
07 * Thread launching example
08 * @author Huseyin OZVEREN
09 */
10public class ApplicationLaunch {
11    public static void main(String[] args) {
12         
13        System.out.println( "Starting threads in main thread" );
14         
15        // create and name each runnable
16        MyTask th1 = new MyTask("threadname_1");
17        MyTask th2 = new MyTask("threadname_2");
18        MyTask th3 = new MyTask("threadname_3");
19 
20        // create ExecutorService to manage threads OR empty (better)
21        ExecutorService es = Executors.newFixedThreadPool(3);
22         
23        // start threads and place in runnable state
24        es.execute(th1);
25        es.execute(th2);
26        es.execute(th3);
27 
28        // shutdown worker threads 
29        es.shutdown();
30         
31        System.out.println( "Threads started, main thread ends\n" );
32    }
33}

Outputs in console:

1Starting threads in main thread
2Threads started, main thread ends
3 
4threadname_1 starting and going to sleep for 624 milliseconds.
5threadname_3 starting and going to sleep for 849 milliseconds.
6threadname_2 starting and going to sleep for 719 milliseconds.
7threadname_1 done sleeping
8threadname_2 done sleeping
9threadname_3 done sleeping

5) Synchronization
The use of threads can lead to synchronization requirements to avoid problems with concurrent access to global variables.
So for example, we throw 2 tasks:
– Their instructions will therefore execute simultaneously,
– If these instructions change a global variable in the program, then there will be a problem.

In parallel programming, a “critical section” is a piece of code that can be executed concurrently by multiple threads without risking operational anomalies. These “critical section” synchronized are also named thread-safe, they could be blocks of codes, methods or components like “Vector”, “Hastable”, “ConcurrentHashMap”, “CopyOnWriteArrayList” or “CopyOnWriteArraySet” (whereas the “ArrayList” and “HashMap” are not thread-safe).

Warning: The thread-safe components should be used when there are risks due to changes by multiple threads because they are less efficient than “normal” or classic components.

This is where we need to use methods of synchronization. So, it is necessary to avoid the simultaneous execution of critical sections by multiple threads:
– In Java, the synchronized keyword is used to synchronize threads and prevent them from performing at the same time, critical sections of codes,
– Several threads can run concurrently code synchronized to the same object.

There are 2 possibilities to synchronize code on a object “myObject”:

  • Declare a synchronized method “myMethod” (Synchronization during the call of myObject.myMethod()):
    1public synchronized int myMethod(...) { . . . }
  • Use of a synchronized block on the object “myObject”:
  • 1synchronized(myObject) {
    2// synchronized codes
    3...
    4}

6) Semaphore
Java provides a class java.util.concurrent.Semaphore whose the constructor takes two parameters: an integer representing the current value of the semaphore that defines the number of resources available initially and a boolean indicating whether the method of administration must be FIFO.

Here we are going to develop the previous class MyTask and then synchronize two tasks to get a consistent outputs in console. In this example, the “acquire” method checks if the resource “semaphore” is available, whether this is the case then the semaphore is blocked by the current task, and this task is left to execute. Else the task is blocked until the resource “semaphore” is released by a call to “release” method (by an other task) that gives the resource.

MyTask.java

01package ho.tests.thread.synchro.test1.order2.semaphore;
02 
03import java.util.Random;
04import java.util.concurrent.Semaphore;
05 
06/**
07 * Mytask Runnable example using semaphore synchronization
08 * @author Huseyin OZVEREN
09 */
10public class MyTask implements Runnable {
11     
12    private int sleepTime; // random sleep time for thread  
13    private String threadName; // name of thread  
14    private static Random generator = new Random();
15     
16    Semaphore personalSem;
17    Semaphore neighboringSem;
18 
19    public MyTask(String name, Semaphore personalSem, Semaphore neighboringSem) {
20        // set name of thread
21        this.threadName = name;
22        // random sleep time between 0 and 5 seconds
23        this.sleepTime = generator.nextInt( 5000 );
24        //
25        this.personalSem = personalSem;
26        this.neighboringSem = neighboringSem;
27    }
28 
29    public void run() {
30        try {
31            personalSem.acquire();
32        } catch (InterruptedException ex) {
33            System.out.println(ex);
34        }
35 
36        try{
37            // put thread to sleep for sleepTime amount of time
38            System.out.printf( "%s starting and going to sleep for %d milliseconds.\n", this.threadName, this.sleepTime );
39            // put thread to sleep
40            Thread.sleep( this.sleepTime );  
41        } catch ( InterruptedException exception )  { 
42            exception.printStackTrace();
43        }
44 
45        // print thread name
46        System.out.printf( "%s done sleeping\n", this.threadName );
47 
48        // release semaphore => unblock the next task
49        neighboringSem.release();
50    }
51}

ApplicationSemaphore.java

01package ho.tests.thread.synchro.test1.order2.semaphore;
02 
03import java.util.concurrent.ExecutorService;
04import java.util.concurrent.Executors;
05import java.util.concurrent.Semaphore;
06 
07/**
08 * Thread launching example using semaphore synchronization
09 * @author Huseyin OZVEREN
10 */
11public class ApplicationSemaphore {
12    public static void main(String[] args) {
13        ExecutorService es = Executors.newFixedThreadPool(2);
14        // SEMPAHORE
15        // Param1=current value of the semaphore that defines the number of resources available initially (when is 0 then lock)
16        // Param2=boolean indicating whether the method of administration must be FIFO
17        Semaphore personalSem = new Semaphore(1, true);
18        Semaphore neighboringSem = new Semaphore(0, true);
19 
20        System.out.println( "Starting threads in main thread" );
21 
22        // create and name each runnable
23        // 1st thread
24        MyTask th1 = new MyTask("threadname_1", personalSem, neighboringSem);
25        es.execute(th1);
26 
27        // 2nd thread
28        MyTask th2 = new MyTask("threadname_2", neighboringSem, personalSem);
29        es.execute(th2);
30 
31        // shutdown worker threads 
32        es.shutdown();
33 
34        System.out.println( "Threads started, main thread ends\n" );
35    }
36}

Outputs in console:

1Starting threads in main thread
2Threads started, main thread ends
3 
4threadname_1 starting and going to sleep for 2284 milliseconds.
5threadname_1 done sleeping
6threadname_2 starting and going to sleep for 2438 milliseconds.
7threadname_2 done sleeping

Explanations:
The semaphore “personalSem” is initialized to 1 because it is necessary to have at least a resource initially available to execute the instructions when the method “acquire” will be called in the first task. The semaphore “neighboringSem” is initialized to 0 because in the second task, when the method “acquire” will be called, the task will be blocked because no (0) resource will be available.

An other example with 3 tasks:
ApplicationSemaphoreWith3Tasks.java

01package ho.tests.thread.synchro.test1.order2.semaphore;
02 
03import java.util.concurrent.ExecutorService;
04import java.util.concurrent.Executors;
05import java.util.concurrent.Semaphore;
06 
07/**
08 * Thread launching example using semaphore synchronization
09 * @author Huseyin OZVEREN
10 */
11public class ApplicationSemaphoreWith3Tasks {
12    public static void main(String[] args) {
13        ExecutorService es = Executors.newFixedThreadPool(2);
14        // SEMPAHORE
15        // Param1=current value of the semaphore that defines the number of resources available initially (when is 0 then lock)
16        // Param2=boolean indicating whether the method of administration must be FIFO
17        Semaphore personalSem = new Semaphore(1, true);
18        Semaphore neighboringSem2nd = new Semaphore(0, true);
19        Semaphore neighboringSem3rd = new Semaphore(0, true);
20 
21        System.out.println( "Starting threads in main thread" );
22 
23        // create and name each runnable
24        // 1st thread
25        MyTask th1 = new MyTask("threadname_1", personalSem, neighboringSem2nd);
26        es.execute(th1);
27 
28        // 2nd thread
29        MyTask th2 = new MyTask("threadname_2", neighboringSem2nd, neighboringSem3rd);
30        es.execute(th2);
31 
32        // 3rd thread
33        MyTask th3 = new MyTask("threadname_3", neighboringSem3rd, personalSem);
34        es.execute(th3);
35 
36        // shutdown worker threads 
37        es.shutdown();
38 
39        System.out.println( "Threads started, main thread ends\n" );
40    }
41}

Outputs in console:

1Starting threads in main thread
2Threads started, main thread ends
3 
4threadname_1 starting and going to sleep for 2709 milliseconds.
5threadname_1 done sleeping
6threadname_2 starting and going to sleep for 3964 milliseconds.
7threadname_2 done sleeping
8threadname_3 starting and going to sleep for 3101 milliseconds.
9threadname_3 done sleeping

However, the execution of this example you will find probably that the message “end main task” is not appearing at the end of the execution of two tasks because it has not been synchronized and therefore it may appear at any time.

This will allow us to see a second mechanism is the synchronization barrier.

7) Barrier
Imagine a barrier which remains closed as the two tasks were not performed. Java proposes a class named java.util.concurrent.CountDownLatch which is similar to a barrier initially closed. A construction of this object, a counter block is initialized with the integer value passed as parameter.

So, this class “CountDownLatch” provides the method “await” allowing to tasksto wait the barrier’ survey. To decrement the counter value, it must call the “countdown”. When the counter value reaches 0, it is that all tasks have completed their work and therefore the barrier opens.

Here we are going to develop the previous class MyTask in order to synchronize the main thread and two tasks to get a consistent outputs in console.

MyTask.java

01package ho.tests.thread.synchro.test1.order3.barrier;
02 
03import java.util.Random;
04import java.util.concurrent.Semaphore;
05import java.util.concurrent.CountDownLatch;
06 
07/**
08 * Mytask Runnable example using semaphore and barrier synchronization
09 * @author Huseyin OZVEREN
10 */
11public class MyTask implements Runnable {
12    private int sleepTime; // random sleep time for thread  
13    private String threadName; // name of thread  
14    private static Random generator = new Random();
15     
16    // Semaphores
17    Semaphore personalSem;
18    Semaphore neighboringSem;
19     
20    // Barrier
21    CountDownLatch barrier;
22     
23    public MyTask(String name, Semaphore personalSem, Semaphore neighboringSem, CountDownLatch barrier) {
24        // set name of thread
25        this.threadName = name;
26        // random sleep time between 0 and 5 seconds
27        this.sleepTime = generator.nextInt( 5000 );
28        //
29        this.personalSem = personalSem;
30        this.neighboringSem = neighboringSem;
31        //
32        this.barrier = barrier;
33    }
34     
35 
36    public void run() {
37        try {
38            personalSem.acquire();
39        } catch (InterruptedException ex) {
40            System.out.println(ex);
41        }
42 
43        try{
44            // put thread to sleep for sleepTime amount of time
45            System.out.printf( "%s starting and going to sleep for %d milliseconds.\n", this.threadName, this.sleepTime );
46            // put thread to sleep
47            Thread.sleep( this.sleepTime );  
48        } catch ( InterruptedException exception )  { 
49            exception.printStackTrace();
50        }
51         
52        // print thread name
53        System.out.printf( "%s done sleeping\n", this.threadName );
54         
55        // decrements the counter value of the barrier "bar"
56        barrier.countDown();
57         
58        // release semaphore => unblock the next task
59        neighboringSem.release();
60    }  
61}

ApplicationBarrierSemaphore.java

01package ho.tests.thread.synchro.test1.order3.barrier;
02 
03import java.util.concurrent.CountDownLatch;
04import java.util.concurrent.ExecutorService;
05import java.util.concurrent.Executors;
06import java.util.concurrent.Semaphore;
07 
08 
09/**
10 * Thread launching example using semaphore and barrier synchronization
11 * @author Huseyin OZVEREN
12 */
13public class ApplicationBarrierSemaphore {
14    public static void main(String[] args) {
15        ExecutorService es = Executors.newFixedThreadPool(2);
16        // SEMPAHORE
17        // Param1=current value of the semaphore that defines the number of resources available initially (when is 0 then lock)
18        // Param2=boolean indicating whether the method of administration must be FIFO
19        Semaphore personalSem = new Semaphore(1, true);
20        Semaphore neighboringSem = new Semaphore(0, true);
21         
22        // BARRIER for 2 threads
23        // Param1=blocking counter initialized with an integer value
24        CountDownLatch barrier = new CountDownLatch(2);
25         
26        System.out.println( "Starting threads in main thread" );
27 
28        // create and name each runnable
29        // 1st thread
30        MyTask th1 = new MyTask("threadname_1", personalSem, neighboringSem, barrier);
31        es.execute(th1);
32 
33        // 2nd thread
34        MyTask th2 = new MyTask("threadname_2", neighboringSem, personalSem, barrier);
35        es.execute(th2);
36         
37        try {
38            barrier.await(); // Current thread is blocked until all others threads have call the method bar.countDown();
39        } catch (InterruptedException ex) {
40            System.out.println(ex);
41        }
42 
43        // shutdown worker threads 
44        es.shutdown();
45 
46        System.out.println( "Threads started, main thread ends\n" );
47    }
48}

Outputs in console:

1Starting threads in main thread
2threadname_1 starting and going to sleep for 42 milliseconds.
3threadname_1 done sleeping
4threadname_2 starting and going to sleep for 2610 milliseconds.
5threadname_2 done sleeping
6Threads started, main thread ends

8) Mutex
Finally, I would like expose you the MutEx (MUTual EXclusion). But what is it?
A Mutex is nothing but simple non-re entrant mutual exclusion lock. The lock is free upon construction. Each acquire gets the lock, and each release frees it. Releasing a lock that is already free has no effect. MUTEX is achieved in java by the keyword “synchronized” used when two or more methods try to access the same variables/Data structure. Mutex can be useful in constructions that cannot be expressed using java synchronized blocks because the acquire/release pairs do not occur in the same method or code block.

So, Java proposes a class named java.util.concurrent.locks.ReentrantLock:
“A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.”

The main use of Mutex is sharing data or resources in a multitasking environment. For example, if a task changes and displays a variable, which must be protected during its modification. This requirement can be achieved through the use of semaphores, however, a lock or may be more practical.

Here we are going to develop the previous class MyTask in order to synchronize the main thread and two tasks to get a consistent outputs in console.

MyTask.java

01package ho.tests.thread.synchro.test1.order4.mutex;
02 
03import java.util.Random;
04import java.util.concurrent.CountDownLatch;
05import java.util.concurrent.locks.ReentrantLock;
06 
07/**
08 * Mytask Runnable example using mutex and barrier synchronization
09 * @author Huseyin OZVEREN
10 */
11public class MyTask implements Runnable {
12    private int sleepTime; // random sleep time for thread  
13    private String threadName; // name of thread  
14    private static Random generator = new Random();
15 
16    // Mutex
17    ReentrantLock mutex;
18    // Barrier
19    CountDownLatch barrier;
20     
21    public MyTask(String name, ReentrantLock mutex, CountDownLatch barrier) {
22        // set name of thread
23        this.threadName = name;
24        // random sleep time between 0 and 5 seconds
25        this.sleepTime = generator.nextInt( 5000 );
26        //
27        this.mutex = mutex;
28        //
29        this.barrier = barrier;
30    }
31 
32    public void run() {
33        // LOCK
34        mutex.lock();
35 
36        try {
37            // put thread to sleep for sleepTime amount of time
38            System.out.printf( "%s starting and going to sleep for %d milliseconds.\n", this.threadName, this.sleepTime );
39            // put thread to sleep
40            Thread.sleep( this.sleepTime );  
41             
42            // Instruction to protect = "critical section"
43            System.out.printf("%s before modif Application.objToProtectSynchro='%s'\n", this.threadName, ApplicationBarrierMutex.objToProtectSynchro);
44 
45            // Modify the "critical resource"
46            ApplicationBarrierMutex.objToProtectSynchro = "value fixed by Task "+this.threadName;
47             
48            System.out.printf("%s after modif Application.objToProtectSynchro='%s'\n", this.threadName, ApplicationBarrierMutex.objToProtectSynchro);
49        } catch (InterruptedException exception) {
50            exception.printStackTrace();
51        } finally {
52            // UNLOCK
53            mutex.unlock();
54 
55            // decrements the counter value of the barrier "bar"
56            barrier.countDown();
57        }
58         
59        // print thread name
60        System.out.printf( "%s done sleeping\n", this.threadName );
61    }
62}

ApplicationBarrierMutex.java

01package ho.tests.thread.synchro.test1.order4.mutex;
02 
03 
04import java.util.concurrent.CountDownLatch;
05import java.util.concurrent.ExecutorService;
06import java.util.concurrent.Executors;
07import java.util.concurrent.locks.ReentrantLock;
08 
09/**
10 * Thread launching example using mutex and barrier synchronization
11 * @author Huseyin OZVEREN
12 */
13public class ApplicationBarrierMutex {
14     
15    // Variable which could be modified by several tasks = "critical resource"
16    public static String objToProtectSynchro = "";
17     
18    public static void main(String[] args) {
19        ExecutorService es = Executors.newFixedThreadPool(2);
20        //MUTEX:
21        // Param1=bboolean indicating whether the method of administration must be FIFO on this lock.
22        ReentrantLock mutex = new ReentrantLock(true);
23         
24        // BARRIER for 2 threads
25        // Param1=blocking counter initialized with an integer value
26        CountDownLatch barrier = new CountDownLatch(2);
27         
28        System.out.println( "Starting threads in main thread" );
29 
30         
31        // create and name each runnable
32        // 1st thread
33        MyTask th1 = new MyTask("threadname_1", mutex, barrier);
34        es.execute(th1);
35 
36        // 2nd thread
37        MyTask th2 = new MyTask("threadname_2", mutex, barrier);
38        es.execute(th2);
39         
40        try {
41            barrier.await(); // Current thread is blocked until all others threads have call the method bar.countDown();
42        } catch (InterruptedException ex) {
43            System.out.println(ex);
44        }
45 
46        // shutdown worker threads 
47        es.shutdown();
48 
49        System.out.println( "Threads started, main thread ends\n" );
50    }
51}

Outputs in console:

01Starting threads in main thread
02threadname_1 starting and going to sleep for 239 milliseconds.
03threadname_1 before modif Application.objToProtectSynchro=''
04threadname_1 after modif Application.objToProtectSynchro='value fixed by Task threadname_1'
05threadname_1 done sleeping
06threadname_2 starting and going to sleep for 3142 milliseconds.
07threadname_2 before modif Application.objToProtectSynchro='value fixed by Task threadname_1'
08threadname_2 after modif Application.objToProtectSynchro='value fixed by Task threadname_2'
09threadname_2 done sleeping
10Threads started, main thread ends

Here, this is an article that ends.

Best regards,

Huseyin OZVEREN

Download the sources of this article: java_thread_synchro.zip

1 thought on “Java: Multi tasks, Multi threading, Synchronization, Semaphore, Mutex, Barrier”

Leave a Reply to aishvary Cancel reply

Your email address will not be published.

Time limit is exhausted. Please reload CAPTCHA.

Related Post