Following my post introducing Queue/BlockingQueue, I would like to continue with a post about one BlockingQueue implementation: java.util.concurrent.ArrayBlockingQueue.
This implementation of BlockingQueue stores its elements in a bounded queue backed by an array and orders them FIFO (First In, First Out). This is a classic “bounded buffer”, in which a fixed-size array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed.
We will illustrate this with the example of a pool/queue containing non-thread-safe business objects, MyNoTheadSafeTask, which are accessed by a series of consumer/producer threads: each thread takes an instance from the queue, calls an action on this instance and releases (puts) it back into the queue.
In our example, this business object is a simple POJO; however, it could just as well be a non-thread-safe Spring bean with the “prototype” scope:
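To make the “bounded buffer” behaviour concrete, here is a minimal sketch, independent of the example developed in this post. The class name BoundedBufferSketch and the capacity of 2 are assumptions chosen for illustration. It shows that offer() refuses an element once the queue is full, that add() throws an IllegalStateException in the same situation, and that elements come out in insertion order.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the article's source code.
public class BoundedBufferSketch {

    // Builds a short trace of what happens when a queue of capacity 2 fills up.
    public static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        StringBuilder trace = new StringBuilder();

        trace.append(queue.offer("a")).append(',');   // accepted: true
        trace.append(queue.offer("b")).append(',');   // accepted: true
        trace.append(queue.offer("c")).append(',');   // queue full: false, no blocking

        try {
            queue.add("c");                           // add() throws when the queue is full
        } catch (IllegalStateException e) {
            trace.append("full,");
        }

        trace.append(queue.remainingCapacity()).append(','); // no free slot left
        trace.append(queue.poll()).append(',');       // head of the queue: first element inserted
        trace.append(queue.poll());                   // then the second one
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,true,false,full,0,a,b"
    }
}
```

Unlike offer() and add(), the put() and take() methods used later in this post block until space or an element becomes available.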
    <bean id="fileBean" class="com.ho.utils.businessobject.MyNoTheadSafeTask" scope="prototype">
        <property name="property1" value="value1" />
    </bean>
So, our business object:
- has the attributes instanceName and sleepTime, a random sleep time between 0 and 5 seconds:
    private int sleepTime; // random sleep time for instance
    private String instanceName; // name of instance
    private static Random generator = new Random();

    public MyNoTheadSafeTask(String name) {
        // set name of instance
        this.instanceName = name;
        // random sleep time between 0 and 5 seconds
        this.sleepTime = generator.nextInt(5000);
    }

    public boolean action(String paramValue1, int paramValue2) throws Throwable {
        try {
            // announce the action, then put the instance to sleep for sleepTime milliseconds
            System.out.printf("--> '%s' starting 'action' and going to sleep for %d milliseconds.\n", this.instanceName, this.sleepTime);
            Thread.sleep(this.sleepTime);
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        }
        return true;
    }
Then, the MySingletonDelegate singleton:
- contains the pool/queue component ArrayBlockingQueue:
    private ArrayBlockingQueue<MyNoTheadSafeTask> noTheadSafeTaskQueue = new ArrayBlockingQueue<MyNoTheadSafeTask>(5);
    private MySingletonDelegate() {
        // fill the queue up to its capacity with 5 named instances
        for (int i = 0; i < 5; i++) {
            String instanceName = "instanceName_" + i;
            System.out.println("NoTheadSafeTask.createInstance(" + instanceName + ");");
            MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName);
            noTheadSafeTaskQueue.add(noTheadSafeTask);
        } // end-for
    }
    private synchronized MyNoTheadSafeTask getNoTheadSafeTaskFromPool() {
        try {
            // take() blocks until an instance is available
            MyNoTheadSafeTask noTheadSafeTask = noTheadSafeTaskQueue.take();
            System.out.printf("--> '%s' instance has been taken from the queue (new size: %d) \n", noTheadSafeTask.getInstanceName(), noTheadSafeTaskQueue.size());
            return noTheadSafeTask;
        } catch (InterruptedException e) {
            System.out.println("getNoTheadSafeTaskFromPool : an InterruptedException occurred:" + e.getMessage());
        } // end-try
        return null;
    }

    private final void releaseNoTheadSafeTaskToPool(MyNoTheadSafeTask noTheadSafeTask) {
        try {
            // put() blocks until a slot is free in the queue
            noTheadSafeTaskQueue.put(noTheadSafeTask);
            System.out.printf("--> '%s' instance has been released from the queue (new size: %d) \n", noTheadSafeTask.getInstanceName(), noTheadSafeTaskQueue.size());
        } catch (InterruptedException e) {
            System.out.println("releaseNoTheadSafeTaskToPool : an InterruptedException occurred:" + e.getMessage());
        } // end-try
    }
    public boolean completeAction(String paramValue1, int paramValue2) throws Throwable {
        MyNoTheadSafeTask noTheadSafeTask = getNoTheadSafeTaskFromPool();
        try {
            return noTheadSafeTask.action(paramValue1, paramValue2);
        } finally {
            // always give the instance back, even if the action fails
            releaseNoTheadSafeTaskToPool(noTheadSafeTask);
        }
    }
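The take / act / release pattern of completeAction can also be sketched as a small generic helper. This is only an illustration under stated assumptions, not the code shipped with this post: the class SimplePool, its methods withInstance and available, and the timeout parameter are all hypothetical names. The sketch borrows with poll(timeout, unit) instead of take(), so a caller fails fast instead of waiting forever, and releases with offer(), which is not interruptible and therefore cannot lose the instance if the thread has been interrupted in the meantime.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper class, not part of the article's source code.
public class SimplePool<T> {

    private final BlockingQueue<T> queue;

    // The list must not be empty: ArrayBlockingQueue requires a capacity of at least 1.
    public SimplePool(List<T> instances) {
        // capacity == number of instances; elements are added in the list's iteration order
        this.queue = new ArrayBlockingQueue<>(instances.size(), false, instances);
    }

    // Borrows an instance, applies the action, and always gives the instance back.
    public <R> R withInstance(Function<? super T, ? extends R> action, long timeoutMillis) {
        T instance;
        try {
            instance = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            throw new IllegalStateException("interrupted while waiting for an instance", e);
        }
        if (instance == null) {
            throw new IllegalStateException("no instance available within " + timeoutMillis + " ms");
        }
        try {
            return action.apply(instance);
        } finally {
            // offer() succeeds here because poll() freed exactly one slot for this instance
            queue.offer(instance);
        }
    }

    public int available() {
        return queue.size();
    }

    public static void main(String[] args) {
        SimplePool<StringBuilder> pool = new SimplePool<>(
                Arrays.asList(new StringBuilder("instanceName_0"), new StringBuilder("instanceName_1")));
        String used = pool.withInstance(StringBuilder::toString, 100);
        System.out.println(used + " used, " + pool.available() + " instances available again");
    }
}
```

Because the borrow step can no longer return null, the action is never called on a missing instance, which the null return in getNoTheadSafeTaskFromPool would otherwise allow after an interruption.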
Finally, the main method, which will:
– create the singleton MySingletonDelegate, which in turn creates the queue,
– create 5 tasks which will each call the business method completeAction provided by the above singleton MySingletonDelegate,
– execute these 5 tasks through an ExecutorService backed by a pool of 5 threads;
    public static void main(String[] args) {
        try {
            // create singleton which will create QUEUE
            MySingletonDelegate.getInstance();

            // Client classes of the ClientDelegate
            ExecutorService es = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                // Thread
                es.submit(new Runnable() {
                    public void run() {
                        try {
                            MySingletonDelegate.getInstance().completeAction(null, 1);
                        } catch (Throwable e) {
                            e.printStackTrace();
                        }
                    }
                });
            } // end-for
            es.shutdown();
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
… and the output would be (the sleep times are random, so the release order varies from run to run):
    NoTheadSafeTask.createInstance(instanceName_0);
    NoTheadSafeTask.createInstance(instanceName_1);
    NoTheadSafeTask.createInstance(instanceName_2);
    NoTheadSafeTask.createInstance(instanceName_3);
    NoTheadSafeTask.createInstance(instanceName_4);
    --> 'instanceName_0' instance has been taken from the queue (new size: 4)
    --> 'instanceName_0' starting 'action' and going to sleep for 3885 milliseconds.
    --> 'instanceName_1' instance has been taken from the queue (new size: 3)
    --> 'instanceName_1' starting 'action' and going to sleep for 3050 milliseconds.
    --> 'instanceName_2' instance has been taken from the queue (new size: 2)
    --> 'instanceName_2' starting 'action' and going to sleep for 3248 milliseconds.
    --> 'instanceName_3' instance has been taken from the queue (new size: 1)
    --> 'instanceName_3' starting 'action' and going to sleep for 2834 milliseconds.
    --> 'instanceName_4' instance has been taken from the queue (new size: 0)
    --> 'instanceName_4' starting 'action' and going to sleep for 986 milliseconds.
    --> 'instanceName_4' instance has been released from the queue (new size: 1)
    --> 'instanceName_3' instance has been released from the queue (new size: 2)
    --> 'instanceName_1' instance has been released from the queue (new size: 3)
    --> 'instanceName_2' instance has been released from the queue (new size: 4)
    --> 'instanceName_0' instance has been released from the queue (new size: 5)
Conclusion
In this example, we can see that the FIFO order is preserved: the consumer threads take the instances in the same order in which they were added to the queue/pool.
    NoTheadSafeTask.createInstance(instanceName_0);
    NoTheadSafeTask.createInstance(instanceName_1);
    NoTheadSafeTask.createInstance(instanceName_2);
    NoTheadSafeTask.createInstance(instanceName_3);
    NoTheadSafeTask.createInstance(instanceName_4);
    ...
    --> 'instanceName_0' instance has been taken from the queue (new size: 4)
    --> 'instanceName_1' instance has been taken from the queue (new size: 3)
    --> 'instanceName_2' instance has been taken from the queue (new size: 2)
    --> 'instanceName_3' instance has been taken from the queue (new size: 1)
    --> 'instanceName_4' instance has been taken from the queue (new size: 0)
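The same FIFO rule applies to released instances: they rejoin the queue at the tail, in the order in which they were released. The single-threaded sketch below replays the release order of the sample output (4, 3, 1, 2, 0) and shows the order in which a second round of consumers would then receive the instances. The class name FifoOrderSketch is an assumption for illustration, and plain strings stand in for the MyNoTheadSafeTask instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the article's source code.
public class FifoOrderSketch {

    public static List<String> secondRoundOrder() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        for (int i = 0; i < 5; i++) {
            queue.add("instanceName_" + i);
        }

        // First round: all five instances are taken, head first (0, 1, 2, 3, 4).
        List<String> taken = new ArrayList<>();
        queue.drainTo(taken);

        // Release order copied from the sample output of this post.
        int[] releaseOrder = {4, 3, 1, 2, 0};
        for (int index : releaseOrder) {
            queue.add(taken.get(index));
        }

        // Second round: the instances come out in release order, not in creation order.
        List<String> secondRound = new ArrayList<>();
        queue.drainTo(secondRound);
        return secondRound;
    }

    public static void main(String[] args) {
        // prints [instanceName_4, instanceName_3, instanceName_1, instanceName_2, instanceName_0]
        System.out.println(secondRoundOrder());
    }
}
```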
The source code of this article and the test cases are in the attached ZIP file.
Download: java_queue.zip
Best regards,