스케쥴러(Scheduler)란?
- RxJava에서의 스케쥴러는 RxJava 비동기 프로그래밍을 위한 쓰레드(Thread) 관리자이다.
- 즉, 스케쥴러를 이용해서 어떤 쓰레드에서 무엇을 처리할 지에 대해서 제어할 수 있다.
- 스케쥴러를 이용해서 데이터를 통지하는 쪽과 데이터를 처리하는 쪽 쓰레드를 별도로 지정해서 분리할 수 있다.
- RxJava의 스케쥴러를 통해 쓰레드를 위한 코드의 간결성 및 쓰레드 관리의 복잡함을 줄일 수 있다.
- RxJava에서 스케쥴러를 지정하기 위해서 subscribeOn(), observeOn() 유틸리티 연산자를 사용한다.
- 생산자쪽의 데이터 흐름을 제어하기 위해서는 subscribeOn() 연산자를 사용한다.
- 소비자쪽에서 전달받은 데이터 처리를 제어하기 위해서는 observeOn() 연산자를 사용한다.
- subscribeOn(), observeOn() 연산자는 각각 파라미터로 Scheduler를 지정해야 한다.
스케쥴러(Scheduler)의 종류 (1)
스케쥴러 | 설명 |
---|---|
Schdulers.io() | - I/O 처리 작업을 할 때 사용하는 스케쥴러 - 네트워크 요청 처리, 각종 입/출력 작업, 데이터베이스 쿼리 등에 사용 - 쓰레드 풀에서 쓰레드를 가져오거나 가져올 쓰레드가 없으면 새로운 쓰레드를 생성한다. |
Schdulers.computation() | - 논리적인 연산 처리 시, 사용하는 스케쥴러 - CPU 코어의 물리적 쓰레드 수를 넘지 않는 범위에서 쓰레들르 생성한다. - 대기 시간 없이 빠르게 계산 작업을 수행하기위해 사용한다. |
Schdulers.newThread() | - 요청시마다 매번 새로운 쓰레드를 생성한다. - 매번 생성되면 쓰레드 비용도 많이 들고, 재사용도 되지 않는다. |
예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Scheduler.io()를 이용한 파일 입/출력 예제
* subscribeOn() 만 지정하면 데이터 통지 및 처리를 모두 RxCachedThreadScheduler 쓰레드에서 실행한다.
*/
public class SchedulerIOExample01 {
public static void main(String[] args) {
File[] files = new File("src/main/java/com/itvillage/").listFiles();
Observable.fromArray(files)
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data.getName()))
.filter(data -> data.isDirectory())
.map(dir -> dir.getName())
.subscribeOn(Schedulers.io())
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(1000L);
}
}
/*
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.277 | chapter05
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter05
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter02
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter02
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter03
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter03
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter04
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter04
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | utils
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | utils
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | common
onNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | common
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | section01
onNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | section01
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | section00
onNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | section00
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Scheduler.io()를 이용한 파일 입/출력 예제
* subscribeOn(), observeOn()을 모두 지정하면 데이터 통지와 데이터 처리를 개별 쓰레드에서 진행한다.
*/
public class SchedulerIOExample02 {
public static void main(String[] args) {
File[] files = new File("src/main/java/com/itvillage/").listFiles();
Observable.fromArray(files)
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data.getName()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.filter(data -> data.isDirectory())
.map(dir -> dir.getName())
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(1000L);
}
}
/*
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.372 | chapter05
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.378 | chapter02
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.379 | chapter03
onNext() | RxComputationThreadPool-1 | 23:56:57.379 | chapter05
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.379 | chapter04
onNext() | RxComputationThreadPool-1 | 23:56:57.379 | chapter02
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.379 | utils
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.379 | common
onNext() | RxComputationThreadPool-1 | 23:56:57.379 | chapter03
onNext() | RxComputationThreadPool-1 | 23:56:57.380 | chapter04
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.380 | section01
onNext() | RxComputationThreadPool-1 | 23:56:57.380 | utils
onNext() | RxComputationThreadPool-1 | 23:56:57.380 | common
onNext() | RxComputationThreadPool-1 | 23:56:57.381 | section01
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.382 | section00
onNext() | RxComputationThreadPool-1 | 23:56:57.384 | section00
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Scheduler.io()를 이용한 파일 입/출력 예제
* observeOn()을 여러개 지정하면 지정한 다음의 데이터 처리를 각각 개별 쓰레드에서 진행한다.
*/
public class SchedulerIOExample03 {
public static void main(String[] args) {
File[] files = new File("src/main/java/com/itvillage/").listFiles();
Observable.fromArray(files)
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# 데이터 통지"))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.filter(data -> data.isDirectory())
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# filter() 거침"))
.observeOn(Schedulers.computation())
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# map() 거침"))
.map(dir -> dir.getName())
.observeOn(Schedulers.computation())
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(1000L);
}
}
/*
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.078 | # 데이터 통지
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.083 | # filter() 거침
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.083 | # filter() 거침
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.084 | # filter() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.083 | # map() 거침
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.084 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.084 | # filter() 거침
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.084 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.084 | # filter() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.084 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.084 | chapter05
onNext() | RxComputationThreadPool-3 | 00:04:29.085 | chapter02
doOnNext() | RxComputationThreadPool-1 | 00:04:29.084 | # filter() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.085 | # map() 거침
doOnNext() | RxComputationThreadPool-1 | 00:04:29.085 | # filter() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.085 | chapter03
doOnNext() | RxComputationThreadPool-1 | 00:04:29.085 | # filter() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.085 | # map() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.086 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.086 | chapter04
doOnNext() | RxComputationThreadPool-2 | 00:04:29.086 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.086 | utils
doOnNext() | RxComputationThreadPool-2 | 00:04:29.086 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.086 | common
doOnNext() | RxComputationThreadPool-2 | 00:04:29.087 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.087 | section01
onNext() | RxComputationThreadPool-3 | 00:04:29.087 | section00
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Schedulers.computation() 을 이용해서 계산 작업을 처리하는 예제
*/
public class SchedulerComputationExample {
public static void main(String[] args) {
Observable<Integer> observable1 = Observable.fromIterable(SampleData.seoulPM10List);
Observable<Integer> observable2 = Observable.fromIterable(SampleData.busanPM10List);
Observable<Integer> observable3 = Observable.fromIterable(SampleData.incheonPM10List);
Observable<Integer> observable4 = Observable.range(1, 24);
Observable source = Observable.zip(observable1, observable2, observable3, observable4,
(data1, data2, data3, hour) -> hour + "시: " + Collections.max(Arrays.asList(data1, data2, data3)));
source.subscribeOn(Schedulers.computation())
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
source.subscribeOn(Schedulers.computation())
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(500L);
}
}
/*
onNext() | RxComputationThreadPool-2 | 01:32:41.941 | 1시: 55
onNext() | RxComputationThreadPool-1 | 01:32:41.941 | 1시: 55
onNext() | RxComputationThreadPool-2 | 01:32:41.944 | 2시: 40
onNext() | RxComputationThreadPool-1 | 01:32:41.944 | 2시: 40
onNext() | RxComputationThreadPool-2 | 01:32:41.944 | 3시: 73
onNext() | RxComputationThreadPool-1 | 01:32:41.944 | 3시: 73
onNext() | RxComputationThreadPool-2 | 01:32:41.944 | 4시: 70
onNext() | RxComputationThreadPool-1 | 01:32:41.944 | 4시: 70
...
onNext() | RxComputationThreadPool-2 | 01:32:41.947 | 22시: 125
onNext() | RxComputationThreadPool-1 | 01:32:41.947 | 21시: 100
onNext() | RxComputationThreadPool-2 | 01:32:41.947 | 23시: 135
onNext() | RxComputationThreadPool-1 | 01:32:41.947 | 22시: 125
onNext() | RxComputationThreadPool-2 | 01:32:41.947 | 24시: 125
onNext() | RxComputationThreadPool-1 | 01:32:41.947 | 23시: 135
onNext() | RxComputationThreadPool-1 | 01:32:41.947 | 24시: 125
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.itvillage.section01;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
/**
* Schedulers.newThread()를 이용하여 구독할때 마다 새로운 쓰레드를 생성하는 예제
* - 쓰레드 생성 비용이 들고, 재사용 되지 않으므로 권장 되지 않는 방법이다.
*/
public class SchedulerNewThreadExample {
public static void main(String[] args) {
Observable<String> observable = Observable.just("1", "2", "3", "4", "5");
observable.subscribeOn(Schedulers.newThread())
.map(data -> "## " + data + " ##")
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
observable.subscribeOn(Schedulers.newThread())
.map(data -> "$$ " + data + " $$")
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(300L);
}
}
/*
onNext() | RxNewThreadScheduler-1 | 01:39:11.471 | ## 1 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.471 | $$ 1 $$
onNext() | RxNewThreadScheduler-1 | 01:39:11.474 | ## 2 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.474 | $$ 2 $$
onNext() | RxNewThreadScheduler-1 | 01:39:11.474 | ## 3 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.474 | $$ 3 $$
onNext() | RxNewThreadScheduler-1 | 01:39:11.474 | ## 4 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.474 | $$ 4 $$
onNext() | RxNewThreadScheduler-1 | 01:39:11.474 | ## 5 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.474 | $$ 5 $$
*/
이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 2부를 공부하고 작성한 글입니다.
강의영상 링크