////
Search
🌊

4장 스트림 : 3부 포크/조인

외부 반복자를 이용할때 우리는 병렬 실행을 위해서 많은 고민과 시간, 노력을 기울였다. 하지만 자바 7의 등장으로 자바의 병렬 실행은 더 이상 어려운 것이 아니게 되었다. 이장에서는 병렬 실행이 얼마나 쉽게 이룰 수 있는지 알아볼 것이다.
1.
병렬 스트림으로 데이터를 병렬 처리하기
2.
병렬 스트림의 성능
3.
포크/조인 프레임워크
4.
Spliterator로 스트림 데이터 쪼개기 (자동 분할 방법)

병렬 스트림

스트림 인터페이스를 이용하면 아주 간단히 요소를 병렬로 처리할 수 있다. 간단히 parallelStream의 호출로 병렬 스트림이 생성된다. 순차 처리 코드와 병렬 처리 코드의 차이점은 아주 간단히 청크(덩어리)로 분할되어 연산이 실행 된다는 점이다.
... 병렬 처리 코드 public class Main { public static void main(String[] args) { final long l =parallelSum(10L); System.out.println(l); final long l1 = iterativeSum(10L); System.out.println(l1); } public static long parallelSum(long n){ return Stream.iterate(1L, i -> i +1L) .limit(n) .parallel() // 병렬 처리 .peek(s -> System.out.println(String.format("current Thread : %s | value : %d ", Thread.currentThread().getName(), s))) .reduce(0L,Long::sum); } /* 병렬 과정 current Thread : ForkJoinPool.commonPool-worker-13 | value : 10 current Thread : ForkJoinPool.commonPool-worker-5 | value : 3 current Thread : ForkJoinPool.commonPool-worker-7 | value : 7 current Thread : ForkJoinPool.commonPool-worker-7 | value : 1 current Thread : ForkJoinPool.commonPool-worker-11 | value : 2 current Thread : ForkJoinPool.commonPool-worker-9 | value : 8 current Thread : ForkJoinPool.commonPool-worker-15 | value : 6 current Thread : ForkJoinPool.commonPool-worker-3 | value : 9 current Thread : ForkJoinPool.commonPool-worker-5 | value : 4 current Thread : ForkJoinPool.commonPool-worker-13 | value : 5 55 */ ... 순차 처리 코드 public static long iterativeSum(long n){ long result = 0L; for (long i = 1; i <= n; i++) { result += i; System.out.println(String.format("current Thread : %s | value : %d ", Thread.currentThread().getName(), i)); } return result; } /* 순차 과정 current Thread : main | value : 1 current Thread : main | value : 2 current Thread : main | value : 3 current Thread : main | value : 4 current Thread : main | value : 5 current Thread : main | value : 6 current Thread : main | value : 7 current Thread : main | value : 8 current Thread : main | value : 9 current Thread : main | value : 10 55 */ } 병렬 과정에서도 알 수 있듯이 parallelStream의 호출로 서로다른 스레드에서 작업이 분산되어 처리된다. 이말인즉, 어떻게 분할되는지는 모르지만 청크(덩어리)로 나누어져 서로 다른 스레드에서 연산이 이루어진다는건 확실히 알 수 있다. 여기서 신기하고 유추할 수 있는점 우리는 Thread.currentThread().getName() 을 통하여 스레드의 이름을 알아보았다. 그 이유는 과연 병렬로 처리될까? 하는 의문때문이였다. 근데 실제로 다른 스레드에서 동작하는 것은 알았다. 여기서 더 중요하게 생각할 수 있는점은 parallelStream()ForkJoinPool 이라는 스레드에서 동작한다는 점이다. 유추해보자면 parallel은 ForkJoin을 기반으로 만들어진것같다는 점이다.
Java
복사
분할은 어떤 식으로 이루어지는 걸까? (뇌피셜)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","2"); // 이것으로 setting 하고 다시 실행 시켜봣다 result .. 원본 current Thread : ForkJoinPool.commonPool-worker-1 | value : 3 current Thread : ForkJoinPool.commonPool-worker-3 | value : 7 current Thread : ForkJoinPool.commonPool-worker-1 | value : 5 current Thread : ForkJoinPool.commonPool-worker-3 | value : 6 current Thread : ForkJoinPool.commonPool-worker-1 | value : 4 current Thread : ForkJoinPool.commonPool-worker-3 | value : 9 current Thread : ForkJoinPool.commonPool-worker-1 | value : 2 current Thread : ForkJoinPool.commonPool-worker-3 | value : 10 current Thread : ForkJoinPool.commonPool-worker-1 | value : 1 current Thread : ForkJoinPool.commonPool-worker-3 | value : 8 55 .. 정렬 current Thread : ForkJoinPool.commonPool-worker-1 | value : 1 current Thread : ForkJoinPool.commonPool-worker-1 | value : 2 current Thread : ForkJoinPool.commonPool-worker-1 | value : 3 current Thread : ForkJoinPool.commonPool-worker-1 | value : 4 current Thread : ForkJoinPool.commonPool-worker-1 | value : 5 current Thread : ForkJoinPool.commonPool-worker-3 | value : 6 current Thread : ForkJoinPool.commonPool-worker-3 | value : 7 current Thread : ForkJoinPool.commonPool-worker-3 | value : 8 current Thread : ForkJoinPool.commonPool-worker-3 | value : 9 current Thread : ForkJoinPool.commonPool-worker-3 | value : 10 55 Thread ForkJoinPool.commonPool-worker-1 [1,2,3,4,5] Thread ForkJoinPool.commonPool-worker-3 [6,7,8,9,10] 앞에서부터 2분할 되는 것 같다. 4번 정도 테스트 해본 결과 동일하게 나왓다. 트리같은 구조인것같다.
Java
복사
병렬 스트림은 언제 사용하지 말아야할까?
숫자를 더하려면 언박싱을 해야한다. 결국 박싱된 객체가 만들어지는 작업이라면 부적합
반복작업과 같은 독립단위로 나누기 어려운 작업은 병렬 스트림에 부적합
완성되지 못한 컬렉션을 이용해야하는 경우 부적합
병렬 스트림을 잘 사용하려면 어떻게 해야할까?
박싱과 언박싱 오버헤드가 발생하지 않도록 한다. 예를 들어 LongStream과 같은 기본 특화형 스트림 사용
코어간 데이터 전송 시간보다 훨씬 오래걸리는 작업만 병렬로 수행하는 것이 바람직하다.

병렬 스트림의 올바른 사용법

직접 측정하라, 확신이 서지 않으면 직접측정을 통하여 실제로 더 빠르고 효율적인 것을 찾아라
박싱 주의, 자바는 오토박싱이 있다. 이는 병렬 스트림을 사용할때 매우 큰 성능 저하를 가져온다. 특화된 스트림을 사용하자
순서의 상관여부를 주의하라, limit, findFirst와 같은 순서와 관계있는 연산은 병렬 스트림에서는 사용을 주의해야한다. 왜냐하면 어떤녀석이 나올지 어떤게 늦게 수행될지 그건 아무도 모르기 때문이다.
요소의 개수가 아닌 연산의 비용을 고려하라, 개수가 많다고 병렬 스트림을 적용하는 것이 아니다. 요소의 연산 비용이 높아질때 비로서 병렬 스트림이 활약할 수 있음을 의미한다.
적절한 자료구조를 찾아라, ArrayList는 LinkedList보다 병렬 스트림에서 효율이 높다. 왜냐하면 더욱 분할하기 쉽기 때문이다. Linked의 경우 메모리를 다 찾아야 비로서 사이즈를 알 수 있다. 하지만 Array의 경우는 size를 쉽게 알 수 있어 분할에 큰 장점이 있다.
정확한 크기가 병렬성을 극대화한다, 자료구조와 비슷한 맥락으로 사이즈를 정확히 안다면 분해하기 쉽다.
병합비용을 고려하라, 병합비용이 복잡하다면 병렬 스트림을 이용한 이득은 줄어들 것이다. (예를 들어 병합을 위해 객체를 생성하고 캐스팅이 필요하다면?) 기본형에 맞는 작업이 우선적으로 병렬 스트림을 적용하는 곳이 될 것이다.

포크/조인 프레임 워크

실제로 병렬 스트림은 포크/조인 프레임워크로 처리가 된다. 포크 조인 프레임 워크는 작업을 재귀적으로 분할한 다음 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되어 있다. 그래서 포크조인 프레임워크는 서브태스크를 스레드 풀의 작업자 스레드에 분산하는 ExecutorService인터페이스 를 구현해야한다.
// 스레드 풀을 이용하려면 RecursiveTask를 구현해야한다. public class Task extends RecursiveTask<V> { @Override protected V compute() { return null; } }
Java
복사
// 구현 public class Task extends RecursiveTask<Long> { private final long[] numbers; private final int start; private final int end; private final long THRESHOLD = 20; public Task(long[] numbers){ this(numbers, 0, numbers.length); } private Task(long[] numbers, int start, int end){ System.out.println(String.format("새로운 태스크 생성 [%d ~ %d] | 현재 스레드 %s", start, end, Thread.currentThread().getName())); this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int len = end - start; if (len <= THRESHOLD) { System.out.println(len); return computeSequentially(); } Task left = new Task(numbers, start, start + len/2); left.fork(); // 다른 스레드로 할당 Task right = new Task(numbers,start + len/2, end); Long rightResult = right.compute(); //현재스레드 실행-추가분할가능성있음 Long leftResult = left.join(); return leftResult + rightResult; } private long computeSequentially(){ long sum = 0; for (int i = start; i < end; i++){ sum += numbers[i]; } return sum; } } ... public class Main { public static void main(String[] args) { final long l = forkJoinSum(100); System.out.println(l); } public static long forkJoinSum(long n){ long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new Task(numbers); return new ForkJoinPool().invoke(task); } }
Java
복사
정리
일반적으로 애플리케이션에서 둘 이상의 ForkJoinPool을 사용하지 않는다.
ForkJoinPool을 한번만 인스턴스화해서 정적필드에 싱글턴으로 저장 사용
RunTime.availableProcessors의 반환값으로 풀에 사용할 스레드 수 결정
포크/조인 프레임워크는 분할 정복 알고리즘의 병렬화 버전이다.
포크/조인 제대로 사용하기
Join 호출 주의, join() 을 호출하면 태스크가 생산하는 결과가 준비될 때 즉, 반환이 존재할때까지 호출자를 정지시킨다. 극단적으로 아래의 예시
코드 보기
RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하면 안된다. 대신 compute나 fork메소드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할때만 invoke를 사용한다. 더 자세한 내용 https://www.codejava.net/java-core/concurrency/understanding-java-fork-join-framework-with-examples
서브 태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다. fork는 분할 스레드 실행, compute는 현 스레드 동기 실행
포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다. 다른 스레드에서 compute를 호출하므로 스택트레이스가 도움이 안된다.
병렬처리로 성능개선을 하려면 독립적인 서브태스크로 분할할 수 있어야한다. 또한 각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야한다. 즉, 분할 시간보다 실행하는데 시간이 길어야한다는 말이다. 무슨말인가할 수 있다. 쉽게 컬렉션이 완성되어 있어야한다는 것이다. 아니면 분할하기 어려워 성능이 저하된다.

작업훔치기

코어는 4개 인데 1000만개의 작업을 1000개 단위로 포크한다면? 몇개의 스레드로 분할되어야할까? 10,000,000 / 1,000 = 10,000개의 스레드로 분할되어 작업이 돌아간다. 그럼 성능상으로 이점이 전혀 없다. 또한 자원 낭비 같아보인다. 그리고 동일하게 10,000개로 나누어져 각 스레드로 할당되어 실행된 작업은 동일 시간에 시작, 종료 되는 것처럼 보인다. 하지만 전혀 아니다. 그래서 포크/조인에서는 먼저 작업이 끝난 스레드가 다른 스레드의 작업을 훔쳐오는 "작업 훔치기"가 존재한다.
작업훔치기는 할당된 일을 마친 스레드가 다른 스레드의 작업을 훔쳐온다. 즉, 큐에 할당된 task들에 제일 마지막 꼬리부분을 훔쳐와 자기 자신이 실행시키는 것이다. 이를 계속 반복하며 쉬는 상태가 아닌 늘 자원을 가지고 일을 하는 상태를 task가 사라질때까지 반복한다.
중간 점검
이번 장은 상당히 난해하다. 그래서 내 생각과 함께 뭔가 다시 한번 돌아보는 시간을 가지고 더 나아가보려고 한다.
병렬 스트림의 구조는 fork/join 을 사용한다.
fork/join 을 실제로 구현할 수 있다. (Recursive<V> 의 구현을 통하여)
fork/join은 작업을 재귀적으로 분할하여 각각의 스레드로 할당/실행한다.
포크가 많이 일어난다면 스레드가 많아져야할 것 같지만 실제로는 작업훔치기를 통하여 쉬는 스레드 없이 다같이 일한다.
이번 장에서는 본질적으로 얻고자 하는건 병렬 스트림의 동작 방식이다. 잊지말자.

spliterator 인터페이스

갑자기 이게 뭐지라고 생각할 수 있다. 근데 우리는 위의 예제에서 THRESHOLD 라는 변수를 만들어 수량?적으로 분할하였다. 하지만 우리가 병렬 스트림을 사용할땐 이러한 수량을 정해주지 않고도 알아서 잘 분할해주었다. 그런 역할을 하는 것이 spliterator 인터페이스이다.
spliterator은 "분할 할 수 있는 반복자"라는 의미를 가진다. Iterator과 유사하지만 spliterator은 자르고 반복할 수 있다. 즉, 병렬작업에 특화되어있다. (책에서도 적혀 있고 나도 그렇게 생각한다. 이걸 직접 구현할 일은 없을 것 같다. 하지만 알아두면 내 코드가 좋아지는건 분명하다.)
public interface Spliterator<T>{ // T 탐색요소 boolean tryAdvance(Consumer<? super T> action); // Iterator의 hasNext()와 비슷한 동작 Spliterator trySplit(); // 자신을 분할하여 두번째 Spliterator을 만든다. (핵심 같아보임..) long estimateSize(); // 탐색해야할 요소 수 제공(핵심 같아보임..) int characteristics(); }
Java
복사
Spliterator 특성
ORDERED : 순서가 존재한다. DISTINCT : X, Y 의 두 요소를 탐색했을때 X.equals(Y)는 항상 false를 반환한다. SORTED : 미리 정렬된 순서를 따른다. SIZED : 크기가 알려진 소스 NON-NULL : 탐색요소는 NULL 이 아니다. IMMUTABLE : spliterator의 소스는 불변이다. 요소 탐색 중 추가, 삭제, 수정 불가 CONCURRENT : 동기화 없이 여러 스레드에서 고칠 수 있다. SUBSIZED : spliterator 에서 분할된 spliterator 도 SIZED 특성을 지닌다.
커스텀 Spliterator 구현
반복형으로 단어 수 계산 코드 보기
함수형으로 단어 수 계산 코드 보기
Spliterator을 이용한 병렬 스트림

결론

내부 반복을 이용하면 명시적으로 스레드를 사용하지 않고도 병렬처리를 할 수 있다. (예를들어 synchronized와 같이 동기화작업, 왜냐하면 공간적으로 흩어져 있는 것이 아닌 시간상으로 흩어져있는 자료를 가져와 사용하는것이기 때문이고 스트림은 단 한번만 소비하는 데이터 형태이기 때문에)
병렬이 늘 빠른것이 아니다.
요소처리의 비용이 높을때 병렬처리는 높은 효율을 가진다.
자료구조가 병렬로 처리하는 것보다 성능에 큰영향을 미칠 수 있다.
포크조인을 이용하여 병렬화를 이룰 수 있다.
Spliterator을 이용하여 스트림을 어떻게 병렬화할 것인지 정의할 수 있다.