Posts RxJava - 유틸리티 연산자
Post
Cancel

RxJava - 유틸리티 연산자

유틸리티 연산자

delay 첫번째 유형

  • 생산자가 데이터를 생성 및 통지를 하지만 설정한 시간만큼 소비자쪽으로의 데이터 전달을 지연시킨다.

delay

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * 통지된 데이터를 소비자 쪽에서 전달 받는 시간을 일정 시간동안 지연 시키는 예제
 */
public class ObservableDelayExample01 {
    public static void main(String[] args) {
        Logger.log(LogType.PRINT, "# 실행 시작 시간: " + TimeUtil.getCurrentTimeFormatted());

        Observable.just(1, 3, 4, 6)
                .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
                .delay(2000L, TimeUnit.MILLISECONDS)
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(2500L);
    }
}
/*
print() | main | 08:09:58.989 | # 실행 시작 시간: 08:09:58.986
doOnNext() | main | 08:09:59.135 | 1
doOnNext() | main | 08:09:59.143 | 3
doOnNext() | main | 08:09:59.143 | 4
doOnNext() | main | 08:09:59.143 | 6
onNext() | RxComputationThreadPool-1 | 08:10:01.146 | 1
onNext() | RxComputationThreadPool-1 | 08:10:01.146 | 3
onNext() | RxComputationThreadPool-1 | 08:10:01.147 | 4
onNext() | RxComputationThreadPool-1 | 08:10:01.147 | 6
*/

delay 두번째 유형

  • 파라미터로 생성되는 Observable이 데이터를 통지할떄까지 각각의 원본 데이터의 통지를 지연시킨다.

delay second

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * 통지되는 데이터 각각에 지연 시간을 적용하는 예제
 */
public class ObservableDelayExample02 {
    public static void main(String[] args) {
        Observable.just(1, 3, 5, 7)
                .delay(item -> {
                    TimeUtil.sleep(1000L);
                    return Observable.just(item); // 새로운 Observable의 통지 시점에, 원본 데이터를 통지한다.
                }).subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    }
}
/*
onNext() | main | 08:23:08.550 | 1
onNext() | main | 08:23:09.560 | 3
onNext() | main | 08:23:10.565 | 5
onNext() | main | 08:23:11.569 | 7
*/

delaySubscription

  • 생산자가 데이터의 생성 및 통지 자체를 설정한 시간만큼 지연시킨다.
  • 즉, 소비자가 구독을 해도 구독 시점 자체가 지연된다.

delaySubscription

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

/**
 * 소비자가 구독시, 데이터 생성 및 통지 자체를 지연시키는 예제.
 * 즉, delay()는 소비자가 구독 시, 생성 및 통지는 즉시 하지만 소비자에게 전달하는 시간을 지연시키고,
 * delaySubscription()은 데이터 생성 및 통지 자체를 지연시킨다.
 */
public class ObservableDelaySubscriptionExample {
    public static void main(String[] args) {
        Logger.log(LogType.PRINT, "# 실행 시작 시간: " + TimeUtil.getCurrentTimeFormatted());

        Observable.just(1, 3, 4, 6)
                .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
                .delaySubscription(2000L, TimeUnit.MILLISECONDS)
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(2500L);
    }
}
/*
print() | main | 08:35:36.644 | # 실행 시작 시간: 08:35:36.640
doOnNext() | RxComputationThreadPool-1 | 08:35:38.778 | 1
onNext() | RxComputationThreadPool-1 | 08:35:38.778 | 1
doOnNext() | RxComputationThreadPool-1 | 08:35:38.779 | 3
onNext() | RxComputationThreadPool-1 | 08:35:38.779 | 3
doOnNext() | RxComputationThreadPool-1 | 08:35:38.779 | 4
onNext() | RxComputationThreadPool-1 | 08:35:38.779 | 4
doOnNext() | RxComputationThreadPool-1 | 08:35:38.779 | 6
onNext() | RxComputationThreadPool-1 | 08:35:38.779 | 6
*/

timeout

  • 각각의 데이터 통지 시, 지정된 시간안에 통지가 되지 않으면 에러를 통지한다.
  • 에러 통지 시 전달되는 에러 객체는 TimeoutException 이다.

timeout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * 각 데이터 통지 시, 지정한 시간안에 데이터가 통지 되지 않으면 에러를 발생시키는 예제
 * - 네트워크 연결 지연 등으로 인한 처리를 위해 사용하기 좋은 연산자임.
 */
public class ObservableTimeOutExample {
    public static void main(String[] args) {
        Observable.range(1, 5)
                .map(num -> {
                    long time = 1000L;
                    if (num == 4) {
                        time = 1500L;
                    }
                    TimeUtil.sleep(time);
                    return num;
                })
                .timeout(1200L, TimeUnit.MILLISECONDS)
                .subscribe(
                        data -> Logger.log(LogType.ON_NEXT, data),
                        error -> Logger.log(LogType.ON_ERROR, error)
                );

        TimeUtil.sleep(4000L);
    }
}
/*
onNext() | main | 08:43:43.604 | 1
onNext() | main | 08:43:44.610 | 2
onNext() | main | 08:43:45.614 | 3
onERROR() | RxComputationThreadPool-1 | 08:43:46.826 | java.util.concurrent.TimeoutException: The source did not signal an event for 1200 milliseconds and has been terminated.
*/

timeInterval

  • 각각의 데이터가 통지되는데 걸린 시간을 통지한다.
  • 통지된 데이터와 데이터가 통지되는데 걸린 시간을 소비자쪽에서 모두 처리할 수 있다.

timeInterval

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * timeInterval을 이용해서 데이터가 통지되는데 걸린 시간을 통지하는 예제
 */
public class ObservableTimeIntervalExample {
    public static void main(String[] args) {
        Observable.just(1, 3, 5, 7, 9)
                .delay(item -> {
                    TimeUtil.sleep(NumberUtil.randomRange(100, 1000));
                    return Observable.just(item);
                })
                .timeInterval()
                .subscribe(
                        timed -> Logger.log(LogType.ON_NEXT, "# 통지하는데 걸린 시간: " + timed.time() + "\t# 통지된 데이터: " + timed.value())
                );
    }
}
/*
onNext() | main | 08:54:18.530 | # 통지하는데 걸린 시간: 415	# 통지된 데이터: 1
onNext() | main | 08:54:18.999 | # 통지하는데 걸린 시간: 549	# 통지된 데이터: 3
onNext() | main | 08:54:19.337 | # 통지하는데 걸린 시간: 338	# 통지된 데이터: 5
onNext() | main | 08:54:19.804 | # 통지하는데 걸린 시간: 467	# 통지된 데이터: 7
onNext() | main | 08:54:20.543 | # 통지하는데 걸린 시간: 739	# 통지된 데이터: 9
*/

materialize / dematerialize

  • materialize : 통지된 데이터와 통지된 데이터의 통지 타입 자체를 Notification 객체에 담고 이 Notification 객체를 통지한다. 즉, 통지 데이터와 메타 데이터를 포함해서 통지한다고 볼 수 있다.
  • dematerialize : 통지된 Notification 객체를 원래의 통지 데이터로 변환해서 통지한다.

materialize/dematerialize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ObservableMaterialExample01 {
    public static void main(String[] args) {
        Observable.just(1, 2, 3, 4, 5, 6)
                .materialize()
                .subscribe(notification -> {
                    String notificationType =
                            notification.isOnNext() ? "onNext()" : (notification.isOnError() ? "onError()" : "onComplete()");
                    Logger.log(LogType.PRINT, "notification 타입: " + notificationType);
                    Logger.log(LogType.ON_NEXT, notification.getValue());
                });
    }
}
/*
print() | main | 09:09:59.783 | notification 타입: onNext()
onNext() | main | 09:09:59.788 | 1
print() | main | 09:09:59.789 | notification 타입: onNext()
onNext() | main | 09:09:59.789 | 2
print() | main | 09:09:59.790 | notification 타입: onNext()
onNext() | main | 09:09:59.790 | 3
print() | main | 09:09:59.791 | notification 타입: onNext()
onNext() | main | 09:09:59.791 | 4
print() | main | 09:09:59.791 | notification 타입: onNext()
onNext() | main | 09:09:59.791 | 5
print() | main | 09:09:59.791 | notification 타입: onNext()
onNext() | main | 09:09:59.792 | 6
print() | main | 09:09:59.792 | notification 타입: onComplete()
onNext() | main | 09:09:59.792 | null
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
 * Material/Dematerial 연산자의 실제 활용 예제
 * - 특정 Observable 에서 에러가 발생 할 경우 해당 에러에 대해서 구체적으로 처리할 수 있다.
 */
public class ObservableMaterialExample02 {
    public static void main(String[] args) {
        Observable.concatEager(
                Observable.just(
                        getDBUser().subscribeOn(Schedulers.io()),
                        getAPIUser()
                                .subscribeOn(Schedulers.io())
                                .materialize()
                                .map(notification -> {
                                    if (notification.isOnError()) {
                                        // 관리자에게 에러 발생을 알림
                                        Logger.log(LogType.PRINT, "# API user 에러 발생!");
                                    }
                                    return notification;
                                })
                                .filter(notification -> !notification.isOnError())
                                .dematerialize(notification -> notification)
                )
        ).subscribe(
                data -> Logger.log(LogType.ON_NEXT, data),
                error -> Logger.log(LogType.ON_ERROR, error),
                () -> Logger.log(LogType.ON_COMPLETE)
        );

        TimeUtil.sleep(1000L);
    }

    private static Observable<String> getDBUser() {
        return Observable.fromIterable(Arrays.asList("DB user1", "DB user2", "DB user3", "DB user4", "DB user5"));
    }

    private static Observable<String> getAPIUser() {
        return Observable
                .just("API user1", "API user2", "Not User", "API user4", "API user5")
                .map(user -> {
                    if (user.equals("Not User"))
                        throw new RuntimeException();
                    return user;
                });
    }
}
/*
onNext() | main | 09:29:35.195 | DB user1
print() | RxCachedThreadScheduler-1 | 09:29:35.193 | # API user 에러 발생!
onNext() | main | 09:29:35.199 | DB user2
onNext() | main | 09:29:37.189 | DB user3
onNext() | main | 09:29:37.190 | DB user4
onNext() | main | 09:29:37.190 | DB user5
onNext() | main | 09:29:37.190 | API user1
onNext() | main | 09:29:37.190 | API user2
onComplete() | main | 09:29:37.190
*/

이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 1부를 공부하고 작성한 글입니다.
강의영상 링크

This post is licensed under CC BY 4.0 by the author.

RxJava - 에러 처리 연산자

RxJava - 조건과 불린 연산자

Comments powered by Disqus.