Posts RxJava - Flowable과 Observable
Post
Cancel

RxJava - Flowable과 Observable

Flowable과 Observable의 비교

FlowableObservable
Reactive Streams 인터페이스를 구현함Reactive Streams 인터페이스를 구현하지 않음
Subscriber에서 데이터를 처리한다.Observer에서 데이터를 처리한다.
데이터 개수를 제어하는 배압 기능이 있음데이터 개수를 제어하는 배압 기능이 없음
Subscription으로 전달 받는 데이터 개수를 제어할 수 있다.배압 기능이 없기때문에 데이터 개수를 제어할 수 없다.
Subscription으로 구독을 해지한다.Disposable로 구독을 해지한다.

배압(Back Pressure)이란?

  • Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달받아 처리하는 속도 보다 빠를 때 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능을 말한다.

Back Pressure

배압(Back Pressure) 예시 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class MissingBackpressureExceptionExample {
    public static void main(String[] agrs) throws InterruptedException {
        Flowable.interval(1L, TimeUnit.MILLISECONDS)
                .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
                .observeOn(Schedulers.computation())
                .subscribe(
                        data -> {
                            Logger.log(LogType.PRINT, "# 소비자 처리 대기 중..");
                            TimeUtil.sleep(1000L);
                            Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error),
                        () -> Logger.log(LogType.ON_COMPLETE)
                );

        Thread.sleep(2000L);
    }
}
/*
doOnNext() | RxComputationThreadPool-2 | 21:12:57.559 | 0
doOnNext() | RxComputationThreadPool-2 | 21:12:57.562 | 1
print() | RxComputationThreadPool-1 | 21:12:57.562 | # 소비자 처리 대기 중..
doOnNext() | RxComputationThreadPool-2 | 21:12:57.562 | 2
doOnNext() | RxComputationThreadPool-2 | 21:12:57.562 | 3
doOnNext() | RxComputationThreadPool-2 | 21:12:57.562 | 4
doOnNext() | RxComputationThreadPool-2 | 21:12:57.562 | 5
doOnNext() | RxComputationThreadPool-2 | 21:12:57.562 | 6
.
.
.
doOnNext() | RxComputationThreadPool-2 | 21:12:57.647 | 126
doOnNext() | RxComputationThreadPool-2 | 21:12:57.648 | 127
onNext() | RxComputationThreadPool-1 | 21:12:58.563 | 0
onERROR() | RxComputationThreadPool-1 | 21:12:58.564 | io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
*/

배압 전략(BackpresuureStrategy)

  • RxJava에서는 BackpressureStrategy를 통해 Flowable이 통지 대기 중인 데이터를 어떻게 다룰지에 대한 배압 전략을 제공한다.

Missing 전략

  • 배압을 적용하지 않는다.
  • 나중에 onBackpressureXXX()로 배압 적용을 할 수 있다.

ERROR 전략

  • 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지한다.
  • 즉, 소비자가 생산자의 통지 속도를 따라 잡지 못할 떄 발생한다.
  • 해당 코드와 같은 경우 : 링크

BUFFER 전략 : DROP_LATEST

  • 버퍼가 가득 찬 시점에 버퍼내에서 가장 최근에 버퍼로 들어온 데이터를 DROP 한다.
  • DROP된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

/**
 * - DROP_LATEST 전략  : 생산자쪽에서 데이터 통지 시점에 버퍼가 가득 차있으면 버퍼내에 있는 데이터 중에서
 * 가장 최근에 버퍼안에 들어온 데이터를 삭제하고 버퍼 밖에서 대기하는 데이터를 그 자리에 채운다.
 */
public class BackpressureBufferExample01 {
    public static void main(String[] args) {
        System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
        Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
                .onBackpressureBuffer(
                        2,
                        () -> Logger.log("overflow!"),
                        BackpressureOverflowStrategy.DROP_LATEST)
                .doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
                .observeOn(Schedulers.computation(), false, 1)
                .subscribe(
                        data -> {
                            TimeUtil.sleep(1000L);
                            Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error)
                );

        TimeUtil.sleep(2800L);
    }
}
/*
# start : 23:16:11.557
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:11.992 | 0
#onBackpressureBuffer doOnNext() | RxComputationThreadPool-2 | 23:16:11.992 | 0
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:12.292 | 1
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:12.591 | 2
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:12.891 | 3
overflow! | RxComputationThreadPool-2 | 23:16:12.893
onNext() | RxComputationThreadPool-1 | 23:16:12.998 | 0
#onBackpressureBuffer doOnNext() | RxComputationThreadPool-1 | 23:16:12.998 | 1
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:13.191 | 4
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:13.492 | 5
overflow! | RxComputationThreadPool-2 | 23:16:13.492
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:13.792 | 6
overflow! | RxComputationThreadPool-2 | 23:16:13.792
onNext() | RxComputationThreadPool-1 | 23:16:14.004 | 1
#onBackpressureBuffer doOnNext() | RxComputationThreadPool-1 | 23:16:14.004 | 3
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:14.091 | 7
#inverval doOnNext() | RxComputationThreadPool-2 | 23:16:14.392 | 8
overflow! | RxComputationThreadPool-2 | 23:16:14.392
*/

BUFFER 전략 : DROP_OLDEST

  • 버퍼가 가득 찬 시점에 버퍼내에서 가장 오래전에(먼저) 들어온 데이터를 DROP한다.
  • DROP 된 빈 자리에는 버퍼 밖에서 대기하던 데이터를 채운다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

/**
 * - DROP_OLDEST 전략 : 생산자쪽에서 데이터 통지 시점에 버퍼가 가득 차있으면 버퍼내에 있는 데이터 중에서 가장 먼저(OLDEST) 버퍼
 * 안에 들어온 데이터를 삭제하고 버퍼 밖에서 대기하는 데이터를 채운다.
 */
public class BackpressureBufferExample02 {
    public static void main(String[] args){
        System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
        Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
                .onBackpressureBuffer(
                        2,
                        () -> Logger.log("overflow!"),
                        BackpressureOverflowStrategy.DROP_OLDEST)
                .doOnNext(data -> Logger.log("#onBackpressureBuffer doOnNext()", data))
                .observeOn(Schedulers.computation(), false, 1)
                .subscribe(
                        data -> {
                            TimeUtil.sleep(1000L);
                            Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error)
                );

        TimeUtil.sleep(2500L);
    }
}
/*
# start : 00:12:33.570
#inverval doOnNext() | RxComputationThreadPool-2 | 00:12:34.078 | 0
#onBackpressureBuffer doOnNext() | RxComputationThreadPool-2 | 00:12:34.079 | 0
#inverval doOnNext() | RxComputationThreadPool-2 | 00:12:34.377 | 1
#inverval doOnNext() | RxComputationThreadPool-2 | 00:12:34.677 | 2
#inverval doOnNext() | RxComputationThreadPool-2 | 00:12:34.978 | 3
overflow! | RxComputationThreadPool-2 | 00:12:34.980
onNext() | RxComputationThreadPool-1 | 00:12:35.084 | 0
#onBackpressureBuffer doOnNext() | RxComputationThreadPool-1 | 00:12:35.084 | 2
#inverval doOnNext() | RxComputationThreadPool-2 | 00:12:35.277 | 4
#inverval doOnNext() | RxComputationThreadPool-2 | 00:12:35.577 | 5
overflow! | RxComputationThreadPool-2 | 00:12:35.577
#inverval doOnNext() | RxComputationThreadPool-2 | 00:12:35.877 | 6
overflow! | RxComputationThreadPool-2 | 00:12:35.877
onNext() | RxComputationThreadPool-1 | 00:12:36.089 | 2
#onBackpressureBuffer doOnNext() | RxComputationThreadPool-1 | 00:12:36.089 | 5
#inverval doOnNext() | RxComputationThreadPool-2 | 00:12:36.177 | 7
*/

DROP 전략

  • 버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 버리고(DROP), 버퍼가 비워지는 시점에 DROP되지 않은 데이터부터 다시 버퍼에 담는다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class BackpressureDropExample {
    public static void main(String[] args){
        Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
                .onBackpressureDrop(dropData -> Logger.log(LogType.PRINT, dropData + " Drop!"))
                .observeOn(Schedulers.computation(), false, 1)
                .subscribe(
                        data -> {
                            TimeUtil.sleep(1000L);
                            Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error)
                );

        TimeUtil.sleep(5500L);
    }
}
/*
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:27.949 | 0
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:28.204 | 1
print() | RxComputationThreadPool-2 | 00:27:28.205 | 1 Drop!
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:28.504 | 2
print() | RxComputationThreadPool-2 | 00:27:28.504 | 2 Drop!
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:28.805 | 3
print() | RxComputationThreadPool-2 | 00:27:28.805 | 3 Drop!
onNext() | RxComputationThreadPool-1 | 00:27:28.957 | 0
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:29.103 | 4
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:29.403 | 5
print() | RxComputationThreadPool-2 | 00:27:29.403 | 5 Drop!
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:29.705 | 6
print() | RxComputationThreadPool-2 | 00:27:29.705 | 6 Drop!
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:30.005 | 7
print() | RxComputationThreadPool-2 | 00:27:30.005 | 7 Drop!
onNext() | RxComputationThreadPool-1 | 00:27:30.108 | 4
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:30.303 | 8
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:30.604 | 9
print() | RxComputationThreadPool-2 | 00:27:30.604 | 9 Drop!
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:30.904 | 10
print() | RxComputationThreadPool-2 | 00:27:30.904 | 10 Drop!
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:31.205 | 11
print() | RxComputationThreadPool-2 | 00:27:31.205 | 11 Drop!
onNext() | RxComputationThreadPool-1 | 00:27:31.308 | 8
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:31.504 | 12
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:31.804 | 13
print() | RxComputationThreadPool-2 | 00:27:31.804 | 13 Drop!
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:32.104 | 14
print() | RxComputationThreadPool-2 | 00:27:32.104 | 14 Drop!
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:32.405 | 15
print() | RxComputationThreadPool-2 | 00:27:32.405 | 15 Drop!
onNext() | RxComputationThreadPool-1 | 00:27:32.509 | 12
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:32.703 | 16
#inverval doOnNext() | RxComputationThreadPool-2 | 00:27:33.004 | 17
print() | RxComputationThreadPool-2 | 00:27:33.004 | 17 Drop!
*/

LATEST 전략

  • 버퍼에 데이터가 모두 채워진 상태가 되면 버퍼가 비워질 때까지 통지된 데이터는 버퍼 밖에서 대기하며 버퍼가 비워지는 시점에 가장 나중에(최근에) 통지된 데이터부터 버퍼에 담는다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * 통지된 데이터로 채워진 버퍼의 데이터를 소비자가 모두 소비하면 버퍼 밖에서 대기중인 통지된 데이터 중에서
 * 가장 나중에(최근에) 통지된 데이터부터 다시 버퍼에 채운다.
 */
public class BackpressureLatestExample {
    public static void main(String[] args){
        Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .doOnNext(data -> Logger.log("#inverval doOnNext()", data))
                .onBackpressureLatest()
                .observeOn(Schedulers.computation(), false, 1)
                .subscribe(
                        data -> {
                            TimeUtil.sleep(1000L);
                            Logger.log(LogType.ON_NEXT, data);
                        },
                        error -> Logger.log(LogType.ON_ERROR, error)
                );

        TimeUtil.sleep(5500L);
    }
}
/*
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:17.666 | 0
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:17.921 | 1
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:18.221 | 2
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:18.520 | 3
onNext() | RxComputationThreadPool-1 | 01:32:18.673 | 0
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:18.820 | 4
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:19.121 | 5
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:19.421 | 6
onNext() | RxComputationThreadPool-1 | 01:32:19.676 | 3
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:19.721 | 7
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:20.021 | 8
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:20.320 | 9
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:20.621 | 10
onNext() | RxComputationThreadPool-1 | 01:32:20.681 | 6
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:20.921 | 11
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:21.222 | 12
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:21.522 | 13
onNext() | RxComputationThreadPool-1 | 01:32:21.685 | 10
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:21.820 | 14
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:22.122 | 15
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:22.421 | 16
onNext() | RxComputationThreadPool-1 | 01:32:22.690 | 13
#inverval doOnNext() | RxComputationThreadPool-2 | 01:32:22.722 | 17
*/

Flowable 과 Observable 내부 동작 원리

Flowable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class HelloRxJavaFlowableCreateExample {
    public static void main(String[] args) throws InterruptedException {
        Flowable<String> flowable =
                Flowable.create(new FlowableOnSubscribe<String>() {
                    @Override
                    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                        String[] datas = {"Hello", "RxJava!"};
                        for(String data : datas) {
                            // 구독이 해지되면 처리 중단
                            if (emitter.isCancelled())
                                return;

                            // 데이터 통지
                            emitter.onNext(data);
                        }

                        // 데이터 통지 완료를 알린다
                        emitter.onComplete();
                    }
                }, BackpressureStrategy.BUFFER); // 구독자의 처리가 늦을 경우 데이터를 버퍼에 담아두는 설정.

        flowable.observeOn(Schedulers.computation())
                .subscribe(new Subscriber<String>() {
                    // 데이터 개수 요청 및 구독을 취소하기 위한 Subscription 객체
                    private Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        this.subscription.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String data) {
                        Logger.log(LogType.ON_NEXT, data);
                    }

                    @Override
                    public void onError(Throwable error) {
                        Logger.log(LogType.ON_ERROR, error);
                    }

                    @Override
                    public void onComplete() {
                        Logger.log(LogType.ON_COMPLETE);
                    }
                });

        Thread.sleep(500L);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Lamda 사용
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class HelloRxJavaFlowableCreateLamdaExample {
    public static void main(String[] args) throws InterruptedException {
        Flowable<String> flowable =
                Flowable.create(emitter -> {
                    String[] datas = {"Hello", "RxJava!"};
                    for(String data : datas) {
                        // 구독이 해지되면 처리 중단
                        if (emitter.isCancelled())
                            return;

                        // 데이터 발행
                        emitter.onNext(data);
                    }

                    // 데이터 발행 완료를 알린다
                    emitter.onComplete();
                }, BackpressureStrategy.BUFFER);

        flowable.observeOn(Schedulers.computation())
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data),
                        error -> Logger.log(LogType.ON_ERROR, error),
                        () -> Logger.log(LogType.ON_COMPLETE),
                        subscription -> subscription.request(Long.MAX_VALUE));

        Thread.sleep(500L);
    }
}

Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class HelloRxJavaObservableCreateExample {
    public static void main(String[] args) throws InterruptedException {
        Observable<String> observable =
                Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        String[] datas = {"Hello", "RxJava!"};
                        for(String data : datas){
                            if(emitter.isDisposed())
                                return;

                            emitter.onNext(data);
                        }
                        emitter.onComplete();
                    }
                });

        observable.observeOn(Schedulers.computation())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable disposable) {
                        // 아무 처리도 하지 않음.
                    }

                    @Override
                    public void onNext(String data) {
                        Logger.log(LogType.ON_NEXT, data);
                    }

                    @Override
                    public void onError(Throwable error) {
                        Logger.log(LogType.ON_ERROR, error);
                    }

                    @Override
                    public void onComplete() {
                        Logger.log(LogType.ON_COMPLETE);
                    }
                });

        Thread.sleep(500L);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Lamda 사용
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class HelloRxJavaObservableCreateLamdaExample {
    public static void main(String[] args) throws InterruptedException {
        Observable<String> observable =
                Observable.create(emitter -> {
                    String[] datas = {"Hello", "RxJava!"};
                    for(String data : datas){
                        if(emitter.isDisposed())
                            return;

                        emitter.onNext(data);
                    }
                    emitter.onComplete();
                });

        observable.observeOn(Schedulers.computation())
                .subscribe(
                        data -> Logger.log(LogType.ON_NEXT, data),
                        error -> Logger.log(LogType.ON_ERROR, error),
                        () -> Logger.log(LogType.ON_COMPLETE),
                        disposable -> {/**아무것도 하지 않는다.*/}
                );

        Thread.sleep(500L);
    }
}

이 글은 inflearn에 있는 Kevin의 알기 쉬운 RxJava 1부를 공부하고 작성한 글입니다.
강의영상 링크

This post is licensed under CC BY 4.0 by the author.

RxJava - Reative Streams란?

RxJava - Single vs Maybe vs Completable

Comments powered by Disqus.