Main Tutorials

Java BlockingQueue examples

In Java, we can use BlockingQueue to create a queue which shared by both producer and the consumer.

  1. Producer – Generate data and put it into the queue.
  2. Consumer – Remove the data from the queue.
Note
Read this article to understand what is producer and consumer.

The BlockingQueue implementations are thread-safe, safely be used with multiple producers and multiple consumers.

1. BlockingQueue

A simple BlockingQueue example, a producer generates data and put it into a queue, at the same time, a consumer takes the data from the same queue.

1.1 Producer – A Runnable object to put 20 integers into a queue.

ExecutorExample1.java

package com.mkyong.concurrency.queue.simple.raw;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private final BlockingQueue<Integer> queue;

    @Override
    public void run() {

        try {
            process();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process() throws InterruptedException {

        // Put 20 ints into Queue
        for (int i = 0; i < 20; i++) {
            System.out.println("[Producer] Put : " + i);
            queue.put(i);
            System.out.println("[Producer] Queue remainingCapacity : " + queue.remainingCapacity());
            Thread.sleep(100);
        }

    }

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
}

1.2 Consumer – A Runnable object to take items from a queue.

Consumer.java

package com.mkyong.concurrency.queue.simple.raw;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private final BlockingQueue<Integer> queue;

    @Override
    public void run() {

        try {
            while (true) {
                Integer take = queue.take();
                process(take);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process(Integer take) throws InterruptedException {
        System.out.println("[Consumer] Take : " + take);
        Thread.sleep(500);
    }

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
}

1.3 Run it. Start 1 producer and 1 consumer, and create a queue with size of 10.

Main.java

package com.mkyong.concurrency.queue.simple;

import com.mkyong.concurrency.queue.simple.raw.Consumer;
import com.mkyong.concurrency.queue.simple.raw.Producer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    public static void main(String[] args) {

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();

    }

}

Output

The producer won’t try to put more data into the queue if it is full.


[Producer] Put : 0
[Producer] Queue remainingCapacity : 9
[Consumer] Take : 0
[Producer] Put : 1
[Producer] Queue remainingCapacity : 9
[Producer] Put : 2
[Producer] Queue remainingCapacity : 8
[Producer] Put : 3
[Producer] Queue remainingCapacity : 7
[Producer] Put : 4
[Producer] Queue remainingCapacity : 6
[Producer] Put : 5
[Producer] Queue remainingCapacity : 5
[Consumer] Take : 1
[Producer] Put : 6
[Producer] Queue remainingCapacity : 5
[Producer] Put : 7
[Producer] Queue remainingCapacity : 4
[Producer] Put : 8
[Producer] Queue remainingCapacity : 3
[Producer] Put : 9
[Producer] Queue remainingCapacity : 2
[Producer] Put : 10
[Producer] Queue remainingCapacity : 1
[Consumer] Take : 2
[Producer] Put : 11
[Producer] Queue remainingCapacity : 1
[Producer] Put : 12
[Producer] Queue remainingCapacity : 0
[Producer] Put : 13
[Consumer] Take : 3
[Producer] Queue remainingCapacity : 0
[Producer] Put : 14
[Consumer] Take : 4
[Producer] Queue remainingCapacity : 0
[Producer] Put : 15
[Consumer] Take : 5
[Producer] Queue remainingCapacity : 0
[Producer] Put : 16
[Consumer] Take : 6
[Producer] Queue remainingCapacity : 0
[Producer] Put : 17
[Consumer] Take : 7
[Producer] Queue remainingCapacity : 0
[Producer] Put : 18
[Consumer] Take : 8
[Producer] Queue remainingCapacity : 0
[Producer] Put : 19
[Consumer] Take : 9
[Producer] Queue remainingCapacity : 0
[Consumer] Take : 10
[Consumer] Take : 11
[Consumer] Take : 12
[Consumer] Take : 13
[Consumer] Take : 14
[Consumer] Take : 15
[Consumer] Take : 16
[Consumer] Take : 17
[Consumer] Take : 18
[Consumer] Take : 19

The program will not stop or exit, it will keep on running there to put and take data from the BlockingQueue

2. BlockingQueue + Poison Pill

The “poison pill” is a general solution to stop or interrupt both producer and consumer threads. The idea is the producer put a “poison pill” into the queue and exit, if the “consumer” see the “poison pill” then stop and exit.

2.1 A producer with poison pill solution.

ProducerPoison.java

package com.mkyong.concurrency.queue.simple.poison;

import java.util.concurrent.BlockingQueue;

public class ProducerPoison implements Runnable {

    private final BlockingQueue<Integer> queue;
    private final Integer POISON;

    @Override
    public void run() {

        try {
            process();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            while (true) {
                try {
                    queue.put(POISON);
                    break;
                } catch (InterruptedException e) {
                    //...
                }
            }
        }

    }

    private void process() throws InterruptedException {

        // Put 20 elements into Queue
        for (int i = 0; i < 20; i++) {
            System.out.println("[Producer] Put : " + i);
            queue.put(i);
            System.out.println("[Producer] Queue remainingCapacity : " + queue.remainingCapacity());
            Thread.sleep(100);
        }

    }

    public ProducerPoison(BlockingQueue<Integer> queue, Integer POISON) {
        this.queue = queue;
        this.POISON = POISON;
    }

}

2.2 A consumer with poison pill solution.

ConsumerPoison.java

package com.mkyong.concurrency.queue.simple.poison;

import java.util.concurrent.BlockingQueue;

public class ConsumerPoison implements Runnable {

    private final BlockingQueue<Integer> queue;
    private final Integer POISON;

    @Override
    public void run() {

        try {
            while (true) {
                Integer take = queue.take();
                process(take);

                // if this is a poison pill, break, exit
                if (take == POISON) {
                    break;
                }

            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process(Integer take) throws InterruptedException {
        System.out.println("[Consumer] Take : " + take);
        Thread.sleep(500);
    }

    public ConsumerPoison(BlockingQueue<Integer> queue, Integer POISON) {
        this.queue = queue;
        this.POISON = POISON;
    }
}

2.3 Start 2 producers and 2 consumers.

Main.java

package com.mkyong.concurrency.queue.simple;

import com.mkyong.concurrency.queue.simple.poison.ConsumerPoison;
import com.mkyong.concurrency.queue.simple.poison.ProducerPoison;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    public static void main(String[] args) {

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        //new Thread(new Producer(queue)).start();
        //new Thread(new Consumer(queue)).start();

        Integer poison = -1;
        new Thread(new ProducerPoison(queue, poison)).start();
        new Thread(new ProducerPoison(queue, poison)).start();

        new Thread(new ConsumerPoison(queue, poison)).start();
        new Thread(new ConsumerPoison(queue, poison)).start();

    }

}

Output


[Producer] Put : 0
[Producer] Put : 0
[//...
[Consumer] Take : 18
[Consumer] Take : 18
[Consumer] Take : 19
[Consumer] Take : 19
[Consumer] Take : -1
[Consumer] Take : -1

Process finished with exit code 0

3. BlockingQueue + File Indexing

A BlockingQueue example to create a simple file indexing engine. A producer crawls a directory and puts the filename into the queue, at the same time, the consumer take the filename from the same queue and index it.

3.1 Producer.

FileCrawlerProducer.java

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;

// Producer
// Crawl file system and put the filename in BlockingQueue.
public class FileCrawlerProducer implements Runnable {

    private final BlockingQueue<File> fileQueue;
    private final FileFilter fileFilter;
    private final File file;
    private final File POISON;
    private final int N_POISON_PILL_PER_PRODUCER;

    @Override
    public void run() {

        try {
            crawl(file);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            while (true) {
                try {
                    System.out.println(Thread.currentThread().getName()
                            + " - FileCrawlerProducer is done, try poison all the consumers!");
                    // poison all threads
                    for (int i = 0; i < N_POISON_PILL_PER_PRODUCER; i++) {
                        System.out.println(Thread.currentThread().getName() + " - puts poison pill!");
                        fileQueue.put(POISON);
                    }
                    break;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public FileCrawlerProducer(BlockingQueue<File> fileQueue,
                               FileFilter fileFilter, File file, 
                               File POISON, int n_POISON_PILL_PER_PRODUCER) {
        this.fileQueue = fileQueue;
        this.fileFilter = fileFilter;
        this.file = file;
        this.POISON = POISON;
        N_POISON_PILL_PER_PRODUCER = n_POISON_PILL_PER_PRODUCER;
    }

    private void crawl(File root) throws InterruptedException {

        File[] entries = root.listFiles(fileFilter);
        if (entries != null) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    crawl(entry);
                } else if (!isIndexed(entry)) {
                    System.out.println("[FileCrawlerProducer] - Found..." 
                            + entry.getAbsoluteFile());
                    fileQueue.put(entry);
                }
            }
        }

    }

    private boolean isIndexed(File f) {
        return false;
    }

}

3.2 Consumer.

IndexerConsumer.java

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.util.concurrent.BlockingQueue;

// Consumer
public class IndexerConsumer implements Runnable {

    private final BlockingQueue<File> fileQueue;
    private final File POISON;

    @Override
    public void run() {

        try {
            while (true) {
                File take = fileQueue.take();
                if (take == POISON) {
                    System.out.println(Thread.currentThread().getName() + " die");
                    break;
                }
                indexFile(take);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    public void indexFile(File file) {
        if (file.isFile()) {
            System.out.println(Thread.currentThread().getName() 
                    + " [IndexerConsumer] - Indexing..." + file.getAbsoluteFile());
        }

    }

    public IndexerConsumer(BlockingQueue<File> fileQueue, File POISON) {
        this.fileQueue = fileQueue;
        this.POISON = POISON;
    }
}

3.3 Start 1 producer and 2 consumers.

Main.java

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    private static final File POISON = new File("This is a POISON PILL");

    public static void main(String[] args) {

        int N_PRODUCERS = 1;
        int N_CONSUMERS = 2;//Runtime.getRuntime().availableProcessors();
        int N_POISON_PILL_PER_PRODUCER = N_CONSUMERS / N_PRODUCERS;
        int N_POISON_PILL_REMAIN = N_CONSUMERS % N_PRODUCERS;

        System.out.println("N_PRODUCERS : " + N_PRODUCERS);
        System.out.println("N_CONSUMERS : " + N_CONSUMERS);
        System.out.println("N_POISON_PILL_PER_PRODUCER : " + N_POISON_PILL_PER_PRODUCER);
        System.out.println("N_POISON_PILL_REMAIN : " + N_POISON_PILL_REMAIN);

        //unbound queue, no limit
        BlockingQueue<File> queue = new LinkedBlockingQueue<>();

        FileFilter filter = new FileFilter() {
            public boolean accept(File file) {
                return true;
            }
        };

        File root = new File("C:\\users");

        for (int i = 0; i < N_PRODUCERS - 1; i++) {
            new Thread(new FileCrawlerProducer(queue, filter, root,
                    POISON, N_POISON_PILL_PER_PRODUCER)).start();
        }
        new Thread(new FileCrawlerProducer(queue, filter, root, POISON,
                N_POISON_PILL_PER_PRODUCER + N_POISON_PILL_REMAIN)).start();

        for (int i = 0; i < N_CONSUMERS; i++) {
            new Thread(new IndexerConsumer(queue, POISON)).start();
        }

    }
}

Output


//...
[FileCrawlerProducer] - Found...C:\users\Public\Videos\desktop.ini
Thread-2 [IndexerConsumer] - Indexing...C:\users\Public\Videos\desktop.ini
Thread-0 - FileCrawlerProducer is done, try poison all the consumers!
Thread-0 - puts poison pill!
Thread-0 - puts poison pill!
Thread-1 die
Thread-2 die

Process finished with exit code 0

Download Source Code

References

  1. Producer–consumer problem
  2. BlockingQueue JavaDoc

About Author

author image
Founder of Mkyong.com, love Java and open source stuff. Follow him on Twitter. If you like my tutorials, consider make a donation to these charities.

Comments

Subscribe
Notify of
1 Comment
Most Voted
Newest Oldest
Inline Feedbacks
View all comments
Dimos
4 years ago

Hello and thank you for the great tutorials

One question: is the “.start()” correct since the IndexerConsumer implements “run()” function. The same applies for every example.

for (int i = 0; i < N_CONSUMERS; i++) {
new Thread(new IndexerConsumer(queue, POISON)).start();
}