정상에서 IT를 외치다

[리액티브 프로그래밍] 리액티브 연산자의 활용 본문

안드로이드

[리액티브 프로그래밍] 리액티브 연산자의 활용

Black-Jin 2018. 12. 19. 02:07
반응형


안녕하세요. 블랙진입니다.


한빛미디어 RxJava프로그래밍을 보며 리액티브를 공부한 내용을 정리하기 위한 세번째 포스팅입니다.




Chapter 4


4.1 생성연산자


4.1.1 interval() 함수

/**
* 일정 시간 간격으로 데이터 흐름을 생성합니다.
* 현재 스레드가 아닌 스케줄러에서 실행
*/
public class IntervalExample {

public void example() {

Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(data -> (data + 1) * 100)
.map(Object::toString)
.take(5);

source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}

(결과)

RxComputationThreadPool-1: 100

RxComputationThreadPool-1: 200

RxComputationThreadPool-1: 300

RxComputationThreadPool-1: 400

RxComputationThreadPool-1: 500


public class CommonUtils {

public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

CommonUtils.sleep() 함수가 있습니다. 이는 Thread.sleep(millis) 함수로 millis초 동안 실행중인 쓰레드를 정지하라는 명령입니다.

위 예제 에서는 1000ms 동안 메인 쓰레드가 중지됩니다. 메인 스레드가 중지되면서 스케줄러가 동작되는 원리입니다.



4.1.2 timer() 함수

/**
* interval() 함수와 유사하지만 한 번만 실행하는 함수입니다.
* 일정 시간이 지난 후에 한 개의 데이터를 발행하고 onComplete() 이벤트가 발생합니다.
*
* 현재 스레드가 아닌 스케줄러에서 실행
*/
public class TimerExample {

public void example() {

Observable<String> source = Observable.timer(500L, TimeUnit.MILLISECONDS)
.map(notUser -> {
return new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
.format(new Date());
});

source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}



4.1.3 range() 함수

/**
* 주어진 값(n)부터 m개의 Integer 객체를 발행합니다.
*/
public void example() {

Observable<Integer> source = Observable.range(1, 10)
.filter(number -> number % 2 == 0);
source.subscribe(System.out::println);
}



4.1.4 intervalRange() 함수

/**
* interval()과 range()를 혼합해놓은 함수입니다. interval() 함수 처럼 일정한 시간 간격으로 값을 출력하지만
* range() 함수처럼 시작 숫자(n)로부터 m개 만큼의 값만 생성하고 onComplete 이벤트가 발생합니다.
*/
public void example() {

Observable<Long> source = Observable.intervalRange(
1,
5,
100L,
100L,
TimeUnit.MILLISECONDS);

source.subscribe(System.out::println);
CommonUtils.sleep(1000);
}



4.1.5 defer() 함수

/**
* timer 함수와 비슷하지만 데이터 흐름 생성을 구독자가 subscribe() 함수를 호출할 때까지 미룰 수 있습니다.
*/

(원형)

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableDefer<T>(supplier));
}

SchedulerSupport(value="none") 이므로 현재 스레드 (main 스레드) 에서 실행되며 인자로는 Callable<Observable<T>>를 받습니다.



4.1.6 repeat() 함수

/**
* 단순히 반복 실행을 합니다.
*/
public void example() {
String[] balls = {"1","3","5"};
Observable<String> source = Observable.fromArray(balls)
.repeat(3); // 3번 반복 실행합니다.

source.subscribe(System.out::println);
}



4.2 변환 연산자


4.2.1 concatMap() 함수

/**
* flatMap() 함수와 매우 비슷합니다. flatMap() 는 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면
* 나중에 들어온 데이터의 처리 결과가 먼저 출력될 수도 있습니다. 이를 이터리밍(interleaving) 이라고 합니다.
*
* concatMap() 함수는 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장해줍니다.
*/
public class ConcatMap {

public void example() {

// 시간을 측정하기 위해 호출
CommonUtils.exampleStart();

String[] balls = {"1","3","5"};
//100ms 간격으로 interval() 함수를 호출합니다.
Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
// 0 부터 발생하는 Long 객체 값을 Integer 값으로 변환합니다.
.map(Long::intValue)
// 숫자를 "1","3","5" 문자열로 변환합니다.
.map(idx -> balls[idx])
// 3개의 값만은 가져옵니다.
.take(balls.length)
// 입력인 "1","3","5 은 100ms 간격으로 발생하지만 출력인 별은 200ms 간격으로 발생하기 때문에
// 입력과 출력의 순서가 역전될 수 있습니다.
// 그것을 concatMap 함수가 잡아줍니다.
.concatMap(ball -> Observable.interval(200L, TimeUnit.MILLISECONDS)
.map(notUser -> ball + "☆")
.take(2)
);
source.subscribe(MyLog::LogTime);
CommonUtils.sleep(2000);
}

}


4.2.2 switchMap() 함수

/**
* concatMap() 함수가 인터러빙 발생할 수 있는 상황에서 동작의 순서를 보장해준다면
* switchMap() 함수는 순서를 보장하기 위해 기존에 진행 중이던 작업을 바로 중단합니다.
* 여러 개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용합니다.
* 중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문입니다.
*/
public class SwitchMap {

public void example() {

// 시간을 측정하기 위해 호출
CommonUtils.exampleStart();

String[] balls = {"1","3","5"};
//100ms 간격으로 interval() 함수를 호출합니다.
Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
// 0 부터 발생하는 Long 객체 값을 Integer 값으로 변환합니다.
.map(Long::intValue)
// 숫자를 "1","3","5" 문자열로 변환합니다.
.map(idx -> balls[idx])
// 3개의 값만은 가져옵니다.
.take(balls.length)
// 중간 결과 확인용 함수
.doOnNext(MyLog::LogTime)
// SwitchMap 함수로 처리합니다.
.switchMap(ball -> Observable.interval(200L, TimeUnit.MILLISECONDS)
.map(notUser -> ball + "☆")
.take(2)
);
source.subscribe(MyLog::LogTime);
CommonUtils.sleep(2000);
}
}



4.2.3 GroupBy() 함수

/**
* 어떤 기준으로 단일 Observable 을 여러개로 이루어진 Observable 그룹으로 만듭니다.
*
* map() 함수는 1개의 데이터를 다른 값이나 다른 타입으로 변환해줍니다.
* flatMap() 함수는 1개의 값을 받아서 여러 개의 데이터(Observable)로 확장해줍니다.
* groupBy() 함수는 값들을 받아서 어떤 기준에 맞는 새로운 Observable 다수를 생성합니다.
*/
public class GroupBy {

public void example() {
String[] objs = {"6","4","2-T","2","6-T","4-T"};
// GroupedObservable 클래스는 Observable 과 동일하지만 getKey() 라는 메서드를 제공하여
// 구분된 그룹을 알 수 있게 해줍니다.
Observable<GroupedObservable<String, String>> source =
Observable.fromArray(objs).groupBy(CommonUtils::getShape);

source.subscribe(obj ->
obj.subscribe(
val -> System.out.println("GROUP:" + obj.getKey() + "\t Value: " + val)
));
}
}



4.2.3 Scan() 함수

/**
* reduce() 함수와 비슷합니다.
* reduce() 함수는 Observable 에서 모든 데이터가 입력된 후 그것을 종합하여 마지막 1개의 데이터만을 구독자에게 발행했습니다.
* scan() 함수는 실행할 때마다 입력값에 맞는 중간 결가 및 최종 결과를 구독자에게 발행합니다.
*/
public class Scan {

public void example() {
String[] balls = {"1","3","5"};
Observable<String> source = Observable.fromArray(balls)
.scan((ball1, ball2) -> ball2 + "(" + ball1 + ")");
source.subscribe(MyLog::Log);
}
}



4.3 결합 연산자


4.3.1 zip() 함수

/**
* 각각의 Observable 을 모두 활용해 2개 혹은 그 이상의 Observable 을 결합합니다.
*
* Zip() 함수는 최대 9개의 Observable 을 결합할 수 있지만 보통 2개 혹은 3개면 충분합니다.
*/
public class Zip {

public void example() {
String[] shapes = {"BALL","PENTAGON","STAR"};
String[] coloredTriangles = {"2-T","6-T","4-T"};

Observable<String> source = Observable.zip(
// 모양을 가져옵니다.
Observable.fromArray(shapes).map(Shape::getSuffix),
// 색상을 가져옵니다.
Observable.fromArray(coloredTriangles).map(Shape::getColor),
(suffix, color) -> color + suffix);

source.subscribe(MyLog::Log);
}
}



4.3.2 combineLatest() 함수

/**
* 2개 이상의 Observable 을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수입니다.
* 마지막 인자로 combiner 가 들어가는데 그것이 각 Observable 을 결합하여 어떤 결과를 만들어주는 역활을 합니다.
*
* 예를 들어 첫 번째 Observable 과 두 번째 Observable 을 결합하는 기능을 만든다고 하면
* 첫 번째 Observable 값 혹은 두 번째 Observable 의 값이 변경되었을 때 그 값을 자동으로 갱신해줍니다.
*/
public class CombineLatest {

public void example() {

String[] data1 = {"6","7","4","2"};
String[] data2 = {"DIAMOND","STAR","PENTAGON"};

Observable<String> source = Observable.combineLatest(
Observable.fromArray(data1)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(shape, notUser) -> Shape.getColor(shape)),
Observable.fromArray(data2)
.zipWith(Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),
(shape, notUser) -> Shape.getSuffix(shape)), (v1, v2) -> v1 + v2
);

source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}



4.3.3 merge() 함수

/**
* 입력 Observable 의 순서와 모든 Observable 이 데이터를 발행하는지 등에 관여하지 않고
* 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그래로 발행합니다.
*/
public class Merge {

public void example() {
String[] data1 = {"1","3"};
String[] data2 = {"2","4","6"};

Observable<String> source1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data1[idx])
.take(data1.length);

Observable<String> source2 = Observable.interval(50, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data2[idx])
.take(data2.length);

Observable<String> source = Observable.merge(source1, source2);

source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}



4.3.3 concat() 함수

**
* 2개 이상의 Observable 을 이어 붙여주는 함수입니다.
* 첫 번째 Observable 에 onComplete 이벤트가 발생하야 두 번 째 Observable 을 구독합니다.
*/
public class Concat {

public void example() {
Action onCompleteAction = () -> MyLog.Log("onComplete()");

String[] data1 = {"1","3","5"};
String[] data2 = {"2","4","6"};

Observable<String> source1 = Observable.fromArray(data1)
.doOnComplete(onCompleteAction);

Observable<String> source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data2[idx])
.take(data2.length)
.doOnComplete(onCompleteAction);

Observable<String> source= Observable.concat(source1, source2)
.doOnComplete(onCompleteAction);

source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}



4.4 조건연산자


4.4.1 amb() 함수

/**
* 여러개의 Observable 중에서 1개의 Observable 을 선택하는데,
* 선택 기준을 가장 먼저 데이터를 발행하는 Observable 입니다.
* 이후에 나머지 Observable 에서 발행하는 데이터는 모두 무시합니다.
*/
public class Amb {

public void example() {
String[] data1 = {"1","3","5"};
String[] data2 = {"2-R","4-R"};

List<Observable<String>> sources = Arrays.asList(
Observable.fromArray(data1)
.doOnComplete(() -> MyLog.Log("Observable #1 : onComplete()")),
Observable.fromArray(data2)
.delay(100L, TimeUnit.MILLISECONDS)
.doOnComplete(() -> MyLog.Log("Observable #2 : onComplete()"))
);

Observable.amb(sources)
.doOnComplete(() -> MyLog.Log("Result : onComplete()"))
.subscribe(MyLog::Log);

CommonUtils.sleep(1000);
}
}



4.4.2 takeUntil() 함수

/**
* take() 함수에 조건을 설정할 수 있습니다.
* take() 함수처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable 에서 값을 발행하는지고 판단하는 것입니다.
*/
public class TakeUntil {

public void example() {
String[] data = {"1","2","3","4","5","6"};

Observable<String> source = Observable.fromArray(data)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUsed) -> val)
.takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));

source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}



4.4.3 skipUntil() 함수

/**
* takeUntil() 과 정반대의 함수입니다.
* other Observable 을 인자로 받다는 점은 같지만 Observable 에서 데이터를 발행할 때까지 값을 건너뜁니다.
*/
public class SkipUntil {

public void example() {
String[] data = {"1","2","3","4","5","6"};

Observable<String> source = Observable.fromArray(data)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUser) -> val)
.skipUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));

source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}



4.4.4 all() 함수

/**
* 주어진 조건에 100% 맞을 때만 true 값을 발행하고 조건에 맞이 않는 데이터가 발행되면
* 바로 false 값을 발행합니다.
*/
public class All {

public void example() {
String[] data = {"1","2","3","4"};

Single<Boolean> source = Observable.fromArray(data)
.map(Shape::getShape)
.all(Shape.BALL::equals);

source.subscribe(MyLog::Log);
}
}



4.5 수학 및 기타 연산자


4.5.1 수학 함수


4.5.2 delay() 함수

/**
* RxJava 에는 시간을 다루는 함수들이 많습니다.
*
* 주기적으로 Observable 에서 값을 발행해주는 interval() 함수
* 일정 시간이 지난 후 값을 발행해주는 timer() 함수
* Callable 을 등록해두고 실행을 지연하는 defer() 함수
*
* 앞서 소개한 세 가지 함수가 Observable 을 생성하는 역활이라면
* delay() 함수는 유틸리티 연산자로서 보조 역활을 합니다.
*/
public class Delay {

public void example() {
String[] data = {"1","7","2","3","4"};
Observable<String> source = Observable.fromArray(data)
.delay(100L, TimeUnit.MILLISECONDS);
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}

}



4.5.3 timeInterval() 함수

/**
* 어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지를 알려줍니다.
*/
public class TimeInterval {

public void example() {
String[] data = {"1","3","7"};

CommonUtils.exampleStart();
Observable<Timed<String>> source = Observable.fromArray(data)
.delay(item -> {
CommonUtils.doSomething();
return Observable.just(item);
})
.timeInterval();

source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}

}


반응형
Comments