본문으로 바로가기

cautions!

1. 관례상 stream 변수의 뒤에 $를 붙이곤 합니다.

이 포스트에서는 뗐다가 썼다가 혼용되어 있습니다.

const inputObservable$ = fromEvent(inputDom, 'keyup');

 

2. 공식문서의 그림(이런 녀석들을 "마블 다이어그램"이라고 부릅니다)을 이해할 수 있어야 합니다!

정보의 흐름을 아는데 그림이 정말 중요합니다.

 

3. 그 외의 지식들

- 기본적으로 Observable은 cold 합니다. 즉, subscription이 되지 않으면 등록되지 않습니다.

- Observable은 immutable합니다. operator를 거친다고해서 변형되지 않습니다.

 

 

무엇보다, 공식 문서 보는게 제일 좋습니다. 거기에 타입이랑 인자 다 써 있어요.

그러니까 이 포스트는 그냥 한 번 쭈욱 읽어보고 버리는게 제일 좋습니다.

 

Operator?

observable에 전달된 데이터를 조작, 변형하는 친구들입니다.

수 많은 operator가 있는데 6버전의 구분에 따르면 다음과 같이 구분할 수 있습니다.

(문서에 따른 내용입니다. rxjs-dev.firebaseapp.com/guide/operators)

 

  • Creation Operators => 해당 포스트
  • Join Creation, Join Operators => darrengwon.tistory.com/1376
  • Transformation Operators => darrengwon.tistory.com/1374
  • Filtering Operators => darrengwon.tistory.com/1375
  • Multicasting Operators
  • Error Handling Operators => 해당 포스트
  • Utility Operators => 해당 포스트
  • Conditional and Boolean Operators
  • Mathematical and Aggregate Operators => 해당 포스트

 

이 많은 operator들을 전부 살피는 것은 무리이므로, 간략하게 살펴보도록하겠습니다.

 

 

Create Operator: of, fromEvent, from, interval, timer, range ... etc

뭐 별거 있나요, Observable 생성하는 녀석들입니다.

 

0) 커스텀 Observable 만들기

 

이렇게 만드는게 번거로워서 여러가지 create operator를 rxjs에서 제공해주는 겁니다.

import { Observable } from 'rxjs';

// observable
const observable = new Observable(subscriber => {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    setTimeout(() => {
      subscriber.next(4);
      subscriber.complete(); // 끝
      subscriber.next(5); // 끝났음. 출력안됨
    }, 1000);
  } catch (error) {
    subscriber.error(error);
  }
});

// observer. 단순히 next, error, complete 핸들링할 함수 객체 덩어리임 ㅋㅋ
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});

// 옵저버가 옵저버블을 구독하도록 시킴+ 옵저버의 함수를 실행시키는 주체는 옵저버블임.
observable.subscribe(observer);

 

참고조, 옵저버는 다음과 같이 간략한 방식으로 작성할 수도 있습니다.

다만 저는 선호하지 않는 편이어서, 객체 형태로 명확히 넘겨주는 편을 선호하는 편입니다.

// 객체 형태로
observable.subscribe({
    next: v => console.log(v),
    error: (err) => console.log(err.message),
    complete: () => console.log("done")
})

// 간략하게
observable.subscribe(
  v => console.log(v), 
  (err) => console.log(err.message),
  () => console.log("done")
)

 

unsubscribe까지 작성해보았습니다. 

이 코드의 경우에는 2초가 지나기도 전에 unsubscribe되었으므로 complete 메서드가 실행되지 않습니다.

unsubscribe는 메모리 릭은 방지하기 위해서라도 꼭 해줘야 합니다.

예를 들어 특정 페이지에서만 사용했다가, 다른 페이지로 이동했으면 unsubscribe 해줘야 합니다.

 

* 주의 : unsubscribe 메서드 한 번 썼다고 모든 subscribe가 종료되는 건 아닙니다. 만약, 여러 개의 subscription이 있다면 각각 unsubscribe해주어야 합니다. 

import { Observable } from "rxjs";

// observable
const observable = new Observable((subscriber) => {
  subscriber.next(1);
  const id = setTimeout(() => {
    subscriber.next(2);
    subscriber.complete();
  }, 2000);
  return () => {
    console.log("clearTimeout");
    clearTimeout(id);
  };
});

// subscriber
const subscription = observable.subscribe({
  next: (v) => console.log(v),
  complete: () => console.log("done"),
});

// 참고로, unsubscribe한다고 해서 observer의 complete 메서드가 실행되는 건 아님. 그냥 끝남.
subscription.unsubscribe();

 

 

1) 정적인 값으로부터 observable 생성: of

const source$ = of("Hello", [1, 2, 3], 64);

source$.subscribe({
  next: (x) => console.log(x),
});

// output
"Hello"
[1, 2, 3]
64

 

2) dom의 event로 부터 observable 생성 : fromEvent

 

fromEvent로 생성하면 됩니다. 별도의 파이프 달고 싶으면 달아주면 되구요

우선 가장 기본적인 버튼 클릭 이벤트.

const btn = document.querySelector('.btn');
const btnObservable = fromEvent(btn, 'click'); // observable

// observer
btnObservable.subscribe({
  next: e => console.log(e),
  err: err => console.log(err),
  complete: () => console.log('complete'),
});

 

Rxjs을 특성을 이용하여 곧바로 input의 값을 다른 DOM에 뿌리는 작업을 더욱 쉽게할 수 있습니다.

const inputDom = $('.input');
const outputDom = $('.output');
const inputObservable = fromEvent(inputDom, 'keyup');

inputObservable.subscribe({
  next: e => (outputDom.textContent = e.target.value),
  err: err => console.log(err),
  complete: () => console.log('complete'),
});

 

마우스의 좌표를 받아오겠습니다. 그런데 매번 일어나면 브라우저가 펑 하고 터져버릴 것 같은 그런 느낌이죠?

throttle을 걸어놓겠습니다.

import { fromEvent, interval } from 'rxjs';
import { throttle } from 'rxjs/operators';

const docMoveObservable = fromEvent(document, 'mousemove');
const pipeObservable = docMoveObservable.pipe(throttle(() => interval(1000)));

pipeObservable.subscribe({
  next: e => console.log(e.clientX),
  err: err => console.log(err),
  complete: () => console.log('complete'),
});

 

 

3) 배열로부터 obsrvable 생성하기 : from

from을 사용합니다.

이벤트는 끝나지 않기 때문에 complete를 위한 unscribe를 해주지 않으면 끝나지 않지만,

배열은 끝이 명확하므로 별다른 작업 없이도 complete 메서드가 실행됩니다.

const numbers = [1, 2, 3, 4, 5];
const numberObservable = from(numbers);

numberObservable.subscribe({
  next: x => console.log(x),
  error: err => console.log(err),
  complete: () => console.log('done'),
});

 

set, map도 잘 굴러가네요

const mySet = new Set(['hello', 34, { name: 'cineps&good', asset: 'people' }]);
const myMap = new Map([['name', 'darren'],['feeling', 'depression']]);

const mysetObv = from(mySet);
const mymapObv = from(myMap);

mysetObv.subscribe({
  next: x => console.log(x),
  complete: () => console.log('done'),
});
mymapObv.subscribe({
  next: x => console.log(x),
  complete: () => console.log('done'),
});

 

 

4) Promise로부터 Observable 만들기 : fromPromise 없어졌음 ㅋㅋ from 쓰세요

 

rxjs 버전 5 때는 fromPromise가 있었는데 6 부터는 from으로 통합되었습니다. 

data fetching이 아주 편안해졌습니다.

const promiseSource$ = from(
  axios.get("https://jsonplaceholder.typicode.com/todos/1")
);
promiseSource$.subscribe({ next: (x) => console.log(x) });

 

const promise = new Promise((resolve, reject) => {
  try {
    setTimeout(() => resolve('resolved'), 1000);
  } catch (error) {
    reject(error.message);
  }
});

const PromiseObsv = from(promise);
PromiseObsv.subscribe({
  next: x => console.log(x),
  error: err => console.log(err),
  complete: () => console.log('done'),
});

 

5) interval, timer, range

 

 

interval

setTimeout, setInterval보다 편리하다.

setInterval의 경우 cleaup을 별도로 해줘야 하지만 interval은 complete하면 즉시 cleanup된다.

take 4를 취했으니, 0부터 3까지 출력될 것이다. 끝이 명확하니 complete 메서드는 자동으로 실행된다.

 

+

timer랑 다른 점이 무엇이냐면, inteval은 첫 시작이 주어진 시간이 지나야 시작합니다.

반면 timer는 일단 주어진 시간에 시작하고, 그 다음으로 주어진 시간 대로 

interval(1000) => 1초 지나고부터 1초 간격으로 반복.

timer(0, 1000) => 0초(즉시) 시작하고, 그 다음부터 1초 간격으로 반복.

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const numbersObsv = interval(1000).pipe(take(4));

numbersObsv.subscribe({
  next: x => console.log(x),
  complete: () => console.log('done'),
});

 

timer

일정 시간이 지나면 한번에 다 실행된다. 

아, 그리고 옵저버는 귀찮아서 그냥 옵저버블에 체인으로 달아버렸다. 이게 편하더라구요.

 

// 처음에는 2초, 그 다음부터는 5초 간격으로 출력
timer(2000, 5000).subscribe({ next: (x) => console.log(x) });

 

import { timer, of } from 'rxjs';
import { concatMapTo } from 'rxjs/operators';


// This could be any observable
const source = of(1, 2, 3);

const result = timer(1000)
  .pipe(concatMapTo(source))
  .subscribe({
    next: x => console.log(x),
    complete: () => console.log('done'),
  });

 

range 

python range랑 똑같다.

index 관련해서 헷갈리기 쉬운데, 정의 자체가 range(start, count, scheduler)로 되어 있다.

아래 코드는 0부터 5개를 출력한다. 즉, 0, 1, 2, 3, 4를 출력한다.

import { range } from 'rxjs';

range(0, 5).subscribe({
  next: x => console.log(x),
  complete: () => console.log('done'),
});

 

Error Handling Operators

catchError, retry, retryWhen

 

이 세 친구는 보통 엮어서 사용된다.

operator의 pipe 라인 중 어디에 catchError를 배치할 것인지에 대해 주의해야 한다. 한 번 에러가 발생하면, 별도의 retry를 하지 않으면 그대로 옵저버의 error 메서드가 실행되며 옵저버블이 끝나기 때문이다. 

 

catchError의 사용례는 다음과 같다. err는 발생한 err 객체이며, caught는 cathError가 발생한 source이다.

catchError((err, caught) => ...)

err와 caught를 출력해보았다. 음.. 뭐 특별한 건 없다.

 

그렇다면 사용례를 살펴보도록 하자.

 

가장 간단한 형태의 catchError의 사용례이다. err가 발생되면 err를 리턴해서 observer의 error 메서드를 발생시키고 끝난다.

click$
  .pipe(
    switchMap(() =>
      ajax("https://api.github.com/users/darrenkwondev").pipe(
        catchError((err, caught) => {
          console.log(err, caught);
          return err;
        })
      )
    )
  )
  .subscribe({
    next: (x) => console.log(x),
    error: (err) => console.log(err),
    complete: () => console.log("done"),
  });

 

여기서 만약, 단순히 에러 메세지를 발생시키지 않고, 재시도하길 원한다면, 에러가 발생한 옵저버블을 다시 리턴하면 됩니다.

그러나 단순이 아래와 같이 작성한다면 무한히 retry하게 됩니다.

click$
  .pipe(
    switchMap(() =>
      ajax("https://api.github.com/users/darrenkwondev").pipe(
        catchError((err, caught) => {
          console.log(err, caught);
          return caught; // 무한히 retry함. 왜? 에러가 발생한 obsv를 다시 반환하기 떄문
        })
      )
    )
  )

네트워크 탭을 열어보면 아래처럼 계속해서, 에러가 발생하는 observable을 재실행함을 확인할 수 있다.

이는 명백히 메모리 누수이며 치명적인 결과를 가져온다.

 

 

Utility Operators

tap

값을 단순히 확인하고 싶을 때 사용합니다.

값을 도중에 확인하고 싶은 경우에는 map이 아닌 tap을 이용하도록합시다.

tab(observer) 꼴로 작성하기 때문에 도중에 발생한 error, complete를 캐치할 수도 있습니다.

of(1, 2, 3, 4)
  .pipe(
    tap((v) => console.log("before", v)),
    map((v) => v * 2),
    tap((v) => console.log("after", v))
  )
  .subscribe(console.log);

 

 

Mathematical and Aggregate Operators

 

reduce

일반적으로 써 왔던 reduce의 역할입니다.

of(1, 2, 3, 4)
  .pipe(
    reduce((acc, cur, i) => {
      console.log(`${i}번째 acc : ${acc}, cur: ${cur}`);
      return acc + cur;
    }, 0)
  )
  .subscribe(console.log);

 

 

count, max, min

설명이 필요 없죠

of(1, 2, 3, 4).pipe(count()).subscribe(console.log); // 4 4개니까
of(1, 2, 3, 4).pipe(max()).subscribe(console.log); // 4 제일 크니까
of(1, 2, 3, 4).pipe(min()).subscribe(console.log); // 1 제일 작으니까

 

 

 


darren, dev blog
블로그 이미지 DarrenKwonDev 님의 블로그
VISITOR 오늘 / 전체