[Combine] Resource Management

내,,소중한,,리소스인데,,! 리소스 관리!

naljin
12 min readMay 7, 2020

Combine은 리소스를 관리할 수 있는 두가지 연산자를 제공합니다! 바로 바로..!!! share()multicast(_:) 인데요!

앗, 뭔가 익숙하지 않나요? 사실 multicast는 이전 Networking 포스팅에서 한번 등장했던 연산자예요. 네트워크 요청은 여러개의 subscriber들에게 하나의 자원에서 나오는 결과(publisher가 방출하는 값)를 공유하는 것이 복제하는 것보다 더 좋다고 설명했던것… 기억하시나요?! 기억이 안나도 괜찮아요! 저도 그러니까요 허허.. 다시 하나씩 알아보도록 합시다.

share

이 연산자의 목적은 보다 참조를 통해서 publisher를 얻을 수 있도록 하는 것입니다. 원래 Publisher는 대개 Struct 타입이어서 값 복사가 되기 때문이죠.

하지만!! share 연산자를 사용하면 업스트림 publisher를 공유하는 Publishers.Share class 인스턴스를 반환하게 됩니다.

해당 Publisher는 아래와 같이 동작합니다.

  1. 첫번째 구독 요청이 오면
  2. 먼저 업스트림 publisher를 구독한 후에
  3. 업스트림에서 받은 값을 모든 subscriber에게 전달합니다

Note : subscriber는 구독 요청을 한 이후부터 들어오는 값부터 받을 수 있습니다. 업스트림 publisher가 완료된 후 공유 publisher를 구독한다면 , 새 subscriber는 완료 이벤트만 수신하게 됩니다.

자, 네트워크 요청 상황을 가정해봅시다. 여러 구독자가 여러 번 요청하지 않고 결과를 수신하도록 하려는 경우, 코드는 아래와 같을거예요.

let shared = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://medium.com/@rkdthd0403")!)
.map(\.data)
.print("shared")
.share()
print("subscribing first")//1. 첫번째 구독은 "작업"을 trigger 합니다. (여기서는 "작업"은 네트워크 요청 수행)
let subscription1 = shared.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription1 received: '\($0)'") }
)
print("subscribing second")//2. 두번째 구독은 단순히 값을 받습니다. 첫번째 구독자와 같은 값을 받게 되겠죠?
let subscription2 = shared.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription2 received: '\($0)'") }
)

콘솔에는 다음과 같은 결과가 출력됩니다.

subscribing first
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscribing second
shared: receive value: (153217 bytes)
subscription1 received: '153217 bytes'
subscription2 received: '153217 bytes'

shared: receive finished

print 연산자를 사용한 결과 다음과 같은 것들을 확인할 수 있었습니다.

  • 첫번째 구독이 DataTaskPublisher 구독을 트리거합니다.
  • 두 번째 구독은 아무것도 바꾸지 않습니다. 즉 요청이 발생하지 않습니다. publisher는 그저 계속 동작합니다.
  • 요청이 완료되면 publisher는 두 구독자에게 결과 데이터를 내보낸 후 완료합니다.

요청이 한 번만 전송되는지 확인하기 위해 share() 라인을 주석 처리하면 아래와 유사한 출력 결과가 나옵니다.

subscribing first
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscribing second
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (153217 bytes)
subscription1 received: '153217 bytes'
shared: receive finished
shared: receive value: (153217 bytes)
subscription2 received: '153217 bytes'
shared: receive finished

쨘!!! DataTaskPublisher가 두 개의 구독을 받고 있죠? 이 경우는 요청을 두 번 수행하게 됩니다.

하지만 share 연산자에도 문제는 있습니다. 첫번째 요청이 완료된 후 두 번째 구독자가 온다면 어떻게 될까요? 위의 Note 를 한 번 더 가져와 볼게요!

Note : subscriber는 구독 요청을 한 이후부터 들어오는 값부터 받을 수 있습니다. 업스트림 publisher가 완료된 후 공유 publisher를 구독한다면 , 새 subscriber는 완료 이벤트만 수신하게 됩니다.

따라서 두 번째 구독자는 완료 이벤트만 수신하게 될거예요. 코드를 통해 볼까요? 위의 코드랑 달라지는건 두번째 구독이 5초 후에 이뤄진다는 것 밖에 없어요.

let shared = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://medium.com/@rkdthd0403")!)
.map(\.data)
.print("shared")
.share()
print("subscribing first")let subscription1 = shared.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription1 received: '\($0)'") }
)
print("subscribing second")var subscription2: AnyCancellable? = nil//DispatchQueue를 이용해서 5초 이후에 구독이 이뤄지는 상황을 시뮬레이션합니다
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
print("subscribing second")
subscription2 = shared.sink(
receiveCompletion: { print("subscription2 completion \($0)") },
receiveValue: { print("subscription2 received: '\($0)'") }
)
}

콘솔에는 다음과 같이 찍힙니다.

subscribing first
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
shared: receive value: (153217 bytes)
subscription1 received: '153217 bytes'
shared: receive finished
subscribing second
subscription2 completion finished

흠.. 역시 subscription2는 완료 이벤트만 받고있네요. 확실히 저희가 원하던 결과는 아니죠? 과연 두 구독 모두 요청 결과를 받도록 하려면 어떻게 해야 해야 할까요?multicast 에 대해 알아봅시다!

multicast

우리가 위에서 원하던 상황은 이러합니다..

  1. publisher에 대한 하나의 구독을 공유하고
  2. 업스트림 publisher가 완료된 후에도 새 구독자에게 값을 재생

RxSwift에서는 shareReplay 연산자를 쓰면 되나보더라고요…? 하지만 이 연산자는 Combine에 없어여… 젠장!

느이 Combine엔 shareReplay 없지?

대신 Networking 포스팅에서 보았듯이 이런 상황에서 우리는 multicast(_:) 연산자를 사용했습니다. 이 연산자의 특징은 ConnectablePublisher를 반환한다는 것이죠! ConnectablePublisherconnect()를 호출하기 전까지는 업스트림 publisher를 구독하지 않습니다! 따라서 먼저 필요한 구독자를 모두 설정하고 난 뒤 connect()를 호출해서 작업을 시작할 수 있겠죠?

코드를 봅시다!

// 1. 업스트림 publisher가 방출하는 값과 완료 이벤트를 전달할 subject를 준비합니다.
let subject = PassthroughSubject<Data, URLError>()
// 2. 위의 subject를 사용하여 멀티캐스트 publisher를 준비합니다.
let multicasted = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://medium.com/@rkdthd0403")!)
.map(\.data)
.print("shared")
.multicast(subject: subject)
// 3. 공유(멀티캐스트) 된 publisher를 구독합니다.
let subscription1 = multicasted
.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription1 received: '\($0)'") }
)
let subscription2 = multicasted
.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription2 received: '\($0)'") }
)
// 4. publisher(multicasted)에게 업스트림 publisher에 연결하도록 합니다.
multicasted.connect()
// 5. 두 구독 모두 데이터를 수신하는지 테스트하기 위해 빈 데이터 전송합니다.
subject.send(Data())

콘솔에는 아래와 같이 찍힐거예요. 요청은 한번만, 데이터는 두 구독자 모두에게!

subscribing first
shared: receive subscription: (DataTaskPublisher)
shared: request unlimited
subscription1 received: '0 bytes'
subscription2 received: '0 bytes'

shared: receive cancel

Note: 모든 ConnectablePublishers와 마찬가지로 multicast publisher는 share()처럼 동작하도록 하는 autoconnect() 메소드를 제공합니다. 바로 처음 구독이 들어오면 업스트림 publisher에 연결되어 바로 작업을 시작하는 것이죠!

이는 업스트림 publisher가 단일 값을 방출하고 CurrentValueSubject를 사용하여 구독자와 공유할 수 있는 시나리오에서 유용하다고 하는데…. 저는 감이 잘 안오네요 🤔 실제 프로젝트를 통해 사용해봐야 알것 같아욥

어쨌거나 저쨌거나~~~ 네트워킹과 같이 resource-heavy한 프로세스에서 구독 작업을 공유하는 것은 거의 필수 사항이라고 볼 수 있겠죠? 안그러면 메모리 문제뿐만 아니라 서버에 불필요한 네트워크 요청이 쇄도할 수 있으니까요 ~.~

Future

Combine은 계산(computation) 결과를 공유할 수 있는 방법을 한 가지 더 제공하는데요! 바로 Future입니다. FuturePromise 전달 인자를 받는 closure를 통해 생성합니다. 이런식으로요!

Future<Int, Never> { promise in
.....
}
}

성공하든 실패하든 상관없이 결과를 얻을 때 promise를 시행합니다.

let future = Future<Int, Error> { fulfill in
do {
let result = try performSomeWork() //어떤 작업을 수행하려고 시도
fulfill(.success(result)) //계산 결과로 즉시 promise 시행
} catch {
fulfill(.failure(error)) //실패 시 던져진 오류로 즉시 promise 시행
}
}

리소스 관점에서 흥미로운 점은 다음과 같습니다.

  • Future 은 class 입니다.
  • 생성 즉시 closure를 호출하여 결과 계산을 시작하고 가능한 한 빨리 promise를 시행합니다.
  • 시행된 Promise의 결과를 저장하고 현재와 미래의 구독자에게 전달합니다.

즉 Future는

  1. 구독을 기다리지 않고 작업을 즉시 시작할 수 있고
  2. 일을 한 번만 수행하면서
  3. 그 결과를 구독자에게도 전달할 수 있는

편리한 방법입니다. 따라서 Future네트워크 요청이 생성하는 단일 결과를 공유해야 할 때 사용할 수 있는 적절한 후보입니다.

Note: Future를 구독하지 않더라도, Future를 생성하는 것 자체가 closure를 호출하고 일을 수행하도록 합니다.

Key points

  • 구독하는 작업을 공유하는 것은 네트워킹과 같이 자원이 많이 드는 프로세스를 다룰 때 매우 중요합니다.
  • publisher를 여러 구독자와 공유하기만 하면 될 때는 share()를 사용합니다.
  • 업스트림 publisher가 작업을 시작하는 경우나, 구독자에게 값을 전달하는 방법을 세밀하게 제어해야 할 경우 multicast(_:)를 사용합니다.
  • 계산의 단일 결과를 여러 구독자에게 공유하려면 Future를 사용합니다.

이전 포스팅 👈🏻

--

--

No responses yet