RxJava try catch로 에러 처리 불가능
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CanNotUserTryCatchExample {
public static void main(String[] args) {
try {
Observable.just(2)
.map(num -> num / 0)
.subscribe(System.out::print);
} catch (Exception e) {
Logger.log(LogType.PRINT, "# 에러 처리가 필요: " + e.getCause());
}
}
}
/*
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call.
*/
subscribe의 onError()에서 에러 처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 데이터를 처리하다가 예외가 발생할 경우 일반적인 예제
* 0으로 나누는 부분에서 예외가 발생한다.
*
* RxJava에서 에러를 처리하는 일반적인 방식의 예제
* - RxJava에서는 에러 발생 시, Observable을 생성한 함수에서 onError()을 호출하고,
* - subScribe의 onError()에서 해당 error를 받아서 처리하는 구조를 가진다.
*/
public class GeneralErrorHandleExample {
public static void main(String[] args) {
Observable.just(5)
.flatMap(num -> Observable
.interval(200L, TimeUnit.MILLISECONDS)
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
.take(5)
.map(i -> num / i))
.subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE)
);
TimeUtil.sleep(1000L);
}
}
/*
doOnNext() | RxComputationThreadPool-1 | 02:29:39.267 | 0
onERROR() | RxComputationThreadPool-1 | 02:29:39.272 | java.lang.ArithmeticException: / by zero
*/
에러 처리 연산자
onErrorReturn
- 에러가 발생했을 떄 에러를 의미하는 데이터로 대체할 수 있다.
- onErrorReturn()을 호출하면 onError 이벤트는 발생하지 않는다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* onErrorReturn()를 사용해 예외 발생 시, 우리가 원하는 값을 전달하는 예제
* - 예외가 발생될 가능성이 있는 부분에 대해서 사전에 처리를 선언할 수 있다.
* - 소비자가 예상되는 예외를 모두 사전에 알고 처리하긴 힘들기때문에 생산자쪽에서 예외 처리를 사전에 해두고 소비자는 선언된
* 예외 상황을 보고 그에 맞는 적절한 처리를 할 수 있다.
*/
public class ObservableOnErrorReturnExample {
public static void main(String[] args) {
Observable.just(5)
.flatMap(num -> Observable
.interval(200L, TimeUnit.MILLISECONDS)
.take(5)
.map(i -> num / i)
.onErrorReturn(exception -> {
if(exception instanceof ArithmeticException)
Logger.log(LogType.PRINT, "계산 처리 에러 발생: " + exception.getMessage());
return -1L;
})
)
.subscribe(
data -> {
if(data < 0)
Logger.log(LogType.PRINT, "# 예외를 알리는 데이터: " + data);
else
Logger.log(LogType.ON_NEXT, data);
},
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE)
);
TimeUtil.sleep(1000L);
}
}
/*
print() | RxComputationThreadPool-1 | 02:57:04.745 | 계산 처리 에러 발생: / by zero
print() | RxComputationThreadPool-1 | 02:57:04.749 | # 예외를 알리는 데이터: -1
onComplete() | RxComputationThreadPool-1 | 02:57:04.749
*/
onErrorResumeNext
- 에러가 발생했을 떄 에러를 의미하는 Observable로 대체할 수 있다.
- Observable로 대체할 수 있으므로 데이터 교체와 더불어 에러 처리를 위한 추가 작업을 할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* onErrorResumeNext를 이용해서 에러 발생시, 다른 Observable로 대체하는 예제.
*/
public class ObservableOnErrorResumeNextExample {
public static void main(String[] args) {
Observable.just(5L)
.flatMap(num -> Observable
.interval(200L, TimeUnit.MILLISECONDS)
.take(5)
.map(i -> num / i)
.onErrorResumeNext(throwable -> {
Logger.log(LogType.PRINT, "# 운영자에게 이메일 발송 : " + throwable.getMessage());
return Observable.interval(200L, TimeUnit.MILLISECONDS)
.take(5)
.skip(1)
.map(i -> num / i);
})
)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(2000L);
}
}
/*
print() | RxComputationThreadPool-1 | 04:15:13.219 | # 운영자에게 이메일 발송 : / by zero
onNext() | RxComputationThreadPool-2 | 04:15:13.627 | 5
onNext() | RxComputationThreadPool-2 | 04:15:13.827 | 2
onNext() | RxComputationThreadPool-2 | 04:15:14.027 | 1
onNext() | RxComputationThreadPool-2 | 04:15:14.224 | 1
*/
retry
- 데이터 통지 중 에러가 발생했을 때, 데이터 통지를 재시도 한다.
- 즉, onError 이벤트가 발생하면 subscribe()를 다시 호출하여 재구독한다.
- 에러가 발생한 시점에 통지에 실패한 데이터만 다시 통지되는 것이 아니라 처음부터 다시 통지된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ObservableRetryExample01 {
public static void main(String[] args) {
Observable.just(5)
.flatMap(
num -> Observable
.interval(200L, TimeUnit.MILLISECONDS)
.map(i -> {
long result;
try {
result = num / i;
} catch (ArithmeticException ex) {
Logger.log(LogType.PRINT, "error: " + ex.getMessage());
throw ex;
}
return result;
})
.retry(5)
.onErrorReturn(throwable -> -1L)
).subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE)
);
TimeUtil.sleep(5000L);
}
}
/*
print() | RxComputationThreadPool-1 | 04:25:54.272 | error: / by zero
print() | RxComputationThreadPool-2 | 04:25:54.484 | error: / by zero
print() | RxComputationThreadPool-3 | 04:25:54.689 | error: / by zero
print() | RxComputationThreadPool-4 | 04:25:54.893 | error: / by zero
print() | RxComputationThreadPool-5 | 04:25:55.096 | error: / by zero
print() | RxComputationThreadPool-6 | 04:25:55.301 | error: / by zero
onNext() | RxComputationThreadPool-6 | 04:25:55.302 | -1
onComplete() | RxComputationThreadPool-6 | 04:25:55.303
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 에러 발생시 재시도를 즉시 하지 않고, 지연 시간을 주고 재시도를 하는 예제
*/
public class ObservableRetryExample02 {
private final static int RETRY_MAX = 5;
public static void main(String[] args) {
Observable.just(5)
.flatMap(
num -> Observable
.interval(200L, TimeUnit.MILLISECONDS)
.map(i -> {
long result;
try{
result = num / i;
}catch(ArithmeticException ex){
Logger.log(LogType.PRINT, "error: " + ex.getMessage());
throw ex;
}
return result;
})
.retry((retryCount, ex) -> {
Logger.log(LogType.PRINT, "# 재시도 횟수: " + retryCount);
TimeUtil.sleep(1000L);
return retryCount < RETRY_MAX ? true : false;
})
.onErrorReturn(throwable -> -1L)
).subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE)
);
TimeUtil.sleep(6000L);
}
}
/*
print() | RxComputationThreadPool-1 | 04:28:32.857 | error: / by zero
print() | RxComputationThreadPool-1 | 04:28:32.861 | # 재시도 횟수: 1
print() | RxComputationThreadPool-2 | 04:28:34.070 | error: / by zero
print() | RxComputationThreadPool-2 | 04:28:34.070 | # 재시도 횟수: 2
print() | RxComputationThreadPool-3 | 04:28:35.278 | error: / by zero
print() | RxComputationThreadPool-3 | 04:28:35.279 | # 재시도 횟수: 3
print() | RxComputationThreadPool-4 | 04:28:36.483 | error: / by zero
print() | RxComputationThreadPool-4 | 04:28:36.483 | # 재시도 횟수: 4
print() | RxComputationThreadPool-5 | 04:28:37.688 | error: / by zero
print() | RxComputationThreadPool-5 | 04:28:37.689 | # 재시도 횟수: 5
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 에러 발생 시, 데이터 통지를 처음부터 다시 하는것을 보여주는 예제
*/
public class ObservableRetryExample03 {
private final static int RETRY_MAX = 5;
public static void main(String[] args) {
Observable.just(10, 12, 15, 16)
.zipWith(Observable.just(1, 2, 0, 4), (a, b) -> {
int result;
try {
result = a / b;
} catch (ArithmeticException ex) {
Logger.log(LogType.PRINT, "error: " + ex.getMessage());
throw ex;
}
return result;
})
.retry(3)
.onErrorReturn(throwable -> -1)
.subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE)
);
TimeUtil.sleep(5000L);
}
}
/*
onNext() | main | 04:33:58.786 | 10
onNext() | main | 04:33:58.789 | 6
print() | main | 04:33:58.789 | error: / by zero
onNext() | main | 04:33:58.790 | 10
onNext() | main | 04:33:58.790 | 6
print() | main | 04:33:58.790 | error: / by zero
onNext() | main | 04:33:58.790 | 10
onNext() | main | 04:33:58.791 | 6
print() | main | 04:33:58.791 | error: / by zero
onNext() | main | 04:33:58.791 | 10
onNext() | main | 04:33:58.791 | 6
print() | main | 04:33:58.791 | error: / by zero
onNext() | main | 04:33:58.791 | -1
onComplete() | main | 04:33:58.792
*/
이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 1부를 공부하고 작성한 글입니다.
강의영상 링크