개발콩블로그

[RxSwift] ObservableType과 ObserverType 본문

RxSwift

[RxSwift] ObservableType과 ObserverType

devBean 2025. 2. 25. 23:51

안녕하세요 개발콩입니다!

오늘은 ObservableType과 ObserverType에 대한 글을 작성해보도록 하겠습니다.

 

 

바로 Observable과 Observer가 채택하는 프로토콜을 알아볼까요?

ObservableType Protocol

public protocol ObservableType: ObservableConvertibleType {
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

extension ObservableType {
    public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
        let observer = AnonymousObserver { e in
            on(e)
        }
        return self.asObservable().subscribe(observer)
    }
}

 

Observable은 내부적으로 AnonymousObserver를 생성하고

이러한 observer가 (Event<Element> -> Void) 클로저를 실행합니다.

 

한번 확인해 볼까요?

final class AnonymousObserver<Element>: ObserverBase<Element> {
    typealias EventHandler = (Event<Element>) -> Void
    
    private let eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self.eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        self.eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}

 

AnonymousObserver에서 init시점에 받은 클로저는

onCore(_ event: Event<Element>) method에서 실행되는 것을 볼 수 있습니다.

 

 

그리고 해당 onCore(_ event: Event) method는

on(_ event: Event<Element>)에서 사용되는 것을 볼 수 있습니다.

class ObserverBase<Element> : Disposable, ObserverType {
    private let isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self.isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self.isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    func dispose() {
        fetchOr(self.isStopped, 1)
    }
}

 

그렇다면 이곳에서 등장한 ObserverType에 대해 확인해보겠습니다.

 

ObserverType Protocol

public protocol ObserverType {
    associatedtype Element

    func on(_ event: Event<Element>)
}

extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    public func onCompleted() {
        self.on(.completed)
    }
    
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

 

ObserverType은 on(_ event: Event<Element>)가 필수적으로 구현되어야 하며,

Event Type은 아래와 같습니다.

@frozen public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}

 

 

결론

  • ObservableType에서 내부적으로 Observer를 생성합니다.
  • 해당 Observer는 AnonymousObserver 타입이며 생성 시에 이벤트를 처리할 클로저를 전달받습니다.
  • AnonymousObserver는 ObserverBase를 상속하여 onCore 메서드를 구현하고, 전달받은 클로저를 실행합니다.
  • ObserverBase의 on 메서드는 onCore메서드를 호출합니다.
  • Observer는 on 메서드를 통해 Observable이 방출한 Event를 처리합니다.