Combine: Asynchronous Programming with Swift

First Edition · iOS 13 · Swift 5.1 · Xcode 11

2. Publishers & Subscribers
Written by Scott Gardner

Now that you’ve learned some of the basic concepts of Combine, it’s time to jump in and play with two of Combine’s core components — publishers and subscribers.

In this chapter, you’ll review several examples of creating publishers and subscribing to those publishers using subscribers. By doing so, you’ll acquire important skills that you’ll use throughout the rest of this book and beyond.

Getting started

Note: There are starter and final versions of the playgrounds and projects you’ll use in each chapter throughout the book. The starter will be prepared and ready for you to enter the code specified for each example and challenge. You can compare your work with the final version at the end or along the way if you get stuck.

For this chapter, you’ll use an Xcode playground with Combine imported. Open Starter.playground in the projects folder.

Open Sources in the Project navigator (View ▸ Navigators ▸ Show Project Navigator and twist down the Combine playground page), and select SupportCode.swift. It contains the following helper function example(of:):

public func example(of description: String,
                    action: () -> Void) {
  print("\n——— Example of:", description, "———")
  action()
}

You’ll use this function to encapsulate some examples you’ll use throughout this book.

However, before you begin playing with those examples, you first need to learn about publishers, subscribers and subscriptions. They form the foundation of Combine and enable you to send and receive data, typically asynchronously.

Hello Publisher

At the heart of Combine is the Publisher protocol. This protocol defines the requirements for a type to be able to transmit a sequence of values over time to one or more subscribers. In other words, a publisher publishes or emits events that can include values of interest.

example(of: "Publisher") {
  // 1
  let myNotification = Notification.Name("MyNotification")

  // 2
  let publisher = NotificationCenter.default
    .publisher(for: myNotification, object: nil)

  // 3
  let center = NotificationCenter.default

  // 4
  let observer = center.addObserver(
    forName: myNotification,
    object: nil,
    queue: nil) { notification in
      print("Notification received!")
  }

  // 5
  center.post(name: myNotification, object: nil)

  // 6
  center.removeObserver(observer)
}

——— Example of: Publisher ———
Notification received!

Hello Subscriber

Subscriber is a protocol that defines the requirements for a type to be able to receive input from a publisher. You’ll dive deeper into conforming to the Publisher and Subscriber protocols shortly; for now, you’ll focus on the basic flow.

example(of: "Subscriber") {
  let myNotification = Notification.Name("MyNotification")

  let publisher = NotificationCenter.default
    .publisher(for: myNotification, object: nil)

  let center = NotificationCenter.default
}

Subscribing with sink(_:_:)

Continue in the previous example and add the following code to it:

// 1
let subscription = publisher
  .sink { _ in
    print("Notification received from a publisher!")
  }

// 2
center.post(name: myNotification, object: nil)
// 3
subscription.cancel()

——— Example of: Subscriber ———
Notification received from a publisher!

example(of: "Just") {
  // 1
  let just = Just("Hello world!")
  
  // 2
  _ = just
    .sink(
      receiveCompletion: {
        print("Received completion", $0)
      },
      receiveValue: {
        print("Received value", $0)
    })
}

——— Example of: Just ———
Received value Hello world!
Received completion finished

Add another subscriber to the same publisher, at the end of the example:

_ = just
  .sink(
    receiveCompletion: {
      print("Received completion (another)", $0)
    },
    receiveValue: {
      print("Received value (another)", $0)
  })

Received value (another) Hello world!
Received completion (another) finished

Subscribing with assign(to:on:)

In addition to sink, the built-in assign(to:on:) operator enables you to assign the received value to a KVO-compliant property of an object.

example(of: "assign(to:on:)") {
  // 1
  class SomeObject {
    var value: String = "" {
      didSet {
        print(value)
      }
    }
  }
  
  // 2
  let object = SomeObject()
  
  // 3
  let publisher = ["Hello", "world!"].publisher
  
  // 4
  _ = publisher
    .assign(to: \.value, on: object)
}

——— Example of: assign(to:on:) ———
Hello
world!

Hello Cancellable

When a subscriber is done and no longer wants to receive values from a publisher, it’s a good idea to cancel the subscription to free up resources and stop any corresponding activities from occurring, such as network calls.

// 1
center.post(name: myNotification, object: nil)

// 2
subscription.cancel()

——— Example of: Subscriber ———
Notification received from a publisher!
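
To see the effect of cancel() in isolation, here is a minimal sketch. It assumes a PassthroughSubject (covered later in this chapter) as the publisher; the names subject and received are made up for illustration and are not part of the playground.

```swift
import Combine

// Hypothetical names for illustration; not from the chapter's playground.
let subject = PassthroughSubject<String, Never>()
var received: [String] = []

// sink returns an AnyCancellable that represents the subscription.
let subscription = subject.sink { received.append($0) }

subject.send("before cancel")   // delivered to the subscriber
subscription.cancel()
subject.send("after cancel")    // not delivered: the subscription was canceled

print(received)
```

Only the value sent before the call to cancel() should end up in received.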

Understanding what’s going on

They say a picture is worth a thousand words, so let’s kick things off with one to explain the interplay between publishers and subscribers:

public protocol Publisher {
  // 1
  associatedtype Output

  // 2
  associatedtype Failure : Error

  // 4
  func receive<S>(subscriber: S)
    where S: Subscriber,
    Self.Failure == S.Failure,
    Self.Output == S.Input
}

extension Publisher {
  // 3
  public func subscribe<S>(_ subscriber: S)
    where S : Subscriber,
    Self.Failure == S.Failure,
    Self.Output == S.Input
}

public protocol Subscriber: CustomCombineIdentifierConvertible {
  // 1
  associatedtype Input

  // 2
  associatedtype Failure: Error

  // 3
  func receive(subscription: Subscription)

  // 4
  func receive(_ input: Self.Input) -> Subscribers.Demand

  // 5
  func receive(completion: Subscribers.Completion<Self.Failure>)
}

public protocol Subscription: Cancellable, CustomCombineIdentifierConvertible {
  func request(_ demand: Subscribers.Demand)
}
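
As a rough illustration of how the three Subscriber requirements are called in order, the sketch below uses Combine's AnySubscriber, whose initializer accepts one closure per requirement. The events array and the choice of three values with a demand of two are assumptions made for this illustration.

```swift
import Combine

// Hypothetical log used only to record the order of calls.
var events: [String] = []

let subscriber = AnySubscriber<Int, Never>(
  receiveSubscription: { subscription in
    // Called once, when the publisher hands over the subscription.
    events.append("subscription")
    subscription.request(.max(2))
  },
  receiveValue: { value in
    // Called for each value; the return value is added to the demand.
    events.append("value \(value)")
    return .none
  },
  receiveCompletion: { completion in
    // Called at most once, when the publisher finishes or fails.
    events.append("completion \(completion)")
  }
)

[1, 2, 3].publisher.subscribe(subscriber)

print(events)
```

Because the subscriber asks for only two of the three values, the publisher never gets to finish, so no completion is recorded.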

Creating a custom subscriber

Time to put what you just learned to practice. Add this new example to your playground:

example(of: "Custom Subscriber") {
  // 1
  let publisher = (1...6).publisher
  
  // 2
  final class IntSubscriber: Subscriber {
    // 3
    typealias Input = Int
    typealias Failure = Never

    // 4
    func receive(subscription: Subscription) {
      subscription.request(.max(3))
    }
    
    // 5
    func receive(_ input: Int) -> Subscribers.Demand {
      print("Received value", input)
      return .none
    }
    
    // 6
    func receive(completion: Subscribers.Completion<Never>) {
      print("Received completion", completion)
    }
  }
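
  let subscriber = IntSubscriber()

  publisher.subscribe(subscriber)
}

——— Example of: Custom Subscriber ———
Received value 1
Received value 2
Received value 3

Because receive(_:) returns .none, the demand never rises above the initial .max(3). The subscriber receives three of the six values and no completion event. Returning .unlimited instead lifts that cap:

func receive(_ input: Int) -> Subscribers.Demand {
  print("Received value", input)
  return .unlimited
}

——— Example of: Custom Subscriber ———
Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
Received value 6
Received completion finished

Now replace the publisher of integers:

let publisher = (1...6).publisher

with a publisher of strings:

let publisher = ["A", "B", "C", "D", "E", "F"].publisher

The code no longer compiles, because the publisher's Output (String) does not match the Input of IntSubscriber (Int).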

Hello Future

Much like you can use Just to create a publisher that emits a single value to a subscriber and then completes, you can use a Future to asynchronously produce a single result and then complete. Add this new example to your playground:

example(of: "Future") {
  func futureIncrement(
    integer: Int,
    afterDelay delay: TimeInterval) -> Future<Int, Never> {

  }
}
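
Fill in the body of futureIncrement so that it returns a future that fulfills its promise after the delay:

Future<Int, Never> { promise in
  DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
    promise(.success(integer + 1))
  }
}

Future is declared as a class, and its Promise is a closure that takes a Result:

final public class Future<Output, Failure> : Publisher
  where Failure: Error {
  public typealias Promise = (Result<Output, Failure>) -> Void
  ...
}

Back in the example, create a future and subscribe to it:

// 1
let future = futureIncrement(integer: 1, afterDelay: 3)

// 2
future
  .sink(receiveCompletion: { print($0) },
        receiveValue: { print($0) })
  .store(in: &subscriptions)

——— Example of: Future ———
2
finished

Add a second subscription to the same future:

future
  .sink(receiveCompletion: { print("Second", $0) },
        receiveValue: { print("Second", $0) })
  .store(in: &subscriptions)

To check how often the future does its work, print a message inside the future's closure, just before the DispatchQueue call:

print("Original")

——— Example of: Future ———
Original
2
finished
Second 2
Second finished

"Original" appears only once: the future runs its closure a single time and shares the result with both subscribers.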
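
A future starts its work as soon as it is created, not when something subscribes to it. The sketch below is one way to observe that. To keep it synchronous it fulfills the promise immediately instead of after a delay; workCount, countBeforeSubscribing and results are hypothetical names used only here.

```swift
import Combine

var workCount = 0   // counts how often the future's closure runs

// The closure runs when the Future is created, before any subscriber exists.
let future = Future<Int, Never> { promise in
  workCount += 1
  promise(.success(42))
}
let countBeforeSubscribing = workCount

// Both subscribers receive the stored result; the closure is not run again.
var results: [Int] = []
let first = future.sink { results.append($0) }
let second = future.sink { results.append($0) }

print(countBeforeSubscribing, workCount, results)
```

If the future is eager and shares its result, as described above, the counter should already be 1 before either subscription and should still be 1 afterwards.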

Hello Subject

You’ve already learned how to work with publishers and subscribers, and even how to create your own custom subscribers. Later in the book, you’ll learn how to create custom publishers. For now, though, there are just a couple more things standing between you and a well-deserved <insert your favorite beverage> break. First is a subject.

example(of: "PassthroughSubject") {
  // 1
  enum MyError: Error {
    case test
  }
  
  // 2
  final class StringSubscriber: Subscriber {
    typealias Input = String
    typealias Failure = MyError
    
    func receive(subscription: Subscription) {
      subscription.request(.max(2))
    }
    
    func receive(_ input: String) -> Subscribers.Demand {
      print("Received value", input)
      // 3
      return input == "World" ? .max(1) : .none
    }
    
    func receive(completion: Subscribers.Completion<MyError>) {
      print("Received completion", completion)
    }
  }
  
  // 4
  let subscriber = StringSubscriber()
}

Add the following inside the example, after the subscriber:

// 5
let subject = PassthroughSubject<String, MyError>()

// 6
subject.subscribe(subscriber)

// 7
let subscription = subject
  .sink(
    receiveCompletion: { completion in
      print("Received completion (sink)", completion)
    },
    receiveValue: { value in
      print("Received value (sink)", value)
    }
  )

Send two values through the subject:

subject.send("Hello")
subject.send("World")

——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World

Cancel the second subscription, then send another value:

// 8
subscription.cancel()

// 9
subject.send("Still there?")

——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?

Only the first subscriber receives the value, because the second subscription was canceled. Next, send a completion event followed by one more value:

subject.send(completion: .finished)
subject.send("How about another one?")

——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?
Received completion finished

The value sent after the completion event is not delivered. To send a failure instead, add this line immediately before the .finished completion:

subject.send(completion: .failure(MyError.test))

——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?
Received completion failure(...MyError.test)

example(of: "CurrentValueSubject") {
  // 1
  var subscriptions = Set<AnyCancellable>()
  
  // 2
  let subject = CurrentValueSubject<Int, Never>(0)
  
  // 3
  subject
    .sink(receiveValue: { print($0) })
    .store(in: &subscriptions) // 4
}
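
——— Example of: CurrentValueSubject ———
0

The new subscriber immediately receives the subject's current value. Send two more values:

subject.send(1)
subject.send(2)

1
2

You can also read the current value at any time:

print(subject.value)

Assigning to value sends a new value, just like send(_:):

subject.value = 3
print(subject.value)

Create a second subscription to the subject:

subject
  .sink(receiveValue: { print("Second subscription:", $0) })
  .store(in: &subscriptions)

To log every publishing event, insert the print() operator before sink in both subscriptions:

subject
  .print()
  .sink...

——— Example of: CurrentValueSubject ———
receive subscription: (CurrentValueSubject)
request unlimited
receive value: (0)
0
receive value: (1)
1
receive value: (2)
2
2
receive value: (3)
3
3
receive subscription: (CurrentValueSubject)
request unlimited
receive value: (3)
Second subscription: 3
receive cancel
receive cancel

The two receive cancel lines appear because the subscriptions set is deinitialized at the end of the example, which cancels both subscriptions. A completion event cannot be assigned to value, so this line does not compile:

subject.value = .finished

Send the completion event with send(completion:) instead:

subject.send(completion: .finished)

receive finished
receive finished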
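
The practical difference between the two subjects shows up when a subscriber arrives late. The following sketch sends a value to each subject before anyone subscribes; all variable names are hypothetical and chosen for this comparison only.

```swift
import Combine

let passthrough = PassthroughSubject<Int, Never>()
let currentValue = CurrentValueSubject<Int, Never>(0)

// Values sent before anyone subscribes.
passthrough.send(1)
currentValue.send(1)

var fromPassthrough: [Int] = []
var fromCurrentValue: [Int] = []

// A CurrentValueSubject replays its current value to each new subscriber;
// a PassthroughSubject only forwards values sent after subscription.
let a = passthrough.sink { fromPassthrough.append($0) }
let b = currentValue.sink { fromCurrentValue.append($0) }

passthrough.send(2)
currentValue.send(2)

print(fromPassthrough, fromCurrentValue)
```

The passthrough subscriber misses the early value, while the current-value subscriber starts with the latest value and then receives the new one.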

Dynamically adjusting demand

You learned earlier that adjusting demand in Subscriber.receive(_:) is additive. You’re now ready to take a closer look at how that works in a more elaborate example. Add this new example to the playground:

example(of: "Dynamically adjusting Demand") {
  final class IntSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    
    func receive(subscription: Subscription) {
      subscription.request(.max(2))
    }
    
    func receive(_ input: Int) -> Subscribers.Demand {
      print("Received value", input)
      
      switch input {
      case 1:
        return .max(2) // 1
      case 3:
        return .max(1) // 2
      default:
        return .none // 3
      }
    }
    
    func receive(completion: Subscribers.Completion<Never>) {
      print("Received completion", completion)
    }
  }
  
  let subscriber = IntSubscriber()
  
  let subject = PassthroughSubject<Int, Never>()
  
  subject.subscribe(subscriber)
  
  subject.send(1)
  subject.send(2)
  subject.send(3)
  subject.send(4)
  subject.send(5)
  subject.send(6)
}

——— Example of: Dynamically adjusting Demand ———
Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
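
The output stops at 5 because the demand adds up to five values in total. As a worked check of that arithmetic, this sketch adds the same demands using the operators defined on Subscribers.Demand; the variable total is an assumption for illustration and is not something Combine exposes to a subscriber.

```swift
import Combine

// Initial request made in receive(subscription:).
var total = Subscribers.Demand.max(2)

// Each value returned from receive(_:) is added to the running total.
total += .max(2)   // returned after receiving 1
total += .none     // returned after receiving 2
total += .max(1)   // returned after receiving 3
total += .none     // returned after receiving 4
total += .none     // returned after receiving 5

print(total == .max(5))
```

Returning .none adds nothing, so the sixth value is never requested.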

Type erasure

There will be times when you want to let subscribers subscribe to receive events from a publisher without being able to access additional details about that publisher.

example(of: "Type erasure") {
  // 1
  let subject = PassthroughSubject<Int, Never>()
  
  // 2
  let publisher = subject.eraseToAnyPublisher()
  
  // 3
  publisher
    .sink(receiveValue: { print($0) })
    .store(in: &subscriptions)
  
  // 4
  subject.send(0)
}
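
AnyPublisher has no send(_:) method, so adding this line inside the example produces a compile error:

publisher.send(1)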
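
One common use of type erasure is to keep a subject private and expose only an AnyPublisher. The sketch below assumes a hypothetical Counter class; none of its names come from the chapter.

```swift
import Combine

// Hypothetical type for illustration.
final class Counter {
  // The subject stays private, so only Counter can send values.
  private let subject = PassthroughSubject<Int, Never>()
  private var count = 0

  // Callers see only an AnyPublisher, which has no send(_:) method.
  var values: AnyPublisher<Int, Never> {
    subject.eraseToAnyPublisher()
  }

  func increment() {
    count += 1
    subject.send(count)
  }
}

let counter = Counter()
var received: [Int] = []
let subscription = counter.values.sink { received.append($0) }

counter.increment()
counter.increment()

print(received)
```

Subscribers can observe the counter through values, but they have no way to publish values of their own.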

Challenge

Completing challenges helps drive home what you learned in the chapter. There are starter and final versions of the challenge in the exercise files download.

Challenge: Create a Blackjack card dealer

Open Starter.playground in the challenge folder, and twist down the playground page and Sources in the Project navigator. Select SupportCode.swift.

if case let .failure(error) = $0 {
  print(error)
}

Solution

How’d you do?

// Add code to update dealtHand here
if hand.points > 21 {
  dealtHand.send(completion: .failure(.busted))
} else {
  dealtHand.send(hand)
}

_ = dealtHand
  .sink(receiveCompletion: {
    if case let .failure(error) = $0 {
      print(error)
    }
  }, receiveValue: { hand in
    print(hand.cardString, "for", hand.points, "points")
  })

——— Example of: Create a Blackjack card dealer ———
🃕🃆🃍 for 21 points
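
The same pattern can be tried without the challenge's support code. The sketch below is a simplified stand-in: HandError, dealtPoints, deal(points:) and log are hypothetical, and a plain Int replaces the hand type defined in SupportCode.swift.

```swift
import Combine

// Hypothetical stand-ins; the challenge's real types live in SupportCode.swift.
enum HandError: Error {
  case busted
}

let dealtPoints = PassthroughSubject<Int, HandError>()
var log: [String] = []

let subscription = dealtPoints.sink(
  receiveCompletion: {
    // Only a failure carries an error worth logging.
    if case let .failure(error) = $0 {
      log.append("error: \(error)")
    }
  },
  receiveValue: { log.append("points: \($0)") }
)

// Send the points as a value, or fail the subject if the hand is over 21.
func deal(points: Int) {
  if points > 21 {
    dealtPoints.send(completion: .failure(.busted))
  } else {
    dealtPoints.send(points)
  }
}

deal(points: 19)
deal(points: 25)
deal(points: 10)   // not delivered: the subject has already completed

print(log)
```

Once the subject completes with a failure, later values are dropped, which is why the last call leaves no trace in the log.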

Key points

  • Publishers transmit a sequence of values over time to one or more subscribers, either synchronously or asynchronously.
  • A subscriber can subscribe to a publisher to receive values; however, the subscriber’s input and failure types must match the publisher’s output and failure types.
  • There are two built-in operators you can use to subscribe to publishers: sink(_:_:) and assign(to:on:).
  • A subscriber may increase the demand for values each time it receives a value, but it cannot decrease demand.
  • To free up resources and prevent unwanted side effects, cancel each subscription when you’re done.
  • You can also store a subscription in an instance or collection of AnyCancellable to receive automatic cancelation upon deinitialization.
  • A future can be used to receive a single value asynchronously at a later time.
  • Subjects are publishers that enable outside callers to send multiple values asynchronously to subscribers, with or without a starting value.
  • Type erasure prevents callers from being able to access additional details of the underlying type.
  • Use the print() operator to log all publishing events to the console and see what’s going on.
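
As a small sketch of the automatic cancelation mentioned above, the following code stores a subscription in a Set of AnyCancellable and then empties the set. The names are hypothetical; emptying the set stands in for the set being deinitialized at the end of a scope.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []
var subscriptions = Set<AnyCancellable>()

subject
  .sink { received.append($0) }
  .store(in: &subscriptions)

subject.send(1)

// Releasing the stored AnyCancellable deinitializes it,
// which cancels the subscription.
subscriptions.removeAll()
subject.send(2)

print(received)
```

No explicit call to cancel() is needed; the second value is not delivered because the token that kept the subscription alive is gone.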

Where to go from here?

Congratulations! You’ve taken a huge step forward by completing this chapter. You learned how to work with publishers to send values and completion events, and how to use subscribers to receive those values and events. Up next, you’ll learn how to manipulate the values coming from a publisher to help filter, transform or combine them.

© 2024 Kodeco Inc.
