Posts RxJava - 에러 처리 연산자
Post
Cancel

RxJava - 에러 처리 연산자

RxJava try catch로 에러 처리 불가능

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CanNotUserTryCatchExample {
    public static void main(String[] args) {
        try {
            Observable.just(2)
                    .map(num -> num / 0)
                    .subscribe(System.out::print);
        } catch (Exception e) {
            Logger.log(LogType.PRINT, "# 에러 처리가 필요: " + e.getCause());
        }
    }
}
/*
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call.
*/

subscribe의 onError()에서 에러 처리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * 데이터를 처리하다가 예외가 발생할 경우 일반적인 예제
 * 0으로 나누는 부분에서 예외가 발생한다.
 *
 * RxJava에서 에러를 처리하는 일반적인 방식의 예제
 * - RxJava에서는 에러 발생 시, Observable을 생성한 함수에서 onError()을 호출하고,
 * - subScribe의 onError()에서 해당 error를 받아서 처리하는 구조를 가진다.
 */
public class GeneralErrorHandleExample {
    public static void main(String[] args) {
        Observable.just(5)
                .flatMap(num -> Observable
                        .interval(200L, TimeUnit.MILLISECONDS)
                        .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
                        .take(5)
                        .map(i -> num / i))
                .subscribe(
                        data  -> Logger.log(LogType.ON_NEXT, data),
                        error -> Logger.log(LogType.ON_ERROR, error),
                        () -> Logger.log(LogType.ON_COMPLETE)
                );

        TimeUtil.sleep(1000L);
    }
}
/*
doOnNext() | RxComputationThreadPool-1 | 02:29:39.267 | 0
onERROR() | RxComputationThreadPool-1 | 02:29:39.272 | java.lang.ArithmeticException: / by zero
*/

에러 처리 연산자

onErrorReturn

  • 에러가 발생했을 떄 에러를 의미하는 데이터로 대체할 수 있다.
  • onErrorReturn()을 호출하면 onError 이벤트는 발생하지 않는다.

onErrorReturn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * onErrorReturn()를 사용해 예외 발생 시, 우리가 원하는 값을 전달하는 예제
 * - 예외가 발생될 가능성이 있는 부분에 대해서 사전에 처리를 선언할 수 있다.
 * - 소비자가 예상되는 예외를 모두 사전에 알고 처리하긴 힘들기때문에 생산자쪽에서 예외 처리를 사전에 해두고 소비자는 선언된
 * 예외 상황을 보고 그에 맞는 적절한 처리를 할 수 있다.
 */
public class ObservableOnErrorReturnExample {
    public static void main(String[] args) {
        Observable.just(5)
                .flatMap(num -> Observable
                        .interval(200L, TimeUnit.MILLISECONDS)
                        .take(5)
                        .map(i -> num / i)
                        .onErrorReturn(exception -> {
                            if(exception instanceof ArithmeticException)
                                Logger.log(LogType.PRINT, "계산 처리 에러 발생: " + exception.getMessage());

                            return -1L;
                        })
                )
                .subscribe(
                        data -> {
                            if(data < 0)
                                Logger.log(LogType.PRINT, "# 예외를 알리는 데이터: " + data);
                            else
                                Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error),
                        () -> Logger.log(LogType.ON_COMPLETE)
                );

        TimeUtil.sleep(1000L);
    }
}
/*
print() | RxComputationThreadPool-1 | 02:57:04.745 | 계산 처리 에러 발생: / by zero
print() | RxComputationThreadPool-1 | 02:57:04.749 | # 예외를 알리는 데이터: -1
onComplete() | RxComputationThreadPool-1 | 02:57:04.749
*/

onErrorResumeNext

  • 에러가 발생했을 떄 에러를 의미하는 Observable로 대체할 수 있다.
  • Observable로 대체할 수 있으므로 데이터 교체와 더불어 에러 처리를 위한 추가 작업을 할 수 있다.

onErrorResumeNext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * onErrorResumeNext를 이용해서 에러 발생시, 다른 Observable로 대체하는 예제.
 */
public class ObservableOnErrorResumeNextExample {
    public static void main(String[] args) {
        Observable.just(5L)
                .flatMap(num -> Observable
                        .interval(200L, TimeUnit.MILLISECONDS)
                        .take(5)
                        .map(i -> num / i)
                        .onErrorResumeNext(throwable -> {
                            Logger.log(LogType.PRINT, "# 운영자에게 이메일 발송 : " + throwable.getMessage());
                            return Observable.interval(200L, TimeUnit.MILLISECONDS)
                                    .take(5)
                                    .skip(1)
                                    .map(i -> num / i);
                        })
                )
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(2000L);
    }
}
/*
print() | RxComputationThreadPool-1 | 04:15:13.219 | # 운영자에게 이메일 발송 : / by zero
onNext() | RxComputationThreadPool-2 | 04:15:13.627 | 5
onNext() | RxComputationThreadPool-2 | 04:15:13.827 | 2
onNext() | RxComputationThreadPool-2 | 04:15:14.027 | 1
onNext() | RxComputationThreadPool-2 | 04:15:14.224 | 1
*/

retry

  • 데이터 통지 중 에러가 발생했을 때, 데이터 통지를 재시도 한다.
  • 즉, onError 이벤트가 발생하면 subscribe()를 다시 호출하여 재구독한다.
  • 에러가 발생한 시점에 통지에 실패한 데이터만 다시 통지되는 것이 아니라 처음부터 다시 통지된다.

retry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ObservableRetryExample01 {
    public static void main(String[] args) {
        Observable.just(5)
                .flatMap(
                        num -> Observable
                                .interval(200L, TimeUnit.MILLISECONDS)
                                .map(i -> {
                                    long result;
                                    try {
                                        result = num / i;
                                    } catch (ArithmeticException ex) {
                                        Logger.log(LogType.PRINT, "error: " + ex.getMessage());
                                        throw ex;
                                    }
                                    return result;
                                })
                                .retry(5)
                                .onErrorReturn(throwable -> -1L)
                ).subscribe(
                data -> Logger.log(LogType.ON_NEXT, data),
                error -> Logger.log(LogType.ON_ERROR, error),
                () -> Logger.log(LogType.ON_COMPLETE)
        );

        TimeUtil.sleep(5000L);
    }
}
/*
print() | RxComputationThreadPool-1 | 04:25:54.272 | error: / by zero
print() | RxComputationThreadPool-2 | 04:25:54.484 | error: / by zero
print() | RxComputationThreadPool-3 | 04:25:54.689 | error: / by zero
print() | RxComputationThreadPool-4 | 04:25:54.893 | error: / by zero
print() | RxComputationThreadPool-5 | 04:25:55.096 | error: / by zero
print() | RxComputationThreadPool-6 | 04:25:55.301 | error: / by zero
onNext() | RxComputationThreadPool-6 | 04:25:55.302 | -1
onComplete() | RxComputationThreadPool-6 | 04:25:55.303
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * 에러 발생시 재시도를 즉시 하지 않고, 지연 시간을 주고 재시도를 하는 예제
 */
public class ObservableRetryExample02 {
    private final static int RETRY_MAX = 5;
    public static void main(String[] args) {
        Observable.just(5)
                .flatMap(
                        num -> Observable
                                .interval(200L, TimeUnit.MILLISECONDS)
                                .map(i -> {
                                    long result;
                                    try{
                                        result = num / i;
                                    }catch(ArithmeticException ex){
                                        Logger.log(LogType.PRINT, "error: " + ex.getMessage());
                                        throw ex;
                                    }
                                    return result;
                                })
                                .retry((retryCount, ex) -> {
                                    Logger.log(LogType.PRINT, "# 재시도 횟수: " + retryCount);
                                    TimeUtil.sleep(1000L);
                                    return retryCount < RETRY_MAX ? true : false;
                                })
                                .onErrorReturn(throwable -> -1L)

                ).subscribe(
                data -> Logger.log(LogType.ON_NEXT, data),
                error -> Logger.log(LogType.ON_ERROR, error),
                () -> Logger.log(LogType.ON_COMPLETE)
        );


        TimeUtil.sleep(6000L);
    }
}
/*
print() | RxComputationThreadPool-1 | 04:28:32.857 | error: / by zero
print() | RxComputationThreadPool-1 | 04:28:32.861 | # 재시도 횟수: 1
print() | RxComputationThreadPool-2 | 04:28:34.070 | error: / by zero
print() | RxComputationThreadPool-2 | 04:28:34.070 | # 재시도 횟수: 2
print() | RxComputationThreadPool-3 | 04:28:35.278 | error: / by zero
print() | RxComputationThreadPool-3 | 04:28:35.279 | # 재시도 횟수: 3
print() | RxComputationThreadPool-4 | 04:28:36.483 | error: / by zero
print() | RxComputationThreadPool-4 | 04:28:36.483 | # 재시도 횟수: 4
print() | RxComputationThreadPool-5 | 04:28:37.688 | error: / by zero
print() | RxComputationThreadPool-5 | 04:28:37.689 | # 재시도 횟수: 5
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * 에러 발생 시, 데이터 통지를 처음부터 다시 하는것을 보여주는 예제
 */
public class ObservableRetryExample03 {
    private final static int RETRY_MAX = 5;

    public static void main(String[] args) {
        Observable.just(10, 12, 15, 16)
                .zipWith(Observable.just(1, 2, 0, 4), (a, b) -> {
                    int result;
                    try {
                        result = a / b;
                    } catch (ArithmeticException ex) {
                        Logger.log(LogType.PRINT, "error: " + ex.getMessage());
                        throw ex;
                    }
                    return result;
                })
                .retry(3)
                .onErrorReturn(throwable -> -1)
                .subscribe(
                        data -> Logger.log(LogType.ON_NEXT, data),
                        error -> Logger.log(LogType.ON_ERROR, error),
                        () -> Logger.log(LogType.ON_COMPLETE)
                );

        TimeUtil.sleep(5000L);
    }
}
/*
onNext() | main | 04:33:58.786 | 10
onNext() | main | 04:33:58.789 | 6
print() | main | 04:33:58.789 | error: / by zero
onNext() | main | 04:33:58.790 | 10
onNext() | main | 04:33:58.790 | 6
print() | main | 04:33:58.790 | error: / by zero
onNext() | main | 04:33:58.790 | 10
onNext() | main | 04:33:58.791 | 6
print() | main | 04:33:58.791 | error: / by zero
onNext() | main | 04:33:58.791 | 10
onNext() | main | 04:33:58.791 | 6
print() | main | 04:33:58.791 | error: / by zero
onNext() | main | 04:33:58.791 | -1
onComplete() | main | 04:33:58.792
*/

이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 1부를 공부하고 작성한 글입니다.
강의영상 링크

This post is licensed under CC BY 4.0 by the author.

RxJava - 데이터 결합 연산자

RxJava - 유틸리티 연산자

Comments powered by Disqus.