본문 바로가기
자바

[모던 자바 인 액션] 병렬 데이터 처리와 성능

by __Minnie_ 2025. 4. 25.
병렬 스트림

 

병렬스트림이란 각가의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 병렬 스트림은 간단하게 parallelStream을 통해서 사용이 가능하다. 

 

첫번째 예제는 순차 스트림이고, 두번째 예제는 병렬 스트림 예제이다. 

Stream.iterate(1L, (i) -> {
    return i + 1L;
}).limit(n).reduce(Long::sum).get();


Stream.iterate(1L, (i) -> {
    return i + 1L;
}).limit(n).parallel().reduce(Long::sum).get();

 

 

성능 측정

 

위 두개의 코드를 하니스를 통해서 성능을 측정하면 어떻게 될까? 책에서 제공되는 값 기준으로 첫번째 코드는 121.843, 두번째 코드는 604.059이다. 스트림을 사용하지 않고 단순 반복문을 사용한 경우는 3.278이다.

 

우리의 예상대로라면 두번째 예제가 가장 빨라야 하는데, 이게 압도적으로 느렸고, 스트림을 사용하지 않고 단순 반복문을 사용한 경우가 더 빨랐다. 여기에는 두가지 원인이 있다. 첫번째는 박싱된 객체가 생성되기 때문에 계산을 위해서 언박싱된 객체를 만들어야 한다. 두번째는 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기 어렵다. iterator는 청크로 분할하기가 어렵다. 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않기 때문이다. 

 

그렇다면 이 문제를 해결하기 위해서는 더 특화된 메서드를 사용해야 한다. LongStream.rangeClosed는 리듀싱 계산 시점에 범위가 정해져 있어서 서브파트로 분할이 쉽고, long 기본 타입이라서 언박싱 과정이 필요없다. 첫번째 예제는 순차형 스트림, 두번째 예제는 병렬 스트림이다. 

LongStream.rangeClosed(1, N).reduce(0L, Long::sum);

LongStream.rangeClosed(1, N).parallel().reduce(0L, Long::sum);

이 경우, 성능을 측정하면 순차형 스트림의 경우 5.315, 병렬 스트림은 2.677이 된다.

 

이를 통해서 알 수 있는 점은 단순히 병렬 스트림을 사용한다는 것이 중요한 것이 아니라 특화된 메서드를 잘 사용하여 박싱, 언박싱 과정을 제거하는 것이 성능에 더 큰 영향을 미치며, 성능은 상황에 따라 다르기 때문에 꼭 측정을 통해서 확인을 해야 한다는 것이다. 

 

병렬 스트림 올바르게/효과적으로 사용하기

 

1. 공유된 상태 사용하지 않기

병렬 스트림에서 공유된 누적자를 바꾸도록 구현하게 되면 값이 일정하게 계산이 되지 않을 가능성이 높고, 최악의 경우에는 레이스 컨디션 문제가 발생할 수 있다. 이를 해결하기 위해서는 synchronized 같은 것을 이용해야 할 텐데 그렇게 되면 병렬화의 의미가 사라지게 된다.

 

2. 측정

병렬 스트림이 무조건 순차스트림보다 빠른 것은 아니기 때문에 확실하지 않은 경우에는 측정을 통해서 확인하자

 

3. 박싱/언박싱

박싱과 언박싱은 성능에 큰 영향을 미치기 때문에 기본형 특화 스트림을 잘사용하자

 

4. 병렬스트림에서 성능이 떨어지는 연산

findFirst, limit처럼 요소의 순서에 의존하는 연산은 병렬 연산시 성능이 굉장히 떨어진다. 따라서, 병렬 연산시에는 findAny 같이 순서가 상관없는 메서드를 사용하는 것이 좋다. 

 

5. 전체 파이프라인 연산 비용 고려

처리해야 할 요소의 수가 N이고, 하나의 요소를 처리하는데 드는 비용이 Q라면 전체 비용은 N*Q이다. Q가 높다는 것은 병렬로 처리했을 때, 더 효과적일 가능성이 더 높다는 것이다. 

 

6. 대량의 데이터

병렬로 처리를 하게 되면 스트림을 나누고 다시 합치는 과정이 수반되기 때문에 소량에 데이터에서는 이런 과정들이 오히려 시간을 더 잡아먹을수도 있다. 따라서 너무 소량의 데이터에서는 병렬을 사용하지 않는 것이 오히려 이득일 수 있다. 

 

7. 병합에 드는 비용

최종 연산의 병합에 드는 비용이 너무 크다면 병렬 처리로 인해서 얻는 이득보다 손해가 더 클수도 있다. 

 

8. 스트림의 특징을 잘 사용하라

SIZED 스트림은 병렬 처리시 분할에 유리하지만 iterator처럼 사이즈가 정해지지 않은 스트림은 효과적으로 처리하기 힘들다

 

 

포크/조인 프레임워크

 

병렬 스트림은 내부적으로 포크/조인 프레임워크를 사용한다. 포크/조인 프레임워크가 내부적으로 어떻게 사용되는지 확인함으로 인해서 병렬 스트림에 대해서 더 이해할 수 있다. 

 

아래는 RecursiveTask클래스를 상속받아 생성한 ForkJoinSumCalculator클래스이다. RecursiveTask클래스의 핵심은 compute메서드이다. 해당 메서드는 태스크가 분할 가능하다면 분할하고, 분할할 수 없으면 태스크 계산을 수행한다.

 

아래 예제에서는 요소의 수가 만개이하인 경우에는 분할을 중지하고 계산을 하고, 그 보다 많을 때는 left, right로 두개의 서브 파트로 나누어서 다른 스레드에서 실행시키는 것을 볼 수 있다. 

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    public static final long THRESHOLD = 10000L;
    private final long[] numbers;
    private final int start;
    private final int end;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    protected Long compute() {
        int length = this.end - this.start;
        if ((long)length <= 10000L) {
            return this.computeSequentially();
        } else {
            ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(this.numbers, this.start, this.start + length / 2);
            leftTask.fork();
            ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(this.numbers, this.start + length / 2, this.end);
            Long rightResult = rightTask.compute();
            Long leftResult = (Long)leftTask.join();
            return leftResult + rightResult;
        }
    }

    private long computeSequentially() {
        long sum = 0L;

        for(int i = this.start; i < this.end; ++i) {
            sum += this.numbers[i];
        }

        return sum;
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1L, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return (Long)ParallelStreamsHarness.FORK_JOIN_POOL.invoke(task);
    }
}

위 코드에서 보면 leftTask는 fork하는 것을 볼 수 있고, 두번째 task는 compute하는 것을 볼 수 있는데, 굳이 두개의 태스크 모두 새로운 스레드에서 실행할 필요없이 하나는 새로운 스레드에서 비동기로 실행시키고 하나는 기존 스레드에서 계산하도록 한 것이다. 

 

 

포크/조인 프레임워크 효과적으로 사용하기

 

1. join

join을 사용하게 되면 태스크가 생산하는 결과값이 반환될 때까지 대기하게 된다. 따라서 letf, right의 태스크가 모두 시작된 후에 join을 호출해야지, left fork -> left join -> right compute 하게 되면 left가 완료될 때까지 right는 실행되지 못해서 오히려 더 비효율적인 코드가 된다. 

 

2. fork

fork를 통해서 새로운 스레드에서 비동기로 작업이 실행되도록 하여 시간을 절약할 수 있다. 그러나 left, right 모두 fork하게 되면 총 3개의 스레드를 사용하게 되는 것이니 하나의 스레드가 낭비되게 된다. 그러므로 하나는 fork하고 하나는 compute하여 2개의 스레드를 통해서 처리할 수 있도록 하는 것이 경제적이다. 

 

3. 성능 측정

앞에서 설명했던 것처럼 병렬 처리가 순차 처리보다 항상 빠른 것은 아니기 때문에 확실하지 않을 때는 측정을 통해서 판단해야 한다. 

 

 

포크/조인 프레임워크 작업 훔치기

 

기본적으로 병렬 처리시 코어의 개수와는 상관없이 적절한 크기로 나누어진 많은 태스크를 스레드에 포킹하는 것이 바람직한데, 이론적으로는 병렬화된 태스크의 작업부하를 분산하면 각 코어에서 동일하게 작업을 끝내야 한다. 그러나, 현실에서는 다른 서비스와의 자원 공유, 디스크 접근 속도 저하 등의 이유로 속도가 저하될 수 있다. 이런 경우를 우해서 포크/조인 프레임워크에서는 작업 훔치기라는 것을 통해서 문제를 해결한다.

 

각 스레드는 본인이 현재 작업하고 있는 태스크를 포함한 이중 연결 리스트를 가지고 있고, 작업하는 태스크가 끝나면 헤드에서 다음 태스크를 가지고 와서 실행한다. 이때 이미 작업을 마친 다른 스레드는 이중 연결 리스트의 꼬리에서 작업을 훔쳐서 실행한다. 이 과정을 태스크가 모두 끝날 때까지 실행한다. 이를 통해서 스레드간의 작업 부하를 비슷하게 유지한다. 

 

 

Spliterator 인터페이스

 

Spliterator는 분할할 수 있는 반복자라는 의미로, 병렬 작업에 특화되어 있다. 이를 이용하면 병렬 스트림에서 서브 파트로 분할시 어떻게 분할할 것인지 커스텀할 수 있다. 기본적으로 모든 컬렉션에 대해서 디폴트 spliterator가 선언되어 있기 때문에 따로 구현할 필요는 없다.

 

splititerator의 인터페이스는 아래와 같다. 

public interface Spliterator<T> {
	boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
    long estimateSize();
    int characterisics();
}

 

tryAdvance는 Spliterator의 요소를 하나씩 소비하면서 탐색해야 할 요소가 남아 있으면 참을 반환한다.

 

trySplit은 spliterator를 분할해서 두번째 spliterator를 생성한다. 분할할 수 없는 경우에는 null을 반환하게 되는데, spliterator의 경우 재귀적으로 호출하여 null이 반환되면 재귀 분할 과정이 종료된다. 

 

estimateSize를 통해서 탐색해야 할 요소의 수를 확인한다. 

 

characteristic은 자체특성을 반환하는 메서드이다. ordered은 정해진 순서가 있는 경우이고, distinct는 중복이 없는 경우, sorted는 정렬된 경우, sized는 크기가 정해진 경우, non-null은 모든 요소가 null이 아닌 경우 immutable은 값이 불변인 경우, concurrent동기화 없이 여러 스레드에서 동시에 고칠 수 있는 경우, subsized 분할되는 모든 요소가 sized 특성을 갖는 경우이다. 

 

만약 우리가 아래와 같은 문자열에서 단어의 개수를 센다고 해보자. 우리는 아마 공백에서 공백이 아닌 캐릭터로 변화되는 경우 단어를 세게 될 것이다. 이를 만약 병렬로 수행하게 되면 각각의 스레드에서 생성한 단어수를 더하게 될 것이다. 그렇다면 아래 문자열을 어떻게 나눠야 할까?

Nel   mezzo del cammin  di nostra  vita mi  ritrovai in una  selva oscura che la  dritta via era   smarrita

 

만약 아래처럼 단어의 중간에서 나눠버리게 되면 단어의 개수가 +1이 될 것이다. 따라서 우리는 서브파트로 나눌 때 공백을 기준으로 나누도록 해야 결과가 정상적으로 도출이 될 것이다. 

Nel   mezzo del cammin  di nostra  vita mi  ritrovai in una  selva osc
ura che la  dritta via era   smarrita

 

 

그런 경우 우리는 아래처럼 코드를 작성할 수 있다. 사이즈가 10 미만인 경우에는 분할하지 않고, 이외의 경우에는 분할을 하되 분할할 지점이 공백이 아니라면 한칸씩 미뤄가면서 공백을 찾아서 분할한다. 

private static class WordCounterSpliterator implements Spliterator<Character> {
        private final String string;
        private int currentChar;

        private WordCounterSpliterator(String string) {
            this.currentChar = 0;
            this.string = string;
        }

        public boolean tryAdvance(Consumer<? super Character> action) {
            action.accept(this.string.charAt(this.currentChar++));
            return this.currentChar < this.string.length();
        }

        public Spliterator<Character> trySplit() {
            int currentSize = this.string.length() - this.currentChar;
            if (currentSize < 10) {
                return null;
            } else {
                for(int splitPos = currentSize / 2 + this.currentChar; splitPos < this.string.length(); ++splitPos) {
                    if (Character.isWhitespace(this.string.charAt(splitPos))) {
                        Spliterator<Character> spliterator = new WordCounterSpliterator(this.string.substring(this.currentChar, splitPos));
                        this.currentChar = splitPos;
                        return spliterator;
                    }
                }

                return null;
            }
        }

        public long estimateSize() {
            return (long)(this.string.length() - this.currentChar);
        }

        public int characteristics() {
            return 17744;
        }
    }

 

이렇게 생성된 spliterator는 아래처럼 사용될 수 있다.

public static int countWords(String s) {
    Spliterator<Character> spliterator = new WordCounterSpliterator(s);
    Stream<Character> stream = StreamSupport.stream(spliterator, true);
    return countWords(stream);
}