스트림과 컬렉션
Java는 연속된 데이터를 처리하기 위해 List, Set같은 컬렉션 API를 지원한다. 그러나 컬렉션을 사용할 때, 사람들은 여전히 for문과 if 조건문, 그리고 여러 가지 변수들을 반복적으로 사용하게 된다.
또 다시 이전 예제를 가져오면 아래와 같은 코드가 전형적인 예시다.
List<Menu> coffee = new ArrayList<>();
for (Menu m : menu) {
if (m.type == MenuType.COFFEE) {
coffee.add(m);
}
}
menu라는 컬렉션에 담겨있는 자료들을 탐색하면서 커피 종류의 메뉴만 보고싶다. 여기서 coffee라는 리스트를 만들기 위해 필요한 것은 menu라는 원래의 컬렉션과 m.type == MenuType.COFFEE라는 조건이면 충분하다. 그런데 위의 코드는 필요한 정보에 비해 너무 과대포장된 느낌이다. 진짜 하고싶은 말은 '메뉴 중에서 커피만 골라줘'였는데, '메뉴를 하나씩 탐색하면서 만약 그 메뉴가 커피면 새로운 리스트에 넣고, 그렇지 않으면 아무것도 하지마'라고 말하는 것과 같다. 이것은 '무엇을'이 아니라 '어떻게'에 좀 더 가깝기 때문이다.
선언형 프로그래밍
'무엇을'에 집중하는 것을 선언형 프로그래밍이라고 하고, '어떻게'를 서술하는 것을 명령형 프로그래밍이라고 한다. 그럼 과연 '어떻게'를 말하지 않고도 원하는 결과를 얻는 게 가능할까? 사실 대부분은 이미 사용하고 있었을 것이다. SQL을 보자.
SELECT * FROM MENU WHERE MENU.TYPE = 'COFFEE';
위에서 말했던 것처럼 MENU라는 테이블의 정보와 MENU.TYPE = 'COFFEE'라는 조건 뿐이다. 어떤 인덱스를 타는지, 실행계획은 어떤지 알 필요가 없다. DBMS에서 알아서 처리해주니까.
서론이 길었는데, 아무튼 Java에서도 연속된 데이터에 선언형 프로그래밍을 지원해주는 것이 바로 스트림 API다. 위의 예시를 스트림 API로 바꿔 보겠다.
List<Menu> coffee = menu.stream()
.filter(m -> m.type == MenuType.COFFEE)
.collect(Collectors.toList());
menu라는 컬렉션을 스트림으로 만들고 filter를 통해 원해는 데이터만 걸러낸 후 List로 만들었다. 코드가 훨씬 간결해지고, filter, collect라는 메소드 이름도 직관적이다.
고수준 빌딩 블록
물론 filter, collect 외에도 스트림에는 많은 연산들이 존재한다. 이러한 데이터 처리 연산들은 블록을 쌓는 것처럼 메소드 체이닝으로 조합해서 마음대로 사용할 수 있다. 예를 들면, 아래와 같이 연결할 수 있다.
뿐만 아니라, 이 각각의 연산들은 매우 유연하다. 해야할 일들을 우리가 주입해줘야 하기 때문이다. 예를 들어, 위의 예시에서 종류가 커피인 것 메뉴를 선택하기 위해 filter에 조건을 넣어주었다. 마찬가지로, sorted에도 정렬하기 위한 기준을 우리가 직접 넣어주는 대로 동작한다. 위의 그림을 코드로 표현해보자.
List<String> coffeeInfo = menu.stream()
.filter(m -> m.type == MenuType.COFFEE)
.sorted(Comparator.comparing(m -> m.name))
.map(m -> m.name + " : " + m.price)
.collect(Collectors.toList());
이렇게 필요한 코드를 간결하게 넣어줄 수 있는 것은 이전 글에서 알아봤던 람다 표현식과 함수형 인터페이스 덕분이다.
병렬처리
사실 앞선 내용들도 중요하지만, 이미 익숙한 방식에서 스트림 API로 넘어와야 하는 이유로는 다소 부족한 느낌이다. 코드의 생산성이 높아지긴 했지만, 완성된 프로그램은 개선된 점을 느끼기 힘들기 때문이다. 그러나 병렬처리에 있어서는 기존의 방식보다 향상된 성능을 기대할 수 있다. 여기서 '기대할 수 있다'는 표현은 항상 그렇다는 말은 아니라는 의미이다.
이전에도 Java는 직접 스레드를 만들고, 동기화를 하는 방식으로 병렬처리를 할 수 있었다. 그러나 실제 사용하기에는 신경써야할 것들이 많았고, 싱글코어에서 굳이 멀티 스레드를 이용할 일이 없었을 것이다. 그러다가 멀티코어의 등장으로 다중 스레드 사용이 필요해지면서 Java 5에서는 ExecutorService, Callable, Future등이 추가되어 이전보다 쉽게 병렬처리를 할 수 있었다. Java 7에서는 ForkJoin 프레임워크가 생겨나 에러를 최소화할 수 있도록 도와주었다. Java 8에서는 이러한 프레임워크를 직접 사용하지 않고도 병렬처리를 쉽게 할 수 있도록 스트림 API를 제공한다.
바로 위의 예시를 병렬로 처리하는 코드를 살펴보자.
List<String> coffeeInfo = menu.parallelStream()
.filter(m -> m.type == MenuType.COFFEE)
.sorted(Comparator.comparing(m -> m.name))
.map(m -> m.name + " : " + m.price)
.collect(Collectors.toList());
바뀐 부분은 stream() -> parallelStream()이 되었다는 것 뿐이다. 나머지는 스트림 API가 알아서 처리해준다. CPU 코어 수가 몇 개이고, 스레드를 몇 개 만들 것이며, menu의 요소들을 어떻게 분배할 것인지 말해주지 않아도 된다.
병렬처리 성능 최적화
앞서 스트림 API를 사용하면 항상 개선된 성능을 보여주지는 않는다고 했다. 자세히 살펴보면 사용하기에 꽤 까다롭다. 먼저, 책에 나오는 예시를 살펴보자.
private static final long N = 10000000L;
public long iterativeSum() {
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1)
.limit(N)
.reduce(0L, Long::sum);
}
public long parallelSum() {
return Stream.iterate(1L, i -> i + 1)
.limit(N)
.parallel()
.reduce(0L, Long::sum);
}
위의 세 메소드는 모두 같은 결과를 반환한다. 그러나 iterativeSum은 sequentialSum보다 약 40배가 빠르고, sequentialSum는 parallelSum보다 약 5배가 빠르다.
먼저, sequentialSum의 문제는 Long 타입을 사용했다는 점이다. 천만번의 덧셈을 하면서 천만번의 언박싱이 일어났다. Stream.iterate의 결과로 Long이 만들어지고, 그것들을 더하기 위해서 언박싱이 일어나기 때문이다. 따라서 primitive type에 특화된 LongStream을 사용해야 한다.
public long sequentialSum() {
return LongStream.rangeClosed(1, N)
.reduce(0L, Long::sum);
}
이제 언박싱 문제가 해결되었고, 이전보다 약 25배 빨라졌다. 그러나 여전히 iterativeSum보다는 느리다.
다음으로, parallelSum은 iterate 메소드를 통해 스트림의 요소가 하나씩 만들어지기 때문에 분할하기 어렵다. 위에서 사용한 rangeClosed 메소드는 만들어야 할 스트림의 요소들을 이미 다 알고있다. 따라서 스트림을 만들 때 분할하기 쉬워진다. 이제 스트림을 병렬처리하려면 parallel만 추가해주면 된다.
public static long parallelSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long::sum);
}
최종 결과는 iterativeSum보다 약 20% 빨라졌다.
병렬처리 성능 문제
앞의 예시에서는 최적화를 통해 순차적 방식보다 빠른 병렬처리를 만들 수 있었다. 그러나 항상 이정도로 최적화가 가능한 것은 아니다. 당연하게도, 병렬처리를 하기 위해서 스트림을 분할하는 것은 비용이 든다. 따라서 병렬 스트림을 사용할 때는 분할로 인해 추가적으로 발생하는 비용과 이익을 비교해 보아야 한다. 예를 들어, 5개의 요소가 있는 스트림이 있다고 하자. 만약 이 5개의 요소가 각각 Disk I/O를 통해 1분이 걸리는 작업을 해야 한다면 병렬처리할 경우 1분이 소요되고, 순차적으로 처리하면 5분이 소요될 것이다. 반면, 5개의 숫자를 덧셈하는 경우를 생각해보면 병렬처리할 때 분할하고 합치는 오버헤드가 훨씬 커질 것이다. 결국 상황에 따라 다르기 때문에, 직접 벤치마크를 돌려보고 선택하는 것이 좋다.
분할과 관련된 오버헤드를 줄이기 위한 방법으로 Spliterator 인터페이스를 직접 구현하는 방법 있다. Spliterator는 splitable iterator라는 뜻으로 스트림을 분할하기 위해서 내부적으로 사용된다. 실제로 모든 컬렉션은 Spliterator를 구현하고 있기 때문에 자동으로 분할되어 병렬처리가 가능한 것이다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characeristics();
}
Spliterator 인터페이스는 위 4가지 메소드를 구현하여 만들 수 있다. tryAdvance는 iterator에서 hasNext와 비슷하게 탐색해야 할 요소가 남아있는지를 묻는 메소드다. trySplit은 Spliterator를 분할하여 자신과 다른 하나의 Spliterator로 나눈다. estimateSize는 남아있는 요소의 개수를 알려주고, characteristics는 Spliterator의 특성들을 정의할 수 있다. 우리는 지나치게 많이 분할하는 것을 막고싶은 것이므로, trySplit에서 남은 요소들이 특정 기준 이하인 경우 Spliterator를 반환하지 않도록 만들면 된다. 직접 구현한 Spliterator를 StreamSupport.stream이라는 메소드에 전달해서 병렬 스트림을 만들 수 있다.
분할을 적절히 했고, 각 스레드에서 할 일을 끝냈다고 하자. 이제 남은 것은 분할된 데이터를 합치는 것이다. 데이터를 합치는 것은 생각보다 쉽지 않다. 예를 들어, sort 연산을 병렬로 처리한다고 하자. 단순히 두 리스트를 붙이는 것이 아니라 두 리스트의 요소들을 탐색하면서 순서대로 정렬해야 한다. 마찬가지로, distinct도 중복을 없애주는 작업을 추가로 해주면서 합쳐야 한다.
심지어는 병렬처리가 불가능한 경우도 있다. 숫자를 소수인 것과 소수가 아닌 것으로 나누는 경우, 이전까지 소수인 것들을 모두 알아야만 판단할 수 있다. 따라서 이런 경우는 하나의 스레드에서 작업을 해야 한다. 결론적으로, 스트림 API에서 병렬처리가 만능은 아니라는 것이다. 성능을 향상시킬 수 있는지, 혹은 성능 향상은 필요없고 가독성과 생산성을 위해 스트림 API를 도입할 수도 있다. 여러 가능성을 생각해보고 스트림 API 도입을 판단하기 바란다.
주의할 점
스트림은 한 번만 사용가능하다
컬렉션을 사용할 때는 for문을 여러 번 반복해도 문제가 없다. 그러나 스트림은 다르다. 한 번 만들어진 스트림은 딱 한 번만 탐색할 수 있다. 예를 들어, strings라는 컬렉션으로부터 스트림을 만드는 예시를 보자.
Stream<String> s = strings.stream();
s.forEach(System.out::println);
s.forEach(System.out::println);
위와 같이 s라는 스트림을 두 번 사용하게 되면 IllegalStateException과 함께 스트림이 이미 처리되었거나 닫혔다는 메시지를 볼 수 있다. 이전 요소로 돌아갈 수도 없고, 새로운 요소를 추가할 수도 없다. 스트림은 한 번 소비되면 끝이다.
스트림은 게으르다
스트림 API는 명령형 프로그래밍과 달리 요청이 있을 때만 실행된다. 여기서 요청이란 스트림으로부터 무언가 결과를 얻는 '최종 연산'이라고 보면 된다.
Stream<String> coffeeInfoStream = menu.stream()
.filter(m -> m.type == MenuType.COFFEE)
.sorted(Comparator.comparing(m -> m.name))
.peek(m -> System.out.println(m.name))
.map(m -> m.name + " : " + m.price);
위의 코드는 스트림을 만들고, filter와 sort를 거친 메뉴들의 이름을 peek에서 출력한 다음, map에서 String으로 바꿔주는 코드이다. 그러나 실행해보면 아무런 일도 일어나지 않는다. 그저 스트림을 다른 스트림으로 바꿨을 뿐, 스트림을 소비하지 않았기 때문이다.
List<String> list = menu.stream()
.filter(m -> m.type == MenuType.COFFEE)
.sorted(Comparator.comparing(m -> m.name))
.peek(m -> System.out.println(m.name))
.map(m -> m.name + " : " + m.price)
.toList();
이제 toList를 이용해서 List로 바꿔주는 최종 연산을 추가하면 비로소 peek에서의 결과가 출력되는 것을 확인할 수 있다.
Effectively Final
스트림을 사용할 때, 외부의 변수를 스트림 API 안에서 사용하고 싶을 수도 있다. 예를 들어, 모든 메뉴의 가격을 1000원씩 인상하여 메뉴판을 만들었다가, 마음이 바뀌어 500원만 인상하여 메뉴판을 고치는 경우를 생각해보자.
int num = 1000;
List<Menu> newMenu1 = menu.stream()
.peek(m -> m.price += num)
.toList();
num = 500;
List<Menu> newMenu2 = menu.stream()
.peek(m -> m.price += num)
.toList();
위 코드는 컴파일에 실패한다. 람다 표현식에서 참조하는 로컬 변수는 final이거나 effectively final이어야 한다는 메시지가 나온다. 여기서 effectively final이라는 말은 명시적으로 final 선언은 하지 않더라도, 사실상 final과 비슷하게 값이 변하지 않는다는 의미이다.
도대체 왜 이러한 제약이 있는 것일까? JVM 메모리를 생각해보면 답을 알 수 있다. 로컬 변수인 num은 스택에 존재하고, 람다 표현식은 인스턴스로 힙에 존재하게 된다. 따라서 로컬 변수가 해제된 이후에 람다 표현식이 로컬 변수에 접근하게 되는 경우가 발생할 수 있다. 따라서 final이거나 effectively final인 로컬 변수만 허용하고, 실제 컴파일시에는 해당 변수를 람다 표현식에 복사하는 방식으로 사용된다. 이러한 방식을 람다 캡쳐링이라고 부른다.
동기화 문제
로컬 변수의 경우 위와 같은 제약과 람다 캡쳐링을 통해 동기화 문제를 해결해주지만, 인스턴스에 대한 동기화는 스트림 API가 해결해주지 못한다. 예를 들어, 스트림 외부의 리스트에 결과를 저장하는 경우를 생각해보자.
List<Integer> evenNumbers = new ArrayList<>();
IntStream.rangeClosed(1, 1000000)
.parallel()
.filter(i -> i % 2 == 0)
.forEach(evenNumbers::add);
위 코드는 1부터 100만까지의 수를 분할하여 여러 스레드에 전달하고, 짝수만을 걸러낸 후 evenNumbers 리스트에 추가한다. 따라서 결과를 리스트에 저장하는 과정에서 충돌이 발생한다. 실제로 evenNumbers의 크기를 확인해보면 기대했던 500000보다 작은 값이 나오거나, 혹은 ArrayIndexOutOfBoundsException이 발생하기도 한다.
이러한 동기화 문제는 thread-safe한 타입들만 공유 인스턴스로 사용하거나, 혹은 직접 synchronized로 동기화하는 방법도 있다. 그러나 이러한 방법보다는 리듀싱 연산으로 안전하게 합칠 수 있다면 가장 좋은 방법이다. 위의 예시처럼 forEach에서 다른 리스트에 추가하는 것이 아니라 toList를 호출하면 스트림 API가 안전하게 List를 만들어준다.
List<Integer> evenNumbers = IntStream.rangeClosed(1, 1000000)
.parallel()
.filter(i -> i % 2 == 0)
.boxed()
.toList();
3줄 요약
- 스트림은 연속된 데이터를 순회하며 처리하기 위한 API
- 선언형 프로그래밍 방법으로 동작만 전달해주면 내부적으로 반복이 일어난다
- 잘 사용할 경우 병렬처리 성능이 좋아진다
'Java > 모던 자바 인 액션' 카테고리의 다른 글
[Java] 모던 자바 인 액션(6) - 새로운 날짜, 시간 API (0) | 2023.03.03 |
---|---|
[Java] 모던 자바 인 액션(5) - Optional (0) | 2023.03.02 |
[Java] 모던 자바 인 액션(3) - 함수형 인터페이스와 람다 표현식 (1) | 2023.02.26 |
[Java] 모던 자바 인 액션(2) - 동작 파라미터화 (0) | 2023.02.25 |
[Java] 모던 자바 인 액션(1) - Java의 진화 (0) | 2023.02.25 |