데이터 결합 연산자
merge
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Flowable/Observable로 통지한다.
- 통지 시점이 빠른 Observable의 데이터부터 순차적으로 통지되고 통지 시점이 같을 경우에는 merge() 함수의 파라미터로 먼저 지정된 Observable의 데이터부터 통지된다.
200ms | 400ms | 600ms | 800ms | 1000ms | 1200ms | 1400ms | 1600ms | 1800ms | 2000ms | |
---|---|---|---|---|---|---|---|---|---|---|
Observable1 | 0 | 1 | 2 | 3 | 4 | |||||
Observable2 | 1000 | 1001 | 1002 | 1003 | 1004 |
- 데이터 통지 순서
- 0, 1, 1000, 2, 3, 1001, 4, 1002, 1003, 1004
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 두개 이상의 Observable을 merge하여 통지된 시간 순서대로 출력하는 예제
*/
public class ObservableMergeExample01 {
public static void main(String[] args) {
Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
.take(5);
Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
.take(5)
.map(num -> num + 1000);
Observable.merge(observable1, observable2)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(4000);
}
}
/*
onNext() | RxComputationThreadPool-1 | 17:06:17.714 | 0
onNext() | RxComputationThreadPool-1 | 17:06:17.859 | 1
onNext() | RxComputationThreadPool-2 | 17:06:17.864 | 1000
onNext() | RxComputationThreadPool-1 | 17:06:18.059 | 2
onNext() | RxComputationThreadPool-1 | 17:06:18.258 | 3
onNext() | RxComputationThreadPool-2 | 17:06:18.263 | 1001
onNext() | RxComputationThreadPool-1 | 17:06:18.459 | 4
onNext() | RxComputationThreadPool-2 | 17:06:18.662 | 1002
onNext() | RxComputationThreadPool-2 | 17:06:19.063 | 1003
onNext() | RxComputationThreadPool-2 | 17:06:19.463 | 1004
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 각 구간의 차량 속도 데이터를 3개의 Observable에서 통지된 순서대로 merge하여 출력하는 예제
*/
public class ObservableMergeExample02 {
public static void main(String[] args) {
Observable<String> observable1 = SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA);
Observable<String> observable2 = SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionA);
Observable<String> observable3 = SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionA);
Observable.merge(observable1, observable2, observable3)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(1000L);
}
}
/*
onNext() | RxComputationThreadPool-1 | 17:10:20.259 | A 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-1 | 17:10:20.275 | B 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-1 | 17:10:20.276 | C 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-1 | 17:10:20.276 | C 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-1 | 17:10:20.277 | A 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-1 | 17:10:20.280 | A 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-2 | 17:10:20.293 | B 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-1 | 17:10:20.315 | A 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-3 | 17:10:20.328 | C 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-1 | 17:10:20.368 | A 지점의 차량 속도: 160
onNext() | RxComputationThreadPool-2 | 17:10:20.394 | B 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-3 | 17:10:20.405 | C 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-3 | 17:10:20.481 | C 지점의 차량 속도: 160
onNext() | RxComputationThreadPool-2 | 17:10:20.493 | B 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-2 | 17:10:20.592 | B 지점의 차량 속도: 160
*/
concat
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지한다.
- 하나의 Observable에서 통지가 끝나면 다음 Observable에서 연이어서 통지가 된다.
- 각 Observable의 통지 시점과는 상관없이 concat() 함수의 파라미터로 먼저 입력된 Observable의 데이터부터 모두 통지 된 후, 다음 Observable의 데이터가 통지된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* concat 을 이용하여 2개 이상의 Observable을 하나의 Observable로 이어 붙여서 통지하는 예제
*/
public class ObservableConcatExample01 {
public static void main(String[] args) {
Observable<Long> observable1 =
Observable.interval(500L, TimeUnit.MILLISECONDS)
.take(4);
Observable<Long> observable2 =
Observable.interval(300L, TimeUnit.MILLISECONDS)
.take(5)
.map(num -> num + 1000);
Observable.concat(observable1, observable2)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3500L);
}
}
/*
onNext() | RxComputationThreadPool-1 | 23:07:36.788 | 0
onNext() | RxComputationThreadPool-1 | 23:07:37.257 | 1
onNext() | RxComputationThreadPool-1 | 23:07:37.756 | 2
onNext() | RxComputationThreadPool-1 | 23:07:38.258 | 3
onNext() | RxComputationThreadPool-2 | 23:07:38.571 | 1000
onNext() | RxComputationThreadPool-2 | 23:07:38.871 | 1001
onNext() | RxComputationThreadPool-2 | 23:07:39.170 | 1002
onNext() | RxComputationThreadPool-2 | 23:07:39.471 | 1003
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 3개의 Observable에서 통지된 순서와 상관없이 Observable이 concat()에
* 입력된 순서대로 각 구간의 차량 속도 데이터를 이어 붙여 출력하는 예제
*/
public class ObservableConcatExample02 {
public static void main(String[] args) {
List<Observable<String>> speedPerSectionList = Arrays.asList(
SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA),
SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionA),
SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionA)
);
Observable.concat(speedPerSectionList)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(2000L);
}
}
/*
onNext() | RxComputationThreadPool-1 | 23:17:05.997 | A 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-1 | 23:17:06.014 | A 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-1 | 23:17:06.071 | A 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-1 | 23:17:06.126 | A 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-1 | 23:17:06.181 | A 지점의 차량 속도: 160
onNext() | RxComputationThreadPool-2 | 23:17:06.290 | B 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-2 | 23:17:06.395 | B 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-2 | 23:17:06.491 | B 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-2 | 23:17:06.587 | B 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-2 | 23:17:06.687 | B 지점의 차량 속도: 160
onNext() | RxComputationThreadPool-3 | 23:17:06.770 | C 지점의 차량 속도: 100
onNext() | RxComputationThreadPool-3 | 23:17:06.846 | C 지점의 차량 속도: 110
onNext() | RxComputationThreadPool-3 | 23:17:06.922 | C 지점의 차량 속도: 115
onNext() | RxComputationThreadPool-3 | 23:17:06.999 | C 지점의 차량 속도: 130
onNext() | RxComputationThreadPool-3 | 23:17:07.076 | C 지점의 차량 속도: 160
*/
zip
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지한다.
- 각 Observable에서 통지된 데이터가 모두 모이면 각 Observable에서 동일한 index의 데이터로 새로운 데이터를 생성한 후 통지한다.
- 통지하는 데이터 개수가 가장 적은 Observable의 통지 시점에 완료 통지 시점을 맞춘다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* zip을 이용해 2개의 Observable이 통지하는 데이터 중에서 통지되는 순서가 일치하는 데이터들을 조합하는 예제
*/
public class ObservableZipExample01 {
public static void main(String[] args) {
Observable<Long> observable1 =
Observable.interval(200L, TimeUnit.MILLISECONDS)
.take(4);
Observable<Long> observable2 =
Observable.interval(400L, TimeUnit.MILLISECONDS)
.take(6);
Observable.zip(observable1, observable2, (data1 , data2) -> data1 + data2)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3000L);
}
}
/*
onNext() | RxComputationThreadPool-2 | 23:35:25.416 | 0
onNext() | RxComputationThreadPool-1 | 23:35:25.774 | 2
onNext() | RxComputationThreadPool-2 | 23:35:26.174 | 4
onNext() | RxComputationThreadPool-2 | 23:35:26.575 | 6
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 서울, 부산, 인천의 시간별 미세먼지 농도를 시간별로 zip한 후, 시간별로 가장 높은 미세머지 농도를 출력하는 예제
*/
public class ObservableZipExample02 {
public static void main(String[] args) {
Observable<Integer> observable1 = Observable.fromIterable(SampleData.seoulPM10List);
Observable<Integer> observable2 = Observable.fromIterable(SampleData.busanPM10List);
Observable<Integer> observable3 = Observable.fromIterable(SampleData.incheonPM10List);
Observable<Integer> observable4 = Observable.range(1, 24);
Observable.zip(observable1, observable2, observable3, observable4, (data1, data2, data3, hour) -> hour + "시: " + Collections.max(Arrays.asList(data1, data2, data3)))
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
}
}
/*
onNext() | main | 23:46:27.605 | 1시: 55
onNext() | main | 23:46:27.608 | 2시: 40
onNext() | main | 23:46:27.608 | 3시: 73
onNext() | main | 23:46:27.608 | 4시: 70
onNext() | main | 23:46:27.608 | 5시: 100
onNext() | main | 23:46:27.608 | 6시: 110
onNext() | main | 23:46:27.609 | 7시: 120
onNext() | main | 23:46:27.609 | 8시: 90
onNext() | main | 23:46:27.609 | 9시: 80
onNext() | main | 23:46:27.609 | 10시: 73
onNext() | main | 23:46:27.609 | 11시: 80
onNext() | main | 23:46:27.609 | 12시: 70
onNext() | main | 23:46:27.609 | 13시: 95
onNext() | main | 23:46:27.609 | 14시: 95
onNext() | main | 23:46:27.610 | 15시: 100
onNext() | main | 23:46:27.610 | 16시: 150
onNext() | main | 23:46:27.610 | 17시: 140
onNext() | main | 23:46:27.610 | 18시: 130
onNext() | main | 23:46:27.610 | 19시: 170
onNext() | main | 23:46:27.610 | 20시: 130
onNext() | main | 23:46:27.610 | 21시: 100
onNext() | main | 23:46:27.611 | 22시: 125
onNext() | main | 23:46:27.611 | 23시: 135
onNext() | main | 23:46:27.611 | 24시: 125
*/
combineLatest
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지한다.
- 각 Observable에서 데이터를 통지할 때마다 모든 Observable에서 마지막으로 통지한 각 데이터를 함수형 인터페이스에 전달하고, 새로운 데이터를 생성해 통지한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* - 각 Observable에서 통지할 때 마다 모든 Observable에서 마지막으로 통지한 데이터들을 함수형 인터페이스에 반환하고,
* 이를 가공해서 통지하는 예제.
* - 각 Observable 중 하나의 Observable에서만 통지가 발생하더라도 이미 통지한 Observable의 마지막 데이터와
* 함께 전달된다.
*/
public class ObservableCombineLatestExample01 {
public static void main(String[] args) {
Observable<Long> observable1 =
Observable.interval(500L, TimeUnit.MILLISECONDS)
//.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# observable 1 : " + data))
.take(4);
Observable<Long> observable2 =
Observable.interval(700L, TimeUnit.MILLISECONDS)
//.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# observable 2 : " + data))
.take(4);
Observable.combineLatest(
observable1,
observable2,
(data1, data2) -> "data1: " + data1 + "\tdata2: " + data2)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3000L);
}
}
/*
onNext() | RxComputationThreadPool-2 | 00:08:07.081 | data1: 0 data2: 0
onNext() | RxComputationThreadPool-1 | 00:08:07.344 | data1: 1 data2: 0
onNext() | RxComputationThreadPool-2 | 00:08:07.743 | data1: 1 data2: 1
onNext() | RxComputationThreadPool-1 | 00:08:07.843 | data1: 2 data2: 1
onNext() | RxComputationThreadPool-1 | 00:08:08.344 | data1: 3 data2: 1
onNext() | RxComputationThreadPool-2 | 00:08:08.441 | data1: 3 data2: 2
onNext() | RxComputationThreadPool-2 | 00:08:09.143 | data1: 3 data2: 3
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 랜덤 온도 데이터와 습도 데이터를 최신 데이터로 가져오는 예제
*/
public class ObservableCombineLatestExample02 {
public static void main(String[] args) {
// 랜덤 온도 데이터
Observable<Integer> observable1 = Observable.interval(NumberUtil.randomRange(100, 500), TimeUnit.MILLISECONDS)
.take(10)
.map(notUse -> SampleData.temperatureOfSeoul[NumberUtil.randomRange(0, 5)]);
// 랜덤 습도 데이터
Observable<Integer> observable2 = Observable.interval(NumberUtil.randomRange(100, 500), TimeUnit.MILLISECONDS)
.take(10)
.map(notUse -> SampleData.humidityOfSeoul[NumberUtil.randomRange(0, 5)]);
Observable.combineLatest(observable1, observable2,
(temperature, humidity) -> "최신 온습도 데이터 - 온도: " + temperature + "도\t습도: " + humidity + "%")
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3000L);
}
}
/*
onNext() | RxComputationThreadPool-1 | 00:11:38.984 | 최신 온습도 데이터 - 온도: 12도 습도: 45%
onNext() | RxComputationThreadPool-1 | 00:11:38.995 | 최신 온습도 데이터 - 온도: 12도 습도: 45%
onNext() | RxComputationThreadPool-2 | 00:11:39.276 | 최신 온습도 데이터 - 온도: 12도 습도: 32%
onNext() | RxComputationThreadPool-1 | 00:11:39.278 | 최신 온습도 데이터 - 온도: 12도 습도: 32%
onNext() | RxComputationThreadPool-2 | 00:11:39.585 | 최신 온습도 데이터 - 온도: 12도 습도: 43%
onNext() | RxComputationThreadPool-1 | 00:11:39.740 | 최신 온습도 데이터 - 온도: 13도 습도: 43%
onNext() | RxComputationThreadPool-2 | 00:11:39.891 | 최신 온습도 데이터 - 온도: 13도 습도: 32%
onNext() | RxComputationThreadPool-2 | 00:11:40.195 | 최신 온습도 데이터 - 온도: 13도 습도: 62%
onNext() | RxComputationThreadPool-1 | 00:11:40.204 | 최신 온습도 데이터 - 온도: 13도 습도: 62%
onNext() | RxComputationThreadPool-2 | 00:11:40.505 | 최신 온습도 데이터 - 온도: 13도 습도: 45%
onNext() | RxComputationThreadPool-1 | 00:11:40.665 | 최신 온습도 데이터 - 온도: 12도 습도: 45%
onNext() | RxComputationThreadPool-2 | 00:11:40.812 | 최신 온습도 데이터 - 온도: 12도 습도: 32%
onNext() | RxComputationThreadPool-2 | 00:11:41.119 | 최신 온습도 데이터 - 온도: 12도 습도: 45%
onNext() | RxComputationThreadPool-1 | 00:11:41.128 | 최신 온습도 데이터 - 온도: 11도 습도: 45%
*/
기타 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* zip을 이용하여 각 지점별 월별 매출(SampleData.salesOfBranchA, SampleData.salesOfBranchB, SampleData.salesOfBranchC)을
* 월별로 합산하여 통합 월별 매출을 출력하세요.
* (지점별 월별 매출 List(salesOfBranchA, salesOfBranchB, salesOfBranchC)는 index가 빠른 요소부터 1월입니다.)
*/
public class QuizAnswerForChapter050501 {
public static void main(String[] args) {
// Observable<Integer> observable1 = Observable.fromIterable(SampleData.salesOfBranchA);
// Observable<Integer> observable2 = Observable.fromIterable(SampleData.salesOfBranchB);
// Observable<Integer> observable3 = Observable.fromIterable(SampleData.salesOfBranchC);
// Observable<Integer> observable4 = Observable.range(1, 12);
//
// Observable.zip(observable1,observable2, observable3, observable4,
// (data1, data2, data3, month) -> month + "월 매출 : " + (data1 + data2 + data3)
// ).subscribe(data -> Logger.log(LogType.ON_NEXT, data));
List<Observable<Integer>> salesList = Arrays.asList(
Observable.fromIterable(SampleData.salesOfBranchA),
Observable.fromIterable(SampleData.salesOfBranchB),
Observable.fromIterable(SampleData.salesOfBranchC)
);
Observable.zip(salesList, sales -> (int)sales[0] + (int)sales[1] + (int)sales[2])
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
}
}
/*
onNext() | main | 00:18:23.758 | 38000000
onNext() | main | 00:18:23.762 | 69000000
onNext() | main | 00:18:23.762 | 44000000
onNext() | main | 00:18:23.762 | 100000000
onNext() | main | 00:18:23.763 | 69000000
onNext() | main | 00:18:23.763 | 126000000
onNext() | main | 00:18:23.763 | 157000000
onNext() | main | 00:18:23.763 | 136000000
onNext() | main | 00:18:23.763 | 92000000
onNext() | main | 00:18:23.763 | 72000000
onNext() | main | 00:18:23.763 | 52000000
onNext() | main | 00:18:23.763 | 37000000
*/
이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 1부를 공부하고 작성한 글입니다.
강의영상 링크