Java 8 Parallel Streams Examples
A few Java 8 examples that run streams in parallel.
1. BaseStream.parallel()
A simple parallel example to print 1 to 10.
package com.mkyong.java8;
import java.util.stream.IntStream;
public class ParallelExample1 {
public static void main(String[] args) {
System.out.println("Normal...");
IntStream range = IntStream.rangeClosed(1, 10);
range.forEach(System.out::println);
System.out.println("Parallel...");
IntStream range2 = IntStream.rangeClosed(1, 10);
range2.parallel().forEach(System.out::println);
}
}
Output
Normal...
1
2
3
4
5
6
7
8
9
10
Parallel...
7
6
8
9
10
1
4
5
3
2
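The parallel output above is out of order because `forEach` makes no ordering guarantee on a parallel stream. If the order matters, `forEachOrdered` can be used instead, usually at some cost to parallelism. A minimal sketch (the class and method names are illustrative and not part of the original example):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class ParallelOrderedSketch {

    // Collects 1..max through a parallel stream, in encounter order.
    static List<Integer> orderedValues(int max) {
        List<Integer> result = new ArrayList<>();
        // forEachOrdered handles one element at a time, in encounter order,
        // so adding to a plain ArrayList is safe here
        IntStream.rangeClosed(1, max)
                .parallel()
                .forEachOrdered(result::add);
        return result;
    }

    public static void main(String[] args) {
        IntStream.rangeClosed(1, 10)
                .parallel()
                .forEachOrdered(System.out::println); // prints 1 to 10 in order
    }
}
```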
2. Collection.parallelStream()
Another simple parallel example that prints a to z. For a collection, we can use parallelStream().
package com.mkyong.java8;
import java.util.ArrayList;
import java.util.List;
public class ParallelExample2 {
public static void main(String[] args) {
System.out.println("Normal...");
List<String> alpha = getData();
alpha.stream().forEach(System.out::println);
System.out.println("Parallel...");
List<String> alpha2 = getData();
alpha2.parallelStream().forEach(System.out::println);
}
private static List<String> getData() {
List<String> alpha = new ArrayList<>();
int n = 97; // 97 = a , 122 = z
while (n <= 122) {
char c = (char) n;
alpha.add(String.valueOf(c));
n++;
}
return alpha;
}
}
Output
Normal...
a
b
c
d
e
f
g
h
i
j
k
l
m
n
o
p
q
r
s
t
u
v
w
x
y
z
Parallel...
q
s
r
o
x
h
l
p
d
i
g
t
u
n
z
v
j
k
w
f
m
c
a
e
b
y
3. Is Stream running in parallel mode?
3.1 We can test it with isParallel()
package com.mkyong.java8;
import java.util.stream.IntStream;
public class ParallelExample3a {
public static void main(String[] args) {
System.out.println("Normal...");
IntStream range = IntStream.rangeClosed(1, 10);
System.out.println(range.isParallel()); // false
range.forEach(System.out::println);
System.out.println("Parallel...");
IntStream range2 = IntStream.rangeClosed(1, 10);
IntStream range2Parallel = range2.parallel();
System.out.println(range2Parallel.isParallel()); // true
range2Parallel.forEach(System.out::println);
}
}
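Note that `parallel()` and `sequential()` set the mode for the whole pipeline, not just for the steps that follow them; the last call before the terminal operation decides. A small sketch to illustrate this (the class and method names are made up for this example):

```java
import java.util.stream.IntStream;

class ParallelModeSketch {

    // parallel() followed by sequential(): the last call decides the mode
    static boolean lastCallWins() {
        return IntStream.rangeClosed(1, 10)
                .parallel()
                .sequential()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(lastCallWins()); // false
        System.out.println(
                IntStream.rangeClosed(1, 10).sequential().parallel().isParallel()); // true
    }
}
```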
3.2 Or print the current thread name like this:
package com.mkyong.java8;
import java.util.stream.IntStream;
public class ParallelExample3b {
public static void main(String[] args) {
System.out.println("Normal...");
IntStream range = IntStream.rangeClosed(1, 10);
range.forEach(x -> {
System.out.println("Thread : " + Thread.currentThread().getName() + ", value: " + x);
});
System.out.println("Parallel...");
IntStream range2 = IntStream.rangeClosed(1, 10);
range2.parallel().forEach(x -> {
System.out.println("Thread : " + Thread.currentThread().getName() + ", value: " + x);
});
}
}
Output
Normal...
Thread : main, value: 1
Thread : main, value: 2
Thread : main, value: 3
Thread : main, value: 4
Thread : main, value: 5
Thread : main, value: 6
Thread : main, value: 7
Thread : main, value: 8
Thread : main, value: 9
Thread : main, value: 10
Parallel...
Thread : main, value: 7
Thread : main, value: 6
Thread : ForkJoinPool.commonPool-worker-5, value: 3
Thread : ForkJoinPool.commonPool-worker-7, value: 8
Thread : ForkJoinPool.commonPool-worker-5, value: 5
Thread : ForkJoinPool.commonPool-worker-5, value: 4
Thread : ForkJoinPool.commonPool-worker-3, value: 9
Thread : ForkJoinPool.commonPool-worker-5, value: 1
Thread : ForkJoinPool.commonPool-worker-7, value: 2
Thread : ForkJoinPool.commonPool-worker-9, value: 10
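P.S. By default, parallel streams run on the common `ForkJoinPool` (`ForkJoinPool.commonPool()`); the calling thread, here `main`, also takes part in the work.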
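To see which threads actually take part, the thread names can be collected into a thread-safe set. The sketch below also prints the parallelism of the common pool, which by default is typically one less than the number of available processors. The class and method names are illustrative, and the exact set of threads will differ from run to run:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolSketch {

    // Returns the names of all threads that processed at least one element.
    static Set<String> threadsUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.rangeClosed(1, elements)
                .parallel()
                .forEach(x -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("CPU cores        : " + Runtime.getRuntime().availableProcessors());
        System.out.println("Pool parallelism : " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("Threads used     : " + threadsUsed(1_000));
    }
}
```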
4. Calculation
4.1 Java 8 streams to print all prime numbers up to 1 million:
package com.mkyong.java8;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class ParallelExample4 {
public static void main(String[] args) {
long count = Stream.iterate(0, n -> n + 1)
.limit(1_000_000)
//.parallel() // with this line: 23s, without it: 1m 10s
.filter(ParallelExample4::isPrime)
.peek(x -> System.out.format("%s\t", x))
.count();
System.out.println("\nTotal: " + count);
}
public static boolean isPrime(int number) {
if (number <= 1) return false;
return IntStream.rangeClosed(2, number / 2).noneMatch(i -> number % i == 0);
}
}
Result:
- For normal streams, it takes 1 minute 10 seconds.
- For parallel streams, it takes 23 seconds.
P.S. Tested with i7-7700, 16G RAM, Windows 10
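`Stream.iterate(...).limit(...)` produces its elements one after another, so it tends to split poorly across threads. A range-based source such as `IntStream.range` knows its size up front and usually divides more evenly. Below is a sketch of the same prime count on such a source; the class name, the `countPrimes` helper and the smaller bound are assumptions made for this example, and the timings it prints will vary by machine:

```java
import java.util.stream.IntStream;

class PrimeCountSketch {

    // Same trial-division check as in the example above
    static boolean isPrime(int number) {
        if (number <= 1) return false;
        return IntStream.rangeClosed(2, number / 2).noneMatch(i -> number % i == 0);
    }

    // Counts the primes in [0, limit), optionally in parallel
    static long countPrimes(int limit, boolean parallel) {
        IntStream numbers = IntStream.range(0, limit);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers.filter(PrimeCountSketch::isPrime).count();
    }

    public static void main(String[] args) {
        int limit = 100_000; // smaller than 1 million so the demo finishes quickly

        long start = System.nanoTime();
        long sequential = countPrimes(limit, false);
        long middle = System.nanoTime();
        long parallel = countPrimes(limit, true);
        long end = System.nanoTime();

        System.out.println("Sequential: " + sequential + " primes, "
                + (middle - start) / 1_000_000 + " ms");
        System.out.println("Parallel  : " + parallel + " primes, "
                + (end - middle) / 1_000_000 + " ms");
    }
}
```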
4.2 Yet another parallel stream example to find out the average age of a list of employees.
List<Employee> employees = obj.generateEmployee(10000);
double age = employees
.parallelStream()
.mapToInt(Employee::getAge)
.average()
.getAsDouble();
System.out.println("Average age: " + age);
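The snippet above depends on an `Employee` list that is generated elsewhere, and `getAsDouble()` throws `NoSuchElementException` when the list is empty. A self-contained sketch of the same idea follows; the nested `Person` class is a hypothetical stand-in for `Employee`, and returning 0.0 for an empty list is an assumption made for this example:

```java
import java.util.Arrays;
import java.util.List;

class AverageAgeSketch {

    // Minimal stand-in for the article's Employee class (only the age is needed)
    static class Person {
        private final int age;

        Person(int age) {
            this.age = age;
        }

        int getAge() {
            return age;
        }
    }

    static double averageAge(List<Person> people) {
        return people.parallelStream()
                .mapToInt(Person::getAge)
                .average()     // OptionalDouble, empty when the list is empty
                .orElse(0.0);  // assumption: report 0.0 for an empty list
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(new Person(20), new Person(30), new Person(40));
        System.out.println("Average age: " + averageAge(people)); // Average age: 30.0
    }
}
```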
5. Case Study
5.1 Parallel streams can speed up a time-consuming file-saving task.
This Java code generates 10,000 random employees and saves each employee to its own file, up to 10,000 files in total.
- For normal stream, it takes 27-29 seconds.
- For parallel stream, it takes 7-8 seconds.
P.S. Tested with i7-7700, 16G RAM, Windows 10
package com.mkyong.java8;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ParallelExample5 {
private static final String DIR = System.getProperty("user.dir") + "/test/";
public static void main(String[] args) throws IOException {
Files.createDirectories(Paths.get(DIR));
ParallelExample5 obj = new ParallelExample5();
List<Employee> employees = obj.generateEmployee(10000);
// normal, sequential
//employees.stream().forEach(ParallelExample5::save); // 27s-29s
// parallel
employees.parallelStream().forEach(ParallelExample5::save); // 7s-8s
}
private static void save(Employee input) {
try (FileOutputStream fos = new FileOutputStream(new File(DIR + input.getName() + ".txt"));
ObjectOutputStream obs = new ObjectOutputStream(fos)) {
obs.writeObject(input);
} catch (IOException e) {
e.printStackTrace();
}
}
private List<Employee> generateEmployee(int num) {
return Stream.iterate(0, n -> n + 1)
.limit(num)
.map(x -> {
return new Employee(
generateRandomName(4),
generateRandomAge(15, 100),
generateRandomSalary(900.00, 200_000.00)
);
})
.collect(Collectors.toList());
}
private String generateRandomName(int length) {
return new Random()
.ints(length, 'a', 'z' + 1) // upper bound is exclusive, so 'z' + 1 includes z
.mapToObj(x -> String.valueOf((char) x))
.collect(Collectors.joining());
}
private int generateRandomAge(int min, int max) {
return new Random()
.ints(1, min, max)
.findFirst()
.getAsInt();
}
private BigDecimal generateRandomSalary(double min, double max) {
return new BigDecimal(new Random()
.doubles(1, min, max)
.findFirst()
.getAsDouble()).setScale(2, RoundingMode.HALF_UP);
}
}
package com.mkyong.java8;
import java.io.Serializable;
import java.math.BigDecimal;
public class Employee implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
private int age;
private BigDecimal salary;
// getters, setters, etc.
}
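One thing to watch in the case study: the file name comes from a short random name, so two employees can end up with the same name and the later file overwrites the earlier one. The sketch below shows the same parallel-save pattern with index-based file names, which are always unique. It writes plain text into a temporary directory instead of serialized `Employee` objects; the class name, file names and file content are assumptions made for this example:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ParallelSaveSketch {

    // Writes `count` small text files into a new temporary directory in parallel
    // and returns how many files that directory contains afterwards.
    static long saveAll(int count) {
        try {
            Path dir = Files.createTempDirectory("parallel-save");
            IntStream.range(0, count)
                    .parallel()
                    .forEach(i -> writeOne(dir, i));
            try (Stream<Path> files = Files.list(dir)) {
                return files.count();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeOne(Path dir, int i) {
        // the index in the file name keeps every name unique
        Path file = dir.resolve("employee-" + i + ".txt");
        try {
            Files.write(file, ("employee " + i).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            // a lambda passed to forEach cannot throw checked exceptions, so wrap it
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Files written: " + saveAll(1_000));
    }
}
```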
Should we use parallel streams in production?
If you want good performance, yes, but it depends on the use case.
In the output of example 3.2, why is there a difference between the values 7 and 6 and the other values?
It seems that the first tasks are processed by the main thread while the others are delegated?
Thread : main, value: 7
Thread : main, value: 6
Thread : ForkJoinPool.commonPool-worker-5, value: 3
Thread : ForkJoinPool.commonPool-worker-7, value: 8
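I don't understand when to use parallel streams, or what the difference is between a parallel stream and a sequential stream in Java. Please help explain. Thank you so much.
A parallel stream splits the work across multiple CPU cores; a sequential stream processes the elements one by one on a single thread.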
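For a stateless operation the result is the same in both modes; only the way the work is scheduled differs. A small sketch to illustrate this (the class and method names are invented for this example):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SequentialVsParallelSketch {

    // Sums 1..1000 in either mode
    static int sum(boolean parallel) {
        IntStream numbers = IntStream.rangeClosed(1, 1_000);
        return (parallel ? numbers.parallel() : numbers).sum();
    }

    // Squares 1..10 in either mode and collects them into a list
    static List<Integer> squares(boolean parallel) {
        IntStream numbers = IntStream.rangeClosed(1, 10);
        return (parallel ? numbers.parallel() : numbers)
                .map(n -> n * n)
                .boxed()
                .collect(Collectors.toList()); // keeps encounter order in both modes
    }

    public static void main(String[] args) {
        System.out.println(sum(false));    // 500500
        System.out.println(sum(true));     // 500500
        System.out.println(squares(true)); // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    }
}
```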
long count = Stream.iterate(0, n -> n + 1)
.limit(1_000_000)
//.parallel() with this 23s, without this 1m 10s
.filter(TestPrime::isPrime)
.peek(x -> System.out.format("%s\t", x))
.count();
There is no class called TestPrime; you may have to change it to ParallelExample4::isPrime.
Article is updated, thanks.