관리 메뉴

정상에서 IT를 외치다

[리액티브 프로그래밍] 리액티브 프로그래밍 이란? 본문

안드로이드

[리액티브 프로그래밍] 리액티브 프로그래밍 이란?

사용자 Black-Jin 2018. 12. 18. 00:57
728x90
반응형


안녕하세요. 블랙진입니다.


한빛미디어 RxJava프로그래밍을 보며 리액티브를 공부한 내용을 정리하기 위한 포스팅입니다.




리액티브 프로그래밍이란?


리액티브 프로그래밍은 데이터 흐름과 전달에 관한 프로그래밍 패러다임입니다. 기존의 명령형 프로그래밍은 주로 컴퓨터 하드웨어를 대상으로 프로그래머가 작성한 코드가 정해진 절차에 따라 순서대로 실행되는 방식입니다. 리액티브 프로그래밍은 데이터 흐름을 먼저 정의하고 데이터가 변경되었을 때 연관되는 함수나 수식이 업데이트되는 방식입니다.



요점


Observable 클래스는 상황에 맞게 세분화 되어있음


1. Observable

-> 데이터에 흐름에 맞게 알림을 보내 구독자가 데이터를 처리


2. Single 

-> 오직 1개의 데이터만 발행


3. Maybe 

-> reduce() 함수나 firseElement() 함수와 같이 데이터가 발행될 수 있거나 혹은 발행되지 않고도 완료되는 경우 사용


4. Flowable

-> Observable 에서 데이터가 발행되는 속도가 구독자가 처리하는 속도보다 현저하게 빠른 경우 발생하는 배압 이슈에 대응하는 기능이 추가




Chapter 2


2-1 Observable 처음 만들기

- Observable은 세 가지의 알림을 구독자에게 전달합니다.

/**
* onNext : Observable 이 데이터의 발행을 알립니다. 기존의 옵서버 패턴과 같습니다.
*
* onComplete : 모든 데이터의 발행을 완료했을을 알립니다. onComplete 이벤트는 단 한 번만 발생하며,
* 발행한 후에는 더 이상 onNext 이벤트가 발생해선 안됩니다.
*
* onError : Observable 에서 어떤 이유로 에러가 발생했을을 알립니다.
*/


1. just

public void emitJust() {
Observable.just(1,2,3,4,5,6)
.subscribe(data -> {
MyLog.Log(String.valueOf(data));
});
}

한 개의 값을 넣을 수도 있고 인자로 여러개의 값(최대 10개)을 넣을 수 있습니다. 단 타입은 모두 같아야 합니다.


2. create

public void emitCreate() {
Observable<Integer> source = Observable.create(emitter -> {
emitter.onNext(100);
emitter.onNext(200);
emitter.onNext(300);
emitter.onComplete();
});

source.subscribe(data -> MyLog.Log(String.valueOf(data)));
}

onNext(), onComplete(), onError() 같은 알림을 개발자가 직접 호출해야 합니다.



3. fromArray

public void fromArray() {
Integer[] arr ={100,200,300};
Observable<Integer> source = Observable.fromArray(arr);

source.subscribe(data -> MyLog.Log(data.toString()));
}


4. fromIterable

// 이터레이터 패턴을 구현 한 것으로 어떤 데이터가 있는지와 그 값을 얻어오는 것만 관여
public void fromIterable() {
List<String> names = new ArrayList<>();
names.add("Bob");
names.add("Jerry");
names.add("Peter");

Observable<String> source = Observable.fromIterable(names);
source.subscribe(name -> MyLog.Log(name));
}


5. fromCallable

// 비동기 클래스나 인터페이스와의 연동
public void fromCallable() {

Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "Hello Callable";
}
};

Observable<String> source = Observable.fromCallable(callable);
source.subscribe(System.out::println);
}


6. fromFuture

// 자바 5에서 추가된 동시성 API로 비동기 계산의 결과를 구할 때 사용
public void fromFuture() {

Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
Thread.sleep(1000);
return "Hello Future";
});

Observable<String> source = Observable.fromFuture(future);
source.subscribe(System.out::println);
}

Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future 객체를 반환합니다. Executor 클래스는 단일 스레드 실행자(SingleThreadExecutor)뿐만 아니라 다양한 스레드풀을 지원합니다. 


어느 스레드에서 실행할지를 정할 수 있다는 점이 Callable 과의 차이점입니다. 하지만 Executor 클래스의 실행자 보다는 RxJava에서 제공하는 스케줄러를 활용하여 스레드를 정하는 법을 권장합니다.



7. fromPublisher

// 기능적으로는 Observable create 와 같지만 패키지 이름이 다르다.
public void fromPublisher() {

Publisher<String> publisher = new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> s) {
s.onNext("Hello publisher");
s.onComplete();
}
};

Observable<String> source = Observable.fromPublisher(publisher);
source.subscribe(System.out::println);
}



2-2 Single 클래스

/**
* Observable 클래스는 데이터를 무한하게 발행할 수 있지만
* Single 클래스는 오직 1개의 데이터만 발행하도록 한정합니다.
* 보통 결과가 유일한 서버 API 를 호출할 때 유용하게 사용할 수 있습니다.
*/


/**
* Single Class
*
* 오직 1개의 데이터만 발행하도록 한정
*
* 데이터 하나가 발행과 동시에 종료
* (onNext 와 onComplete 가 onSuccess 로 통합됨)
*/
public void single() {

// single class 사용
Single<String> source = Single.just("Hello Single");
source.subscribe(System.out::println);

// single 객체 사용
// just 의 인자가 2개 이상이면 에러 발생
Observable.just("Hello Single")
.single("default item")
.subscribe(System.out::println);

// first 사용
String[] colors = {"red","blue","gold"};
Observable.fromArray(colors)
.first("default value")
.subscribe(System.out::println);

// take 사용
Observable.just(1,2,3,4,5,6)
.take(1)
.subscribe(System.out::println);

}



2-3 Maybe 클래스

/**
* Maybe Class
*
* Single 클래스와 마찬가지로 최대 데이터 하나를 가질 수 있지만 데이터 발행 없이 바로 데이터 발행을 완료할 수 있습니다.
* 즉 Maybe 클래스는 Single 클래스에 onComplete 이벤트가 추가된 형태입니다.
* (Single 클래스는 1개 완료, Maybe 클래스는 0 혹은 1개 완료)
*/

- Maybe 객체를 생성할 수 있는 연산자

elementAt(), firstElement(), FlatMapMaybe(), lastElement(), reduce(), singleElement() 함수 등이 있습니다.



2-4 뜨거운 Observable


차가운 Observable

- Observable 을 선언하고 just(), fromIterable() 함수를 호출해도 옵서버가 subscribe() 함수를 호출하여 구독하지 않으면 데이터를 발행하지 않습니다. 


뜨거운 Observable

- 구독자 존재 여부와 관계없이 데이터를 발행하는 Observable 입니다. 따라서 여러 구독자를 고려할 수 있습니다.



2-5 Subject 클래스

/**
* Subject Class
*
* 차가운 옵서버블을 뜨거운 옵서버블로 바꿔줌
*
* 옵서버블 이면서 옵서버도 되는게 Subject class 입니다.
*
*/


1. AsyncSubject

//옵서버블에서 발행한 마지막 데이터를 얻어올 수 있는 Subject Class
public void asyncSubject() {

//정적 팩토리 함수 create 사용
AsyncSubject<String> subject = AsyncSubject.create();

subject.subscribe(data -> System.out.println("Subscriber #1 -> " + data));
subject.onNext("1");
subject.onNext("3");

subject.subscribe(data -> System.out.println("Subscriber #2 -> " + data));
subject.onNext("5");
subject.onNext("7");

subject.onComplete();

//onComplete 이후 onNext 함수는 무시 됩니다.
subject.onNext("9");

subject.subscribe(data -> System.out.println("Subscriber #3 -> " + data));
subject.subscribe(data -> System.out.println("Subscriber #4 -> " + data));
}

(결과)

Subscriber #1 -> 7

Subscriber #2 -> 7

Subscriber #3 -> 7

Subscriber #4 -> 7



2. BehaviorSubject

//가장 최근 값 혹은 기본값을 넘겨주는 클래스
public void behaviorSubject() {

BehaviorSubject<String> subject = BehaviorSubject.createDefault("6");

subject.subscribe(data -> System.out.println("Subscriber #1 -> " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data -> System.out.println("Subscriber #2 -> " + data));
subject.onNext("5");
subject.onComplete();
}

(결과)

Subscriber #1 -> 6

Subscriber #1 -> 1

Subscriber #1 -> 3

Subscriber #2 -> 3

Subscriber #1 -> 5

Subscriber #2 -> 5



3. PublishSubject()

// 구독자가 subscribe() 함수를 호출하면 값을 발행하기 시작
// AsyncSubject 클래스처럼 마지막 값만 발행하지 않음
// BehaviorSubject 클래스처럼 발행한 값이 없을 때 기본값을 대신 발행하지도 않음
public void publishSubject() {

PublishSubject<String> subject = PublishSubject.create();

subject.subscribe(data -> System.out.println("Subscriber #1 -> " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data -> System.out.println("Subscriber #2 -> " + data));
subject.onNext("5");
subject.onComplete();
}

(결과)

Subscriber #1 -> 1

Subscriber #1 -> 3

Subscriber #1 -> 5

Subscriber #2 -> 5



4. ConnectableObservable 클래스

// Subject 클래스 처럼 차가운 Observable 을 뜨거운 Observable 로 변환합니다.
// 즉 Observable 이면서 옵서버가 됩니다.
// subscribe() 함수를 호출해도 아무 동작이 일어나지 않고 새로 추가된 connect() 함수를 호출해야 동작
// connect() 함수를 호출해야 그떄까지 구독했던 구독자 모두에게 데이터를 발행
public void connectableObservable() {
String[] dt = {"1", "3", "5"};
Observable<String> balls = Observable
//100ms 간격으로 0,1,2,3,4 .. 발행
.interval(100L, TimeUnit.MILLISECONDS)
// 0,1,2,3,4 .. 숫자를 Long 타입으로 변환
.map(Long::intValue)
// 배열 dt 의 값으로 변환 ( "1", "3", "5" 반환)
.map(i -> dt[i])
// dt 배열의 갯수만큼만 실행 ( 3개 실행 0, 1, 2 )
.take(dt.length);

ConnectableObservable<String> source = balls.publish();

source.subscribe(data -> System.out.println("Subscriber #1 => " + data));
source.subscribe(data -> System.out.println("Subscriber #2 => " + data));
source.connect();

CommonUtils.sleep(250);
source.subscribe(data -> System.out.println("Subscriber #3 => " + data));
CommonUtils.sleep(100);
}

(결과)

Subscriber #1 => 1

Subscriber #2 => 1

Subscriber #1 => 3

Subscriber #2 => 3

Subscriber #1 => 5

Subscriber #2 => 5

Subscriber #3 => 5



(참고) CommonUtils

public class CommonUtils {
// 실행 시간을 표시하기 위한 정적 변수
public static long startTime;

public static void exampleStart() {
startTime = System.currentTimeMillis();
}

public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


<참고자료>

ReactivieX Doc

반응형
0 Comments
댓글쓰기 폼