Posts RxJava - 연산자 개요 및 생성 연산자
Post
Cancel

RxJava - 연산자 개요 및 생성 연산자

RxJava의 연산자(Operator)란?

  • RxJava에서의 연산자는 메서드(함수)다.
  • 연산자를 이용하여 데이터를 생성하고 통지하는 Flowable이나 Observable 등의 생산자를 생성할 수 있다.
  • Flowable이나 Observable에서 통지한 데이터를 다양한 연산자를 사용하여 가공 처리하여 결과값을 만들어 낸다.
  • 연산자의 특성에 따라 카테고리로 분류된다.
    • Flowable/Observable 생성 연산자
    • 통지된 데이터를 필터링 해주는 연산자
    • 통지된 데이터를 변환 해주는 연산자
    • 여러 개의 Flowable/Observable을 결합하는 연산자
    • 에러 처리 연산자
    • 유틸리티 연산자
    • 조건과 불린 연산자
    • 통지된 데이터를 집계 해주는 연산자

Flowable/Observable 생성 연산자

interval

  • 지정한 시간 간격마다 0부터 시작하는 숫자(Long)를 통지한다.
  • initialDelay 파라미터 이용해서 최초 통지에 대한 대기 시간을 지정할 수 있다.
  • 완료 없이 계속 통지한다.
  • 호출한 스레드와는 별도의 스레드에서 실행된다.
  • polling(특정 요청을 반복적으로 하는 것) 용도의 작업을 수행할 때 활용할 수 있다.

interval

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * polling 용도로 주로 사용.
 */
public class ObservableIntervalExample {
    public static void main(String[] args){
        Observable.interval(0, 1000L, TimeUnit.MILLISECONDS)
                .map(num -> num + " count")
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data));

        TimeUtil.sleep(3000);
    }
}
/*
onNext() | RxComputationThreadPool-1 | 12:31:54.677 | 0 count
onNext() | RxComputationThreadPool-1 | 12:31:55.610 | 1 count
onNext() | RxComputationThreadPool-1 | 12:31:56.610 | 2 count
onNext() | RxComputationThreadPool-1 | 12:31:57.610 | 3 count
*/

range

  • 지정한 값(n) 부터 m 개의 숫자(Integer)를 통지한다.
  • for, while 문 등의 반복문을 대체할 수 있다.

range

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ObservableRangeExample {
    public static void main(String[] args) {
        Observable<Integer> source = Observable.range(0,5);
        source.subscribe(num -> Logger.log(LogType.ON_NEXT, num));
    }
}
/*
onNext() | main | 12:45:20.086 | 0
onNext() | main | 12:45:20.090 | 1
onNext() | main | 12:45:20.091 | 2
onNext() | main | 12:45:20.091 | 3
onNext() | main | 12:45:20.091 | 4
*/

timer

  • 지정한 시간이 지나면 0(Long)을 통지한다.
  • 0을 통지하고 onComplete() 이벤트가 발생하여 종료한다.
  • 호출한 스레드와는 별도의 스레드에서 실행된다.
  • 특정 시간을 대기한 후에 어떤 처리를 하고자 할 때 활용할 수 있다.

timer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ObservableTimerExample {
    public static void main(String[] args) {
        Logger.log(LogType.PRINT, "# Start!");
        Observable<String> observable =
                Observable.timer(2000, TimeUnit.MILLISECONDS)
                .map(count -> "Do work!");

        observable.subscribe(data -> Logger.log(LogType.ON_NEXT , data));

        TimeUtil.sleep(3000);
    }
}
/*
print() | main | 13:40:15.640 | # Start!
onNext() | RxComputationThreadPool-1 | 13:40:17.777 | Do work!
*/

defer

  • 구독이 발생할 때마다 즉, subscribe()가 호출될 때마다 새로운 Observable을 생성한다.
  • 선언한 시점의 데이터를 통지하는 것이 아니라 호출 시점의 데이터를 통지한다.
  • 데이터 생성을 미루는 효과가 있기때문에 최신 데이터를 얻고자할 때 활용할 수 있다.

defer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * 실제 구독이 발생할 때 Observable을 새로 반환하여 새로운 Observable을 생성한다.
 * defer()를 활용하면 데이터 흐름의 생성을 지연하는 효과를 보여준다.
 */
public class ObservableDeferExample {
    public static void main(String[] args) throws InterruptedException{
        Observable<LocalTime> observable = Observable.defer(() -> {
            LocalTime currentTime = LocalTime.now();
            return Observable.just(currentTime);
        });

        Observable<LocalTime> observableJust = Observable.just(LocalTime.now());

        observable.subscribe(time -> Logger.log(LogType.PRINT, " # defer() 구독1의 구독 시간: " + time));
        observableJust.subscribe(time -> Logger.log(LogType.PRINT, " # just() 구독1의 구독 시간: " + time));

        Thread.sleep(3000);

        observable.subscribe(time -> Logger.log(LogType.PRINT, " # defer() 구독2의 구독 시간: " + time));
        observableJust.subscribe(time -> Logger.log(LogType.PRINT, " # just() 구독2의 구독 시간: " + time));

    }
}
/*
print() | main | 14:12:44.402 |  # defer() 구독1의 구독 시간: 14:12:44.380150
print() | main | 14:12:44.408 |  # just() 구독1의 구독 시간: 14:12:44.365412
print() | main | 14:12:47.412 |  # defer() 구독2의 구독 시간: 14:12:47.412465
print() | main | 14:12:47.413 |  # just() 구독2의 구독 시간: 14:12:44.365412
*/

fromIterable

  • Iterable 인터페이스를 구현한 클래(ArrayList 등)를 파라미터로 받는다.
  • Iterable에 담긴 데이터를 순서대로 통지한다.

fromIterable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ObservableFromIterableExample {
    public static void main(String[] args) {
        List<String> countries = Arrays.asList("Korea", "Canada", "USA", "Italy");

        Observable.fromIterable(countries)
                .subscribe(country -> Logger.log(LogType.ON_NEXT, country));
    }
}
/*
onNext() | main | 14:53:07.569 | Korea
onNext() | main | 14:53:07.575 | Canada
onNext() | main | 14:53:07.576 | USA
onNext() | main | 14:53:07.576 | Italy
*/

fromFuture

  • Future 인터페이스는 자바 5에서 비동기 처리를 위해 추가된 동시성 API이다.
  • 시간이 오래 걸리는 작업은 Future를 반환하는 ExcutorService에게 맡기고 비동기로 다른 작업을 수행할 수 있다.
  • Java 8 에서는 CompletableFuture 클래스를 통해 구현이 간겨해졌다.

fromFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 일반적인 처리
public class FutureExampleSync {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        CarRepairShop shop = new CarRepairShop();

        // 차량 수리비
        int carRepairCost = shop.getCarRepairCostSync(10);
        Logger.log(LogType.PRINT, "# (1) 차량 수리비 계산이 완료되었습니다.");
        Logger.log(LogType.PRINT, "# 차량 수리비는 " + carRepairCost + "원 입니다.");

        // 회사에 병가 신청
        requestSickLeave("20170903-01");

        // 보험 접수
        requestInsurance("44나4444");

        long endTime = System.currentTimeMillis();

        double executeTime = (endTime - startTime) / 1000.0;

        System.out.println();
        System.out.println("# 처리 시간: " + executeTime + "초");
    }

    private static void requestSickLeave(String empNumber) {
        try {
            Thread.sleep(1000);

            Logger.log(LogType.PRINT, "# (2) 병가 신청이 완료되었습니다.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void requestInsurance(String carNumber) {
        try {
            Thread.sleep(1000);

            Logger.log(LogType.PRINT, "# (3) 보험 접수가 완료 되었습니다.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/*
print() | main | 15:22:20.759 | # 차량 수리비 계산 중................
print() | main | 15:22:23.765 | # (1) 차량 수리비 계산이 완료되었습니다.
print() | main | 15:22:23.765 | # 차량 수리비는 200000원 입니다.
print() | main | 15:22:24.770 | # (2) 병가 신청이 완료되었습니다.
print() | main | 15:22:25.774 | # (3) 보험 접수가 완료 되었습니다.

# 처리 시간: 5.141초
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Future를 사용한 처리
public class FutureExampleAsync {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        Logger.log(LogType.PRINT, "# Start");
        CarRepairShop shop = new CarRepairShop();

        // 차량 수리비(시간이 더 오래 걸리는 미래에 끝날 일)
        Future<Integer> future = shop.getCarRepairCostAsync(10);

        // 회사에 병가 신청(짧은 처리 시간)
        requestSickLeave("20170903-01");

        // 보험 청구(짧은 처리 시간)
        requestInsurance("44나4444");

        try {
            int carRepairCost = future.get();
            Logger.log(LogType.PRINT, "# (1) 차량 수리비 계산이 완료되었습니다.");
            Logger.log(LogType.PRINT, "# 차량 수리비는 " + carRepairCost + "원 입니다.");
        } catch (Exception e) {
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();

        double executeTime = (endTime - startTime) / 1000.0;

        System.out.println();
        System.out.println("# 처리 시간: " + executeTime);
    }

    private static void requestSickLeave(String empNumber) {
        try {
            Thread.sleep(1000);
            Logger.log(LogType.PRINT, "# (2) 병가 신청이 완료되었습니다.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void requestInsurance(String carNumber) {
        try {
            Thread.sleep(1000);
            Logger.log(LogType.PRINT, "# (3) 보험 접수가 완료 되었습니다.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/*
print() | main | 15:22:49.799 | # Start
print() | ForkJoinPool.commonPool-worker-3 | 15:22:49.812 | # 차량 수리비 계산 중................
print() | main | 15:22:50.817 | # (2) 병가 신청이 완료되었습니다.
print() | main | 15:22:51.820 | # (3) 보험 접수가 완료 되었습니다.
print() | main | 15:22:52.817 | # (1) 차량 수리비 계산이 완료되었습니다.
print() | main | 15:22:52.817 | # 차량 수리비는 200000원 입니다.

# 처리 시간: 3.105
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// RxJava fromFuture 사용
public class ObservableFromFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Logger.log(LogType.PRINT, "# start time");

        // 긴 처리 시간이 걸리는 작업
        Future<Double> future = longTimeWork();

        // 짧은 처리 시간이 걸리는 작업
        shortTimeWork();

        Observable.fromFuture(future)
                .subscribe(data -> Logger.log(LogType.PRINT, "# 긴 처리 시간 작업 결과 : " + data));

        Logger.log(LogType.PRINT, "# end time");
    }


    public static CompletableFuture<Double> longTimeWork() {
        return CompletableFuture.supplyAsync(() -> calculate());
    }

    private static Double calculate() {
        Logger.log(LogType.PRINT, "# 긴 처리 시간이 걸리는 작업 중.........");
        TimeUtil.sleep(6000L);
        return 100000000000000000.0;
    }

    private static void shortTimeWork() {
        TimeUtil.sleep(3000L);
        Logger.log(LogType.PRINT, "# 짧은 처리 시간 작업 완료!");
    }
}
/*
print() | main | 15:25:11.989 | # start time
print() | ForkJoinPool.commonPool-worker-3 | 15:25:12.003 | # 긴 처리 시간이 걸리는 작업 중.........
print() | main | 15:25:15.007 | # 짧은 처리 시간 작업 완료!
print() | main | 15:25:18.008 | # 긴 처리 시간 작업 결과 : 1.0E17
print() | main | 15:25:18.008 | # end time
*/

이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 1부를 공부하고 작성한 글입니다.
강의영상 링크

This post is licensed under CC BY 4.0 by the author.

RxJava - 함수형 메서드 레퍼런스

RxJava - 데이터 필터링 연산자

Comments powered by Disqus.