Posts RxJava - 스케쥴러란? + 스케쥴러의 종류(1)
Post
Cancel

RxJava - 스케쥴러란? + 스케쥴러의 종류(1)

스케쥴러(Scheduler)란?

  • RxJava에서의 스케쥴러는 RxJava 비동기 프로그래밍을 위한 쓰레드(Thread) 관리자이다.
  • 즉, 스케쥴러를 이용해서 어떤 쓰레드에서 무엇을 처리할 지에 대해서 제어할 수 있다.
  • 스케쥴러를 이용해서 데이터를 통지하는 쪽과 데이터를 처리하는 쪽 쓰레드를 별도로 지정해서 분리할 수 있다.
  • RxJava의 스케쥴러를 통해 쓰레드를 위한 코드의 간결성 및 쓰레드 관리의 복잡함을 줄일 수 있다.
  • RxJava에서 스케쥴러를 지정하기 위해서 subscribeOn(), observeOn() 유틸리티 연산자를 사용한다.
  • 생산자쪽의 데이터 흐름을 제어하기 위해서는 subscribeOn() 연산자를 사용한다.
  • 소비자쪽에서 전달받은 데이터 처리를 제어하기 위해서는 observeOn() 연산자를 사용한다.
  • subscribeOn(), observeOn() 연산자는 각각 파라미터로 Scheduler를 지정해야 한다.

스케쥴러(Scheduler)의 종류 (1)

스케쥴러설명
Schdulers.io()- I/O 처리 작업을 할 때 사용하는 스케쥴러
- 네트워크 요청 처리, 각종 입/출력 작업, 데이터베이스 쿼리 등에 사용
- 쓰레드 풀에서 쓰레드를 가져오거나 가져올 쓰레드가 없으면 새로운 쓰레드를 생성한다.
Schdulers.computation()- 논리적인 연산 처리 시, 사용하는 스케쥴러
- CPU 코어의 물리적 쓰레드 수를 넘지 않는 범위에서 쓰레들르 생성한다.
- 대기 시간 없이 빠르게 계산 작업을 수행하기위해 사용한다.
Schdulers.newThread()- 요청시마다 매번 새로운 쓰레드를 생성한다.
- 매번 생성되면 쓰레드 비용도 많이 들고, 재사용도 되지 않는다.

예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Scheduler.io()를 이용한 파일 입/출력 예제
 * subscribeOn() 만 지정하면 데이터 통지 및 처리를 모두 RxCachedThreadScheduler 쓰레드에서 실행한다.
 */
public class SchedulerIOExample01 {
    public static void main(String[] args) {
        File[] files = new File("src/main/java/com/itvillage/").listFiles();

        Observable.fromArray(files)
                .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data.getName()))
                .filter(data -> data.isDirectory())
                .map(dir -> dir.getName())
                .subscribeOn(Schedulers.io())
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(1000L);
    }
}
/*
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.277 | chapter05
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter05
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter02
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter02
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter03
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter03
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter04
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | chapter04
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | utils
onNext() | RxCachedThreadScheduler-1 | 23:49:45.280 | utils
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | common
onNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | common
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | section01
onNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | section01
doOnNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | section00
onNext() | RxCachedThreadScheduler-1 | 23:49:45.281 | section00
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * Scheduler.io()를 이용한 파일 입/출력 예제
 * subscribeOn(), observeOn()을 모두 지정하면 데이터 통지와 데이터 처리를 개별 쓰레드에서 진행한다.
 */
public class SchedulerIOExample02 {
    public static void main(String[] args) {
        File[] files = new File("src/main/java/com/itvillage/").listFiles();

        Observable.fromArray(files)
                .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data.getName()))
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .filter(data -> data.isDirectory())
                .map(dir -> dir.getName())
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(1000L);
    }
}
/*
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.372 | chapter05
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.378 | chapter02
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.379 | chapter03
onNext() | RxComputationThreadPool-1 | 23:56:57.379 | chapter05
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.379 | chapter04
onNext() | RxComputationThreadPool-1 | 23:56:57.379 | chapter02
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.379 | utils
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.379 | common
onNext() | RxComputationThreadPool-1 | 23:56:57.379 | chapter03
onNext() | RxComputationThreadPool-1 | 23:56:57.380 | chapter04
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.380 | section01
onNext() | RxComputationThreadPool-1 | 23:56:57.380 | utils
onNext() | RxComputationThreadPool-1 | 23:56:57.380 | common
onNext() | RxComputationThreadPool-1 | 23:56:57.381 | section01
doOnNext() | RxCachedThreadScheduler-1 | 23:56:57.382 | section00
onNext() | RxComputationThreadPool-1 | 23:56:57.384 | section00
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
 * Scheduler.io()를 이용한 파일 입/출력 예제
 * observeOn()을 여러개 지정하면 지정한 다음의 데이터 처리를 각각 개별 쓰레드에서 진행한다.
 */
public class SchedulerIOExample03 {
    public static void main(String[] args) {
        File[] files = new File("src/main/java/com/itvillage/").listFiles();

        Observable.fromArray(files)
                .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# 데이터 통지"))
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .filter(data -> data.isDirectory())
                .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# filter() 거침"))
                .observeOn(Schedulers.computation())
                .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# map() 거침"))
                .map(dir -> dir.getName())
                .observeOn(Schedulers.computation())
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(1000L);
    }
}
/*
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.078 | # 데이터 통지
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.083 | # filter() 거침
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.083 | # filter() 거침
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.083 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.084 | # filter() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.083 | # map() 거침
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.084 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.084 | # filter() 거침
doOnNext() | RxCachedThreadScheduler-1 | 00:04:29.084 | # 데이터 통지
doOnNext() | RxComputationThreadPool-1 | 00:04:29.084 | # filter() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.084 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.084 | chapter05
onNext() | RxComputationThreadPool-3 | 00:04:29.085 | chapter02
doOnNext() | RxComputationThreadPool-1 | 00:04:29.084 | # filter() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.085 | # map() 거침
doOnNext() | RxComputationThreadPool-1 | 00:04:29.085 | # filter() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.085 | chapter03
doOnNext() | RxComputationThreadPool-1 | 00:04:29.085 | # filter() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.085 | # map() 거침
doOnNext() | RxComputationThreadPool-2 | 00:04:29.086 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.086 | chapter04
doOnNext() | RxComputationThreadPool-2 | 00:04:29.086 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.086 | utils
doOnNext() | RxComputationThreadPool-2 | 00:04:29.086 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.086 | common
doOnNext() | RxComputationThreadPool-2 | 00:04:29.087 | # map() 거침
onNext() | RxComputationThreadPool-3 | 00:04:29.087 | section01
onNext() | RxComputationThreadPool-3 | 00:04:29.087 | section00
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Schedulers.computation() 을 이용해서 계산 작업을 처리하는 예제
 */
public class SchedulerComputationExample {
    public static void main(String[] args) {
        Observable<Integer> observable1 = Observable.fromIterable(SampleData.seoulPM10List);
        Observable<Integer> observable2 = Observable.fromIterable(SampleData.busanPM10List);
        Observable<Integer> observable3 = Observable.fromIterable(SampleData.incheonPM10List);

        Observable<Integer> observable4 = Observable.range(1, 24);

        Observable source = Observable.zip(observable1, observable2, observable3, observable4,
                (data1, data2, data3, hour) -> hour + "시: " + Collections.max(Arrays.asList(data1, data2, data3)));

        source.subscribeOn(Schedulers.computation())
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        source.subscribeOn(Schedulers.computation())
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(500L);
    }
}
/*
onNext() | RxComputationThreadPool-2 | 01:32:41.941 | 1시: 55
onNext() | RxComputationThreadPool-1 | 01:32:41.941 | 1시: 55
onNext() | RxComputationThreadPool-2 | 01:32:41.944 | 2시: 40
onNext() | RxComputationThreadPool-1 | 01:32:41.944 | 2시: 40
onNext() | RxComputationThreadPool-2 | 01:32:41.944 | 3시: 73
onNext() | RxComputationThreadPool-1 | 01:32:41.944 | 3시: 73
onNext() | RxComputationThreadPool-2 | 01:32:41.944 | 4시: 70
onNext() | RxComputationThreadPool-1 | 01:32:41.944 | 4시: 70
...
onNext() | RxComputationThreadPool-2 | 01:32:41.947 | 22시: 125
onNext() | RxComputationThreadPool-1 | 01:32:41.947 | 21시: 100
onNext() | RxComputationThreadPool-2 | 01:32:41.947 | 23시: 135
onNext() | RxComputationThreadPool-1 | 01:32:41.947 | 22시: 125
onNext() | RxComputationThreadPool-2 | 01:32:41.947 | 24시: 125
onNext() | RxComputationThreadPool-1 | 01:32:41.947 | 23시: 135
onNext() | RxComputationThreadPool-1 | 01:32:41.947 | 24시: 125
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.itvillage.section01;

import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

/**
 * Schedulers.newThread()를 이용하여 구독할때 마다 새로운 쓰레드를 생성하는 예제
 * - 쓰레드 생성 비용이 들고, 재사용 되지 않으므로 권장 되지 않는 방법이다.
 */
public class SchedulerNewThreadExample {
    public static void main(String[] args) {
        Observable<String> observable = Observable.just("1", "2", "3", "4", "5");

        observable.subscribeOn(Schedulers.newThread())
                .map(data -> "## " + data + " ##")
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        observable.subscribeOn(Schedulers.newThread())
                .map(data -> "$$ " + data + " $$")
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(300L);
    }
}
/*
onNext() | RxNewThreadScheduler-1 | 01:39:11.471 | ## 1 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.471 | $$ 1 $$
onNext() | RxNewThreadScheduler-1 | 01:39:11.474 | ## 2 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.474 | $$ 2 $$
onNext() | RxNewThreadScheduler-1 | 01:39:11.474 | ## 3 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.474 | $$ 3 $$
onNext() | RxNewThreadScheduler-1 | 01:39:11.474 | ## 4 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.474 | $$ 4 $$
onNext() | RxNewThreadScheduler-1 | 01:39:11.474 | ## 5 ##
onNext() | RxNewThreadScheduler-2 | 01:39:11.474 | $$ 5 $$
*/

이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 2부를 공부하고 작성한 글입니다.
강의영상 링크

This post is licensed under CC BY 4.0 by the author.

RxJava - 물리적인 쓰레드와 논리적인 쓰레드의 이해

RxJava - 스케쥴러의 종류(2)

Comments powered by Disqus.