유틸리티 연산자
delay 첫번째 유형
- 생산자가 데이터를 생성 및 통지를 하지만 설정한 시간만큼 소비자쪽으로의 데이터 전달을 지연시킨다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 통지된 데이터를 소비자 쪽에서 전달 받는 시간을 일정 시간동안 지연 시키는 예제
*/
public class ObservableDelayExample01 {
public static void main(String[] args) {
Logger.log(LogType.PRINT, "# 실행 시작 시간: " + TimeUtil.getCurrentTimeFormatted());
Observable.just(1, 3, 4, 6)
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
.delay(2000L, TimeUnit.MILLISECONDS)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(2500L);
}
}
/*
print() | main | 08:09:58.989 | # 실행 시작 시간: 08:09:58.986
doOnNext() | main | 08:09:59.135 | 1
doOnNext() | main | 08:09:59.143 | 3
doOnNext() | main | 08:09:59.143 | 4
doOnNext() | main | 08:09:59.143 | 6
onNext() | RxComputationThreadPool-1 | 08:10:01.146 | 1
onNext() | RxComputationThreadPool-1 | 08:10:01.146 | 3
onNext() | RxComputationThreadPool-1 | 08:10:01.147 | 4
onNext() | RxComputationThreadPool-1 | 08:10:01.147 | 6
*/
delay 두번째 유형
- 파라미터로 생성되는 Observable이 데이터를 통지할떄까지 각각의 원본 데이터의 통지를 지연시킨다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 통지되는 데이터 각각에 지연 시간을 적용하는 예제
*/
public class ObservableDelayExample02 {
public static void main(String[] args) {
Observable.just(1, 3, 5, 7)
.delay(item -> {
TimeUtil.sleep(1000L);
return Observable.just(item); // 새로운 Observable의 통지 시점에, 원본 데이터를 통지한다.
}).subscribe(data -> Logger.log(LogType.ON_NEXT, data));
}
}
/*
onNext() | main | 08:23:08.550 | 1
onNext() | main | 08:23:09.560 | 3
onNext() | main | 08:23:10.565 | 5
onNext() | main | 08:23:11.569 | 7
*/
delaySubscription
- 생산자가 데이터의 생성 및 통지 자체를 설정한 시간만큼 지연시킨다.
- 즉, 소비자가 구독을 해도 구독 시점 자체가 지연된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
/**
* 소비자가 구독시, 데이터 생성 및 통지 자체를 지연시키는 예제.
* 즉, delay()는 소비자가 구독 시, 생성 및 통지는 즉시 하지만 소비자에게 전달하는 시간을 지연시키고,
* delaySubscription()은 데이터 생성 및 통지 자체를 지연시킨다.
*/
public class ObservableDelaySubscriptionExample {
public static void main(String[] args) {
Logger.log(LogType.PRINT, "# 실행 시작 시간: " + TimeUtil.getCurrentTimeFormatted());
Observable.just(1, 3, 4, 6)
.doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
.delaySubscription(2000L, TimeUnit.MILLISECONDS)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(2500L);
}
}
/*
print() | main | 08:35:36.644 | # 실행 시작 시간: 08:35:36.640
doOnNext() | RxComputationThreadPool-1 | 08:35:38.778 | 1
onNext() | RxComputationThreadPool-1 | 08:35:38.778 | 1
doOnNext() | RxComputationThreadPool-1 | 08:35:38.779 | 3
onNext() | RxComputationThreadPool-1 | 08:35:38.779 | 3
doOnNext() | RxComputationThreadPool-1 | 08:35:38.779 | 4
onNext() | RxComputationThreadPool-1 | 08:35:38.779 | 4
doOnNext() | RxComputationThreadPool-1 | 08:35:38.779 | 6
onNext() | RxComputationThreadPool-1 | 08:35:38.779 | 6
*/
timeout
- 각각의 데이터 통지 시, 지정된 시간안에 통지가 되지 않으면 에러를 통지한다.
- 에러 통지 시 전달되는 에러 객체는 TimeoutException 이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 각 데이터 통지 시, 지정한 시간안에 데이터가 통지 되지 않으면 에러를 발생시키는 예제
* - 네트워크 연결 지연 등으로 인한 처리를 위해 사용하기 좋은 연산자임.
*/
public class ObservableTimeOutExample {
public static void main(String[] args) {
Observable.range(1, 5)
.map(num -> {
long time = 1000L;
if (num == 4) {
time = 1500L;
}
TimeUtil.sleep(time);
return num;
})
.timeout(1200L, TimeUnit.MILLISECONDS)
.subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error)
);
TimeUtil.sleep(4000L);
}
}
/*
onNext() | main | 08:43:43.604 | 1
onNext() | main | 08:43:44.610 | 2
onNext() | main | 08:43:45.614 | 3
onERROR() | RxComputationThreadPool-1 | 08:43:46.826 | java.util.concurrent.TimeoutException: The source did not signal an event for 1200 milliseconds and has been terminated.
*/
timeInterval
- 각각의 데이터가 통지되는데 걸린 시간을 통지한다.
- 통지된 데이터와 데이터가 통지되는데 걸린 시간을 소비자쪽에서 모두 처리할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* timeInterval을 이용해서 데이터가 통지되는데 걸린 시간을 통지하는 예제
*/
public class ObservableTimeIntervalExample {
public static void main(String[] args) {
Observable.just(1, 3, 5, 7, 9)
.delay(item -> {
TimeUtil.sleep(NumberUtil.randomRange(100, 1000));
return Observable.just(item);
})
.timeInterval()
.subscribe(
timed -> Logger.log(LogType.ON_NEXT, "# 통지하는데 걸린 시간: " + timed.time() + "\t# 통지된 데이터: " + timed.value())
);
}
}
/*
onNext() | main | 08:54:18.530 | # 통지하는데 걸린 시간: 415 # 통지된 데이터: 1
onNext() | main | 08:54:18.999 | # 통지하는데 걸린 시간: 549 # 통지된 데이터: 3
onNext() | main | 08:54:19.337 | # 통지하는데 걸린 시간: 338 # 통지된 데이터: 5
onNext() | main | 08:54:19.804 | # 통지하는데 걸린 시간: 467 # 통지된 데이터: 7
onNext() | main | 08:54:20.543 | # 통지하는데 걸린 시간: 739 # 통지된 데이터: 9
*/
materialize / dematerialize
- materialize : 통지된 데이터와 통지된 데이터의 통지 타입 자체를 Notification 객체에 담고 이 Notification 객체를 통지한다. 즉, 통지 데이터와 메타 데이터를 포함해서 통지한다고 볼 수 있다.
- dematerialize : 통지된 Notification 객체를 원래의 통지 데이터로 변환해서 통지한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ObservableMaterialExample01 {
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6)
.materialize()
.subscribe(notification -> {
String notificationType =
notification.isOnNext() ? "onNext()" : (notification.isOnError() ? "onError()" : "onComplete()");
Logger.log(LogType.PRINT, "notification 타입: " + notificationType);
Logger.log(LogType.ON_NEXT, notification.getValue());
});
}
}
/*
print() | main | 09:09:59.783 | notification 타입: onNext()
onNext() | main | 09:09:59.788 | 1
print() | main | 09:09:59.789 | notification 타입: onNext()
onNext() | main | 09:09:59.789 | 2
print() | main | 09:09:59.790 | notification 타입: onNext()
onNext() | main | 09:09:59.790 | 3
print() | main | 09:09:59.791 | notification 타입: onNext()
onNext() | main | 09:09:59.791 | 4
print() | main | 09:09:59.791 | notification 타입: onNext()
onNext() | main | 09:09:59.791 | 5
print() | main | 09:09:59.791 | notification 타입: onNext()
onNext() | main | 09:09:59.792 | 6
print() | main | 09:09:59.792 | notification 타입: onComplete()
onNext() | main | 09:09:59.792 | null
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* Material/Dematerial 연산자의 실제 활용 예제
* - 특정 Observable 에서 에러가 발생 할 경우 해당 에러에 대해서 구체적으로 처리할 수 있다.
*/
public class ObservableMaterialExample02 {
public static void main(String[] args) {
Observable.concatEager(
Observable.just(
getDBUser().subscribeOn(Schedulers.io()),
getAPIUser()
.subscribeOn(Schedulers.io())
.materialize()
.map(notification -> {
if (notification.isOnError()) {
// 관리자에게 에러 발생을 알림
Logger.log(LogType.PRINT, "# API user 에러 발생!");
}
return notification;
})
.filter(notification -> !notification.isOnError())
.dematerialize(notification -> notification)
)
).subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE)
);
TimeUtil.sleep(1000L);
}
private static Observable<String> getDBUser() {
return Observable.fromIterable(Arrays.asList("DB user1", "DB user2", "DB user3", "DB user4", "DB user5"));
}
private static Observable<String> getAPIUser() {
return Observable
.just("API user1", "API user2", "Not User", "API user4", "API user5")
.map(user -> {
if (user.equals("Not User"))
throw new RuntimeException();
return user;
});
}
}
/*
onNext() | main | 09:29:35.195 | DB user1
print() | RxCachedThreadScheduler-1 | 09:29:35.193 | # API user 에러 발생!
onNext() | main | 09:29:35.199 | DB user2
onNext() | main | 09:29:37.189 | DB user3
onNext() | main | 09:29:37.190 | DB user4
onNext() | main | 09:29:37.190 | DB user5
onNext() | main | 09:29:37.190 | API user1
onNext() | main | 09:29:37.190 | API user2
onComplete() | main | 09:29:37.190
*/
이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 1부를 공부하고 작성한 글입니다.
강의영상 링크