Swift concurrency has a feature called for-await that can iterate over an AsyncSequence. Combine has a .values property that can turn Publishers into an AsyncSequence. This feels like a perfect match! But it is surprisingly subtle and makes it very easy to drop messages if you’re not careful.

Consider the following example (full code is at the end).

        // A NotificationCenter publisher that emits Int as the object.
        // (Yes, this is an abuse of `object`. Hush. I'm making the example simpler.)

        let values = nc.publisher(for: name)
            .compactMap { $0.object as? Int }
            .values

        // Loop over the notifications... right?
        for await value in values {
            // At this point, nothing is "subscribed" to values, so messages will be dropped until the next loop begins.

            // ... Process notification ...
        }

This feels right, but it’s subtly broken and will drop notifications. AsyncPublisher provides no buffer. If nothing is subscribed, then items will be dropped. This makes sense. Imagine if .values did store all of the elements published until they were consumed. Then if I failed to actually consume it, it would leak memory. (We can argue about the precise meaning of “leak” here, but still, grow memory without bound.) Just creating an AsyncPublisher shouldn’t do that. Nothing else works like that in Combine or in Swift Concurrency. An AsyncStream is a value. You should be able to stick it in a variable to use later without leaking memory. (Ma’moun has made me rethink this. It’s true that this is how it works, but I’m now torn a bit more on whether it should.)

Similarly, the fact that for-await doesn’t create a subscription makes sense. In what way would it do that? That’s not how AsyncSequence works. Its job is to call makeAsyncIterator() and then repeatedly call next(). It doesn’t know about buffering or Subscriber or any of that. And say makeAsyncIterator() could take buffering parameters. Where would they go in the for-await syntax?

The answer to all of this is that you need a buffer, and it’s your job to configure it. If you want an “infinite” buffer (which is what people usually think they want), then it looks like this:

         let values = nc.publisher(for: name)
            .compactMap { $0.object as? Int }
            .buffer(size: .max, prefetch: .byRequest, whenFull: .dropOldest) // <----
            .values

And IMO this probably is the most sensible way to solve this, even if the syntax is a bit verbose. Obviously we could add a bufferedValues(...) extension to make it a little prettier….

BUT….

Yeah, nobody remembers this, even if they’ve heard about it before. .values is just so easy to reach for. And the bug is a subtle race condition that drops messages. And you can’t easily unit test for it. And the compiler probably can’t warn you about it. And this problem exists in any situation where an AsyncSequence “pushes” values, which is basically every observation pattern, even without Combine.

And so I struggle with whether to encourage for-await. Every time you see it, you need to think pretty hard about what’s going on in this specific case. And unfortunately, that’s kind of true of AsyncSequence generally. I’m not sure what to think about this yet. Most of my bigger projects use Combine for these kinds of things currently, and it “just works” including unsubscribing automatically when the AnyCancellable is deinited (another thing that’s easy to mess up with for-await). I just don’t know yet.

ADDENDUM

I strangely forgot to also write about NotificationCenter.notifications(named:), which goes directly from NotificationCenter to AsyncSequence. It’s a good example of the subtlety. It has the same dropped-messages issue:

       // Also drops messages if they come too quickly, but not as many as an unbuffered `.values`.
       let values = nc.notifications(named: name)
            .compactMap { $0.object as? Int }

        for await value in values { ... }

Unlike the Combine version, I don’t know how to fix this one. (Maybe this should be considered a Foundation bug? But maybe it’s just “as designed?”) After experimenting a bit, I believe the buffering policy is .bufferingNewest(8). If more than 8 notifications come in during your processing loop, you’ll miss some. Should you send notifications that fast? Maybe not? I don’t know. But the bugs are definitely subtle if you do.

Here’s the full code to play with:

@MainActor
struct ContentView: View {
    let model = Model()

    var body: some View {
        Button("Send") { model.sendMessage() }
            .onAppear { model.startListening() }
    }
}

@MainActor
class Model {
    var lastSent = 0
    var lastReceived = 0

    let nc = NotificationCenter.default
    let name = Notification.Name("MESSAGE")

    var listenTask: Task<Void, Error>?

    func sendMessage() {
        lastSent += 1
        nc.post(name: name, object: lastSent)
    }

    // Set up an infinite for-await loop, listening to notifications until canceled.
    func startListening() {
        listenTask?.cancel()
        listenTask = Task {
            var lastReceived = 0

            let values = nc.publisher(for: name).values
                .compactMap { $0.object as? Int }

            for await value in values {
                // At this point, nothing is "subscribed" to values, so messages will be dropped.
                let miss = value == lastReceived + 1 ? "" : " (MISS)"
                print("Received: \(value)\(miss)")
                lastReceived = value
                // Sleep to make it easier to see dropped messages.
                try await Task.sleep(for: .milliseconds(500))
            }
        }
    }

    deinit {
        listenTask?.cancel()
    }
}