Posts RxJava - 데이터 결합 연산자
Post
Cancel

RxJava - 데이터 결합 연산자

데이터 결합 연산자

merge

  • 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Flowable/Observable로 통지한다.
  • 통지 시점이 빠른 Observable의 데이터부터 순차적으로 통지되고 통지 시점이 같을 경우에는 merge() 함수의 파라미터로 먼저 지정된 Observable의 데이터부터 통지된다.

merge

 200ms400ms600ms800ms1000ms1200ms1400ms1600ms1800ms2000ms
Observable101234     
Observable2 1000 1001 1002 1003 1004
  • 데이터 통지 순서
    • 0, 1, 1000, 2, 3, 1001, 4, 1002, 1003, 1004
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * 두개 이상의 Observable을 merge하여 통지된 시간 순서대로 출력하는 예제
 */
public class ObservableMergeExample01 {
    public static void main(String[] args) {
        Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
                .take(5);

        Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
                .take(5)
                .map(num -> num + 1000);

        Observable.merge(observable1, observable2)
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(4000);
    }
}
/*
onNext() | RxComputationThreadPool-1 | 17:06:17.714 | 0
onNext() | RxComputationThreadPool-1 | 17:06:17.859 | 1
onNext() | RxComputationThreadPool-2 | 17:06:17.864 | 1000
onNext() | RxComputationThreadPool-1 | 17:06:18.059 | 2
onNext() | RxComputationThreadPool-1 | 17:06:18.258 | 3
onNext() | RxComputationThreadPool-2 | 17:06:18.263 | 1001
onNext() | RxComputationThreadPool-1 | 17:06:18.459 | 4
onNext() | RxComputationThreadPool-2 | 17:06:18.662 | 1002
onNext() | RxComputationThreadPool-2 | 17:06:19.063 | 1003
onNext() | RxComputationThreadPool-2 | 17:06:19.463 | 1004
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * 각 구간의 차량 속도 데이터를 3개의 Observable에서 통지된 순서대로 merge하여 출력하는 예제
 */
public class ObservableMergeExample02 {
    public static void main(String[] args) {
        Observable<String> observable1 = SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA);
        Observable<String> observable2 = SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionA);
        Observable<String> observable3 = SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionA);

        Observable.merge(observable1, observable2, observable3)
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(1000L);
    }
}
/*
onNext() | RxComputationThreadPool-1 | 17:10:20.259 | A 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-1 | 17:10:20.275 | B 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-1 | 17:10:20.276 | C 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-1 | 17:10:20.276 | C 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-1 | 17:10:20.277 | A 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-1 | 17:10:20.280 | A 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-2 | 17:10:20.293 | B 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-1 | 17:10:20.315 | A 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-3 | 17:10:20.328 | C 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-1 | 17:10:20.368 | A 지점의 차량 속도: 160
onNext() | RxComputationThreadPool-2 | 17:10:20.394 | B 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-3 | 17:10:20.405 | C 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-3 | 17:10:20.481 | C 지점의 차량 속도: 160
onNext() | RxComputationThreadPool-2 | 17:10:20.493 | B 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-2 | 17:10:20.592 | B 지점의 차량 속도: 160
*/

concat

  • 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지한다.
  • 하나의 Observable에서 통지가 끝나면 다음 Observable에서 연이어서 통지가 된다.
  • 각 Observable의 통지 시점과는 상관없이 concat() 함수의 파라미터로 먼저 입력된 Observable의 데이터부터 모두 통지 된 후, 다음 Observable의 데이터가 통지된다.

concat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * concat 을 이용하여 2개 이상의 Observable을 하나의 Observable로 이어 붙여서 통지하는 예제
 */
public class ObservableConcatExample01 {
    public static void main(String[] args) {
        Observable<Long> observable1 =
                Observable.interval(500L, TimeUnit.MILLISECONDS)
                        .take(4);

        Observable<Long> observable2 =
                Observable.interval(300L, TimeUnit.MILLISECONDS)
                        .take(5)
                        .map(num -> num + 1000);

        Observable.concat(observable1, observable2)
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(3500L);
    }
}
/*
onNext() | RxComputationThreadPool-1 | 23:07:36.788 | 0
onNext() | RxComputationThreadPool-1 | 23:07:37.257 | 1
onNext() | RxComputationThreadPool-1 | 23:07:37.756 | 2
onNext() | RxComputationThreadPool-1 | 23:07:38.258 | 3
onNext() | RxComputationThreadPool-2 | 23:07:38.571 | 1000
onNext() | RxComputationThreadPool-2 | 23:07:38.871 | 1001
onNext() | RxComputationThreadPool-2 | 23:07:39.170 | 1002
onNext() | RxComputationThreadPool-2 | 23:07:39.471 | 1003
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * 3개의 Observable에서 통지된 순서와 상관없이 Observable이 concat()에
 * 입력된 순서대로 각 구간의 차량 속도 데이터를 이어 붙여 출력하는 예제
 */
public class ObservableConcatExample02 {
    public static void main(String[] args) {
        List<Observable<String>> speedPerSectionList = Arrays.asList(
                SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA),
                SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionA),
                SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionA)
        );

        Observable.concat(speedPerSectionList)
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(2000L);
    }
}
/*
onNext() | RxComputationThreadPool-1 | 23:17:05.997 | A 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-1 | 23:17:06.014 | A 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-1 | 23:17:06.071 | A 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-1 | 23:17:06.126 | A 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-1 | 23:17:06.181 | A 지점의 차량 속도: 160
onNext() | RxComputationThreadPool-2 | 23:17:06.290 | B 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-2 | 23:17:06.395 | B 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-2 | 23:17:06.491 | B 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-2 | 23:17:06.587 | B 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-2 | 23:17:06.687 | B 지점의 차량 속도: 160
onNext() | RxComputationThreadPool-3 | 23:17:06.770 | C 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-3 | 23:17:06.846 | C 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-3 | 23:17:06.922 | C 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-3 | 23:17:06.999 | C 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-3 | 23:17:07.076 | C 지점의 차량 속도: 160
*/

zip

  • 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지한다.
  • 각 Observable에서 통지된 데이터가 모두 모이면 각 Observable에서 동일한 index의 데이터로 새로운 데이터를 생성한 후 통지한다.
  • 통지하는 데이터 개수가 가장 적은 Observable의 통지 시점에 완료 통지 시점을 맞춘다.

zip

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * zip을 이용해 2개의 Observable이 통지하는 데이터 중에서 통지되는 순서가 일치하는 데이터들을 조합하는 예제
 */
public class ObservableZipExample01 {
    public static void main(String[] args) {
        Observable<Long> observable1 =
                Observable.interval(200L, TimeUnit.MILLISECONDS)
                .take(4);

        Observable<Long> observable2 =
                Observable.interval(400L, TimeUnit.MILLISECONDS)
                .take(6);

        Observable.zip(observable1, observable2, (data1 , data2) -> data1 + data2)
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(3000L);
    }
}
/*
onNext() | RxComputationThreadPool-2 | 23:35:25.416 | 0
onNext() | RxComputationThreadPool-1 | 23:35:25.774 | 2
onNext() | RxComputationThreadPool-2 | 23:35:26.174 | 4
onNext() | RxComputationThreadPool-2 | 23:35:26.575 | 6
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * 서울, 부산, 인천의 시간별 미세먼지 농도를 시간별로 zip한 후, 시간별로 가장 높은 미세머지 농도를 출력하는 예제
 */
public class ObservableZipExample02 {
    public static void main(String[] args) {
        Observable<Integer> observable1 = Observable.fromIterable(SampleData.seoulPM10List);
        Observable<Integer> observable2 = Observable.fromIterable(SampleData.busanPM10List);
        Observable<Integer> observable3 = Observable.fromIterable(SampleData.incheonPM10List);

        Observable<Integer> observable4 = Observable.range(1, 24);

        Observable.zip(observable1, observable2, observable3, observable4, (data1, data2, data3, hour) -> hour + "시: " + Collections.max(Arrays.asList(data1, data2, data3)))
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    }
}
/*
onNext() | main | 23:46:27.605 | 1시: 55
onNext() | main | 23:46:27.608 | 2시: 40
onNext() | main | 23:46:27.608 | 3시: 73
onNext() | main | 23:46:27.608 | 4시: 70
onNext() | main | 23:46:27.608 | 5시: 100
onNext() | main | 23:46:27.608 | 6시: 110
onNext() | main | 23:46:27.609 | 7시: 120
onNext() | main | 23:46:27.609 | 8시: 90
onNext() | main | 23:46:27.609 | 9시: 80
onNext() | main | 23:46:27.609 | 10시: 73
onNext() | main | 23:46:27.609 | 11시: 80
onNext() | main | 23:46:27.609 | 12시: 70
onNext() | main | 23:46:27.609 | 13시: 95
onNext() | main | 23:46:27.609 | 14시: 95
onNext() | main | 23:46:27.610 | 15시: 100
onNext() | main | 23:46:27.610 | 16시: 150
onNext() | main | 23:46:27.610 | 17시: 140
onNext() | main | 23:46:27.610 | 18시: 130
onNext() | main | 23:46:27.610 | 19시: 170
onNext() | main | 23:46:27.610 | 20시: 130
onNext() | main | 23:46:27.610 | 21시: 100
onNext() | main | 23:46:27.611 | 22시: 125
onNext() | main | 23:46:27.611 | 23시: 135
onNext() | main | 23:46:27.611 | 24시: 125
*/

combineLatest

  • 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지한다.
  • 각 Observable에서 데이터를 통지할 때마다 모든 Observable에서 마지막으로 통지한 각 데이터를 함수형 인터페이스에 전달하고, 새로운 데이터를 생성해 통지한다.

combineLatest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * - 각 Observable에서 통지할 때 마다 모든 Observable에서 마지막으로 통지한 데이터들을 함수형 인터페이스에 반환하고,
 * 이를 가공해서 통지하는 예제.
 * - 각 Observable 중 하나의 Observable에서만 통지가 발생하더라도 이미 통지한 Observable의 마지막 데이터와
 * 함께 전달된다.
 */
public class ObservableCombineLatestExample01 {
    public static void main(String[] args) {
        Observable<Long> observable1 =
                Observable.interval(500L, TimeUnit.MILLISECONDS)
                        //.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# observable 1 : " + data))
                        .take(4);

        Observable<Long> observable2 =
                Observable.interval(700L, TimeUnit.MILLISECONDS)
                        //.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# observable 2 : " + data))
                        .take(4);

        Observable.combineLatest(
                observable1,
                observable2,
                (data1, data2) -> "data1: " + data1 + "\tdata2: " + data2)
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(3000L);
    }
}
/*
onNext() | RxComputationThreadPool-2 | 00:08:07.081 | data1: 0	data2: 0
onNext() | RxComputationThreadPool-1 | 00:08:07.344 | data1: 1	data2: 0
onNext() | RxComputationThreadPool-2 | 00:08:07.743 | data1: 1	data2: 1
onNext() | RxComputationThreadPool-1 | 00:08:07.843 | data1: 2	data2: 1
onNext() | RxComputationThreadPool-1 | 00:08:08.344 | data1: 3	data2: 1
onNext() | RxComputationThreadPool-2 | 00:08:08.441 | data1: 3	data2: 2
onNext() | RxComputationThreadPool-2 | 00:08:09.143 | data1: 3	data2: 3
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * 랜덤 온도 데이터와 습도 데이터를 최신 데이터로 가져오는 예제
 */
public class ObservableCombineLatestExample02 {
    public static void main(String[] args) {
        // 랜덤 온도 데이터
        Observable<Integer> observable1 = Observable.interval(NumberUtil.randomRange(100, 500), TimeUnit.MILLISECONDS)
                .take(10)
                .map(notUse -> SampleData.temperatureOfSeoul[NumberUtil.randomRange(0, 5)]);

        // 랜덤 습도 데이터
        Observable<Integer> observable2 = Observable.interval(NumberUtil.randomRange(100, 500), TimeUnit.MILLISECONDS)
                .take(10)
                .map(notUse -> SampleData.humidityOfSeoul[NumberUtil.randomRange(0, 5)]);

        Observable.combineLatest(observable1, observable2,
                (temperature, humidity) -> "최신 온습도 데이터 - 온도: " + temperature + "도\t습도: " + humidity + "%")
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(3000L);
    }
}
/*
onNext() | RxComputationThreadPool-1 | 00:11:38.984 | 최신 온습도 데이터 - 온도: 12도	습도: 45%
onNext() | RxComputationThreadPool-1 | 00:11:38.995 | 최신 온습도 데이터 - 온도: 12도	습도: 45%
onNext() | RxComputationThreadPool-2 | 00:11:39.276 | 최신 온습도 데이터 - 온도: 12도	습도: 32%
onNext() | RxComputationThreadPool-1 | 00:11:39.278 | 최신 온습도 데이터 - 온도: 12도	습도: 32%
onNext() | RxComputationThreadPool-2 | 00:11:39.585 | 최신 온습도 데이터 - 온도: 12도	습도: 43%
onNext() | RxComputationThreadPool-1 | 00:11:39.740 | 최신 온습도 데이터 - 온도: 13도	습도: 43%
onNext() | RxComputationThreadPool-2 | 00:11:39.891 | 최신 온습도 데이터 - 온도: 13도	습도: 32%
onNext() | RxComputationThreadPool-2 | 00:11:40.195 | 최신 온습도 데이터 - 온도: 13도	습도: 62%
onNext() | RxComputationThreadPool-1 | 00:11:40.204 | 최신 온습도 데이터 - 온도: 13도	습도: 62%
onNext() | RxComputationThreadPool-2 | 00:11:40.505 | 최신 온습도 데이터 - 온도: 13도	습도: 45%
onNext() | RxComputationThreadPool-1 | 00:11:40.665 | 최신 온습도 데이터 - 온도: 12도	습도: 45%
onNext() | RxComputationThreadPool-2 | 00:11:40.812 | 최신 온습도 데이터 - 온도: 12도	습도: 32%
onNext() | RxComputationThreadPool-2 | 00:11:41.119 | 최신 온습도 데이터 - 온도: 12도	습도: 45%
onNext() | RxComputationThreadPool-1 | 00:11:41.128 | 최신 온습도 데이터 - 온도: 11도	습도: 45%
*/

기타 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * zip을 이용하여 각 지점별 월별 매출(SampleData.salesOfBranchA, SampleData.salesOfBranchB, SampleData.salesOfBranchC)을
 * 월별로 합산하여 통합 월별 매출을 출력하세요.
 * (지점별 월별 매출 List(salesOfBranchA, salesOfBranchB, salesOfBranchC)는 index가 빠른 요소부터 1월입니다.)
 */
public class QuizAnswerForChapter050501 {
    public static void main(String[] args) {
//        Observable<Integer> observable1 = Observable.fromIterable(SampleData.salesOfBranchA);
//        Observable<Integer> observable2 = Observable.fromIterable(SampleData.salesOfBranchB);
//        Observable<Integer> observable3 = Observable.fromIterable(SampleData.salesOfBranchC);
//        Observable<Integer> observable4 = Observable.range(1, 12);
//
//        Observable.zip(observable1,observable2, observable3, observable4,
//                (data1, data2, data3, month) -> month + "월 매출 : " + (data1 + data2 + data3)
//                ).subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        List<Observable<Integer>> salesList = Arrays.asList(
                Observable.fromIterable(SampleData.salesOfBranchA),
                Observable.fromIterable(SampleData.salesOfBranchB),
                Observable.fromIterable(SampleData.salesOfBranchC)
        );

        Observable.zip(salesList, sales -> (int)sales[0] + (int)sales[1] + (int)sales[2])
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    }
}
/*
onNext() | main | 00:18:23.758 | 38000000
onNext() | main | 00:18:23.762 | 69000000
onNext() | main | 00:18:23.762 | 44000000
onNext() | main | 00:18:23.762 | 100000000
onNext() | main | 00:18:23.763 | 69000000
onNext() | main | 00:18:23.763 | 126000000
onNext() | main | 00:18:23.763 | 157000000
onNext() | main | 00:18:23.763 | 136000000
onNext() | main | 00:18:23.763 | 92000000
onNext() | main | 00:18:23.763 | 72000000
onNext() | main | 00:18:23.763 | 52000000
onNext() | main | 00:18:23.763 | 37000000
*/

이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 1부를 공부하고 작성한 글입니다.
강의영상 링크

This post is licensed under CC BY 4.0 by the author.

RxJava - 데이터 변환 연산자 (2)

RxJava - 에러 처리 연산자

Comments powered by Disqus.