After my post concerning presentation of Queue/BlockingQueue, I would continue with a post about the BlockingQueue implementation: java.util.concurrent.DelayQueue.
This implementation of BlockingQueue stores and keeps the elements internally until a certain delay has expired. So, this queue is an unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null. Expiration occurs when an element’s getDelay(…) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.
We will illustrate our writing by example of queue containing business objects MyNoTheadSafeTask which will be accessed by:
– one producer thread which cyclically, will try to put 10 elements in the queue,
– 10 consumer threads which will take these elements from queue when their when its delay will have expired.
In our example, this business object is a simple POJO named MyNoTheadSafeTask which implements the interface Delayed:
public class MyNoTheadSafeTask implements Delayed { ... }
- has the attribut name instanceName, queueInsertTime and random delay time between 0 and 5000 milliseconds endOfDelay corresponding to the admitted time between the current time and the insertion time of element in Queue:
private String instanceName; // name of instance private static Random generator = new Random(); private long queueInsertTime; // DateTime corresponding to the insertion time in Queue private long endOfDelay; // Max Delay admitted between the current time and the insertion time in Queue public MyNoTheadSafeTask(String name) { // set name of instance this.instanceName = name; // random time between 0 and 5000 milliseconds this.endOfDelay = generator.nextInt( 5000 ); this.queueInsertTime = System.currentTimeMillis(); }
@Override public int compareTo(Delayed o) { MyNoTheadSafeTask myNoTheadSafeTask = (MyNoTheadSafeTask) o; int ret = 0; // if (this.endOfDelay < myNoTheadSafeTask.endOfDelay){ ret = -1; }else if (this.endOfDelay > myNoTheadSafeTask.endOfDelay){ ret = 1; }else if (this.getQueueInsertTime() == myNoTheadSafeTask.getQueueInsertTime()){ ret = 0; } return ret; }
@Override public long getDelay(TimeUnit unit) { long tmp = unit.convert(getQueueInsertTime() + endOfDelay - System.currentTimeMillis(), TimeUnit.MILLISECONDS); return tmp; }
Then, the MySingletonDelegate singleton:
- contains the pool/queue component DelayQueue:
private DelayQueue<MyNoTheadSafeTask> noTheadSafeTaskQueue = new DelayQueue<MyNoTheadSafeTask>();
private MySingletonDelegate() {}
public synchronized MyNoTheadSafeTask getNoTheadSafeTaskFromPool() { try { MyNoTheadSafeTask noTheadSafeTask = noTheadSafeTaskQueue.take(); long dequeueTime = System.currentTimeMillis(); StringBuffer sb = new StringBuffer(); sb.append("——————–"+"\n"); sb.append("--> '"+noTheadSafeTask.getInstanceName()+"' instance has been taken from the queue (new size: "+noTheadSafeTaskQueue.size()+") \n"); sb.append("Queue Size ( Cons ) :"+MySingletonDelegate.getInstance().getNoTheadSafeTaskQueue().size()+"\n"); sb.append("Inserted Element :"+noTheadSafeTask.getInstanceName()+"\n"); sb.append("Queue Insertion Time :"+noTheadSafeTask.getQueueInsertTime()+"\n"); sb.append("Now ( dequeue time ) :"+dequeueTime+"\n"); sb.append("Expected Delay (ms):"+noTheadSafeTask.getEndOfDelay()+"\n"); sb.append("Actual Delay (ms):"+(dequeueTime-noTheadSafeTask.getQueueInsertTime())+"\n"); sb.append("Differences Actual and Expected Delay (ms):"+((dequeueTime-noTheadSafeTask.getQueueInsertTime())-noTheadSafeTask.getEndOfDelay())+"\n"); sb.append("——————–"+"\n"); System.out.println(sb.toString()); return noTheadSafeTask; } catch (Throwable e) { System.out.println("getNoTheadSafeTaskFromPool : an Throwable occured:"+e.getMessage()); } // end-try return null; }
public final void putNoTheadSafeTaskToPool(MyNoTheadSafeTask noTheadSafeTask) { try { noTheadSafeTaskQueue.put(noTheadSafeTask); StringBuffer sb = new StringBuffer(); sb.append("——————–"+"\n"); sb.append("--> '"+ noTheadSafeTask.getInstanceName()+"' instance has been put in the queue\n"); sb.append("Queue Size ( Cons ) :"+MySingletonDelegate.getInstance().getNoTheadSafeTaskQueue().size()+"\n"); sb.append("Inserted Element :"+noTheadSafeTask.getInstanceName()+"\n"); sb.append("Queue Insertion Time :"+noTheadSafeTask.getQueueInsertTime()+"\n"); sb.append("Expected Delay (ms):"+noTheadSafeTask.getEndOfDelay()+"\n"); sb.append("——————–"+"\n"); System.out.println(sb.toString()); } catch (Throwable e) { System.out.println("putNoTheadSafeTaskToPool : an Throwable occured:"+e.getMessage()); } // end-try }
At last, the main method which will:
– create the singleton MySingletonDelegate which will create QUEUE,
– create one producer thread which will put 10 elements of “NoTheadSafeTask” in queue,
– create 10 consumer threads which will each take an element ‘expired’ of “NoTheadSafeTask” from queue,
– execute these 11 threads due to an ExecutorService;
// Counter of consumer threads created private static AtomicInteger consumerCounterAtomic = new AtomicInteger(0); public static void main(String[] args) { try { // create singleton which will create QUEUE MySingletonDelegate.getInstance(); // Consumers and producer ExecutorService es = Executors.newFixedThreadPool(11); // Producer es.submit(new Runnable() { public void run() { int i = 0; while(i<10){ // Test with 10 elements try { // This runnable will create the instances of "NoTheadSafeTask" in the pool component String instanceName = "instanceName_"+i; // Put an instance of "NoTheadSafeTask" to the pool MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName); MySingletonDelegate.getInstance().putNoTheadSafeTaskToPool(noTheadSafeTask); Thread.sleep(1); // i++; } catch (Throwable e) { e.printStackTrace(); } } } }); //Consumers while(consumerCounterAtomic.intValue() < 10) { //Thread es.submit(new Runnable() { public void run() { try { // Retrieve of a instance of "NoTheadSafeTask" from the pool MyNoTheadSafeTask noTheadSafeTask = MySingletonDelegate.getInstance().getNoTheadSafeTaskFromPool(); } catch (Throwable e) { e.printStackTrace(); } } }); // Wait 1s between the threads' launching Thread.sleep( 1000 ); consumerCounterAtomic.incrementAndGet(); } // end-for es.shutdown(); } catch (Throwable e) { e.printStackTrace(); } }
… and the outputs would be:
——————– --> 'instanceName_0' instance has been put in the queue Queue Size ( Cons ) :1 Inserted Element :instanceName_0 Queue Insertion Time :1339716126491 Expected Delay (ms):3363 ——————– ——————– --> 'instanceName_1' instance has been put in the queue Queue Size ( Cons ) :2 Inserted Element :instanceName_1 Queue Insertion Time :1339716126522 Expected Delay (ms):229 ——————– ——————– --> 'instanceName_2' instance has been put in the queue Queue Size ( Cons ) :3 Inserted Element :instanceName_2 Queue Insertion Time :1339716126538 Expected Delay (ms):1642 ——————– ——————– --> 'instanceName_3' instance has been put in the queue Queue Size ( Cons ) :4 Inserted Element :instanceName_3 Queue Insertion Time :1339716126539 Expected Delay (ms):2309 ——————– ——————– --> 'instanceName_4' instance has been put in the queue Queue Size ( Cons ) :5 Inserted Element :instanceName_4 Queue Insertion Time :1339716126540 Expected Delay (ms):3241 ——————– ——————– --> 'instanceName_5' instance has been put in the queue Queue Size ( Cons ) :6 Inserted Element :instanceName_5 Queue Insertion Time :1339716126541 Expected Delay (ms):3881 ——————– ——————– --> 'instanceName_6' instance has been put in the queue Queue Size ( Cons ) :7 Inserted Element :instanceName_6 Queue Insertion Time :1339716126542 Expected Delay (ms):4995 ——————– ——————– --> 'instanceName_7' instance has been put in the queue Queue Size ( Cons ) :8 Inserted Element :instanceName_7 Queue Insertion Time :1339716126543 Expected Delay (ms):4834 ——————– ——————– --> 'instanceName_8' instance has been put in the queue Queue Size ( Cons ) :9 Inserted Element :instanceName_8 Queue Insertion Time :1339716126544 Expected Delay (ms):4998 ——————– ——————– --> 'instanceName_9' instance has been put in the queue Queue Size ( Cons ) :10 Inserted Element :instanceName_9 Queue Insertion Time :1339716126545 Expected Delay (ms):4002 ——————– ——————– --> 'instanceName_1' instance has been taken from the queue (new size: 9) Queue Size ( Cons ) :9 Inserted Element :instanceName_1 Queue Insertion Time :1339716126522 Now ( dequeue time ) :1339716126766 Expected Delay (ms):229 Actual Delay (ms):244 Differences Actual and Expected Delay (ms):15 ——————– ——————– --> 'instanceName_2' instance has been taken from the queue (new size: 8) Queue Size ( Cons ) :8 Inserted Element :instanceName_2 Queue Insertion Time :1339716126538 Now ( dequeue time ) :1339716128186 Expected Delay (ms):1642 Actual Delay (ms):1648 Differences Actual and Expected Delay (ms):6 ——————– ——————– --> 'instanceName_3' instance has been taken from the queue (new size: 7) Queue Size ( Cons ) :7 Inserted Element :instanceName_3 Queue Insertion Time :1339716126539 Now ( dequeue time ) :1339716128857 Expected Delay (ms):2309 Actual Delay (ms):2318 Differences Actual and Expected Delay (ms):9 ——————– ——————– --> 'instanceName_4' instance has been taken from the queue (new size: 6) Queue Size ( Cons ) :6 Inserted Element :instanceName_4 Queue Insertion Time :1339716126540 Now ( dequeue time ) :1339716129793 Expected Delay (ms):3241 Actual Delay (ms):3253 Differences Actual and Expected Delay (ms):12 ——————– ——————– --> 'instanceName_0' instance has been taken from the queue (new size: 5) Queue Size ( Cons ) :5 Inserted Element :instanceName_0 Queue Insertion Time :1339716126491 Now ( dequeue time ) :1339716130543 Expected Delay (ms):3363 Actual Delay (ms):4052 Differences Actual and Expected Delay (ms):689 ——————– ——————– --> 'instanceName_5' instance has been taken from the queue (new size: 4) Queue Size ( Cons ) :4 Inserted Element :instanceName_5 Queue Insertion Time :1339716126541 Now ( dequeue time ) :1339716131557 Expected Delay (ms):3881 Actual Delay (ms):5016 Differences Actual and Expected Delay (ms):1135 ——————– ——————– --> 'instanceName_9' instance has been taken from the queue (new size: 3) Queue Size ( Cons ) :3 Inserted Element :instanceName_9 Queue Insertion Time :1339716126545 Now ( dequeue time ) :1339716132572 Expected Delay (ms):4002 Actual Delay (ms):6027 Differences Actual and Expected Delay (ms):2025 ——————– ——————– --> 'instanceName_7' instance has been taken from the queue (new size: 2) Queue Size ( Cons ) :2 Inserted Element :instanceName_7 Queue Insertion Time :1339716126543 Now ( dequeue time ) :1339716133587 Expected Delay (ms):4834 Actual Delay (ms):7044 Differences Actual and Expected Delay (ms):2210 ——————– ——————– --> 'instanceName_6' instance has been taken from the queue (new size: 1) Queue Size ( Cons ) :1 Inserted Element :instanceName_6 Queue Insertion Time :1339716126542 Now ( dequeue time ) :1339716134602 Expected Delay (ms):4995 Actual Delay (ms):8060 Differences Actual and Expected Delay (ms):3065 ——————– ——————– --> 'instanceName_8' instance has been taken from the queue (new size: 0) Queue Size ( Cons ) :0 Inserted Element :instanceName_8 Queue Insertion Time :1339716126544 Now ( dequeue time ) :1339716135617 Expected Delay (ms):4998 Actual Delay (ms):9073 Differences Actual and Expected Delay (ms):4075 ——————–
Conclusion
In this example, we could analyze that after the adding of instances of “NoTheadSafeTask” in queue by the producer, the consumers threads take the elements expired after a period of time. This order is aleatory and different of FIFO (First In, First Out) order and FILO (First In, Last Out) order:
--> 'instanceName_1' instance has been taken from the queue (new size: 9) Expected Delay (ms):229 --> 'instanceName_2' instance has been taken from the queue (new size: 8) Expected Delay (ms):1642 --> 'instanceName_3' instance has been taken from the queue (new size: 7) Expected Delay (ms):2309 --> 'instanceName_4' instance has been taken from the queue (new size: 6) Expected Delay (ms):3241 --> 'instanceName_0' instance has been taken from the queue (new size: 5) Expected Delay (ms):3363 --> 'instanceName_5' instance has been taken from the queue (new size: 4) Expected Delay (ms):3881 --> 'instanceName_9' instance has been taken from the queue (new size: 3) Expected Delay (ms):4002 --> 'instanceName_7' instance has been taken from the queue (new size: 2) Expected Delay (ms):4834 --> 'instanceName_6' instance has been taken from the queue (new size: 1) Expected Delay (ms):4995 --> 'instanceName_8' instance has been taken from the queue (new size: 0) Expected Delay (ms):4998
The source code of this article and the a test cases are in the ZIP file attachement.
Download: java_queue.zip
Best regards,
Huseyin OZVEREN.