• Why I struggle to use actors

    Following up on some discussion about why I keep finding myself using Mutex (née OSAllocatedUnfairLock) rather than actors. This is kind of slapped together; eventually I may try to write up a proper blog post, but I wanted to capture it.

    Each of these is a runnable file using swift ..., with embedded commentary.

    // Actor to manage deferring execution on a bunch of things until requested
    // Seems a very nice use for an actor.
    public actor Defer {
        public nonisolated static let shared = Defer()
    
        private var defers: [@Sendable () -> Void] = []
    
        public func addDefer(_ f: @escaping @Sendable () -> Void) { defers.append(f) }
    
        public func execute() {
            for f in defers { f() }
            defers.removeAll()
        }
    }
    // The first attempt to use it is, unsurprisingly, wrong: calling `addDefer` from outside the actor requires `await`.
    public final class DeferUser: Sendable {
        init() {
            // Call to actor-isolated instance method 'addDefer' in a synchronous nonisolated context
            Defer.shared.addDefer { print("Cleanup") }
        }
    }
    

    OK, this makes sense. But making DeferUser.init async puts a lot of restrictions on callers of it. It means there can’t be a shared instance. It can’t be deterministically constructed in AppDelegate or SwiftUI if anything relies on it. And there’s nothing inherently async about constructing it. It’s leaking an implementation detail. “Just make everything async” is not where we should be going in Swift (and is not IMO where we intend to go).


    Try 2. Maybe addDefer could be nonisolated:

    public actor Defer {
        public nonisolated static let shared = Defer()
    
        public private(set) var defers: [@Sendable () -> Void] = []
    
        private func append(_ f: @escaping @Sendable () -> Void) { defers.append(f) }
        public nonisolated func addDefer(_ f: @escaping @Sendable () -> Void) {
            Task { await append(f) }
        }
    
        public func execute() {
            for f in defers { f() }
            defers.removeAll()
        }
    }
    
    // Sure. But now there's a surprising race condition:
    
    func run() async {
        Defer.shared.addDefer { print("Cleanup") }
        // print(await Defer.shared.defers.count) // Uncomment this line to make this "work"
        await Defer.shared.execute()
    }
    
    await run()
    

    There is no promise that “Cleanup” will be printed: nothing orders the Task started in addDefer before the call to execute(). That won’t change until at least ClosureIsolation is available, and I believe Task would also need to be updated. This is a serious footgun IMO, and unlikely to be fixed for some time. (For more, see previous Mastodon discussion.)

    In my experiments, this will never print “Cleanup” unless you “pump the runloop” (to mix an old-school metaphor) by tickling some other async property.


    The problem is worse if you move the Task into DeferUser.init:

    public actor Defer {
        public nonisolated static let shared = Defer()
    
        public private(set) var defers: [@Sendable () -> Void] = []
    
        public func addDefer(_ f: @escaping @Sendable () -> Void) { defers.append(f) }
    
        public func execute() {
            for f in defers { f() }
            defers.removeAll()
        }
    }
    public final class DeferUser: Sendable {
        init() {
            // Even with future-Swift, I doubt folks will realize this needs to be put on the Defer context.
            Task { await Defer.shared.addDefer { print("Cleanup") } }
        }
    }
    
    // I expect many people will reach for this version, and it's even more unsafe IMO than the last one.
    func run() async {
        _ = DeferUser()
        // print(await Defer.shared.defers.count) // Uncomment this line to make this "work"
        await Defer.shared.execute()
    }
    
    await run()
    

    This one IMO is worse than the last one because even with “future Swift,” I doubt folks will figure out how to get this Task onto the right context to remove the race condition.


    So even with future Swift, I have to think really hard about this.

    OR….

    I can use a mutex and it all works in a way that I can reason about.

    import os
    
    public final class Defer: Sendable {
        public static let shared = Defer()
    
        private let defers = OSAllocatedUnfairLock<[@Sendable () -> Void]>(initialState: [])
    
        public func addDefer(_ f: @escaping @Sendable () -> Void) { defers.withLock { $0.append(f) } }
    
        // If I want this to be async, I can make it so by putting the `for` loop in a Task.
        // Or it can stay synchronous. The choice is mine rather than being forced by the actor.
        public func execute() {
            // Yes, this requires some special care to do correctly. But `withLock` points a big arrow to
            // the piece of code I need to think hard about.
            let defers = defers.withLock { defers in
                defer { defers.removeAll() }
                return defers
            }
            for f in defers { f() }
        }
    }
    public final class DeferUser: Sendable {
        init() {
            Defer.shared.addDefer { print("Cleanup") }
        }
    }
    
    // And now it should "just work."
    func run() async {
        _ = DeferUser()
        Defer.shared.execute()
    }
    
    await run()
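
    Since the intro mentions Mutex, here is a minimal sketch of the same type built on `Mutex` from the Synchronization module instead of OSAllocatedUnfairLock. It assumes a Swift 6 toolchain (and macOS 15 or later on Apple platforms); the local names `pending` and `copy` are invented for the sketch, and it is not taken from the original files.

    ```swift
    import Synchronization

    // Sketch only: same shape as the OSAllocatedUnfairLock version above, but using
    // Mutex from the Synchronization module (assumes Swift 6; macOS 15+ on Apple platforms).
    public final class Defer: Sendable {
        public static let shared = Defer()

        private let defers = Mutex<[@Sendable () -> Void]>([])

        public func addDefer(_ f: @escaping @Sendable () -> Void) {
            defers.withLock { $0.append(f) }
        }

        public func execute() {
            // Take the pending closures and clear the list while holding the lock,
            // then run them after the lock has been released.
            let pending = defers.withLock { state in
                let copy = state
                state.removeAll()
                return copy
            }
            for f in pending { f() }
        }
    }
    ```

    As in the lock-based version, `addDefer` and `execute` are both synchronous, so `DeferUser.init` can call `Defer.shared.addDefer` directly.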
    
• Dropped messages in for-await

    Swift concurrency has a feature called for-await that can iterate over an AsyncSequence. Combine has a .values property that can turn Publishers into an AsyncSequence. This feels like a perfect match! But it is surprisingly subtle and makes it very easy to drop messages if you’re not careful.

    Consider the following example (full code is at the end).

            // A NotificationCenter publisher that emits Int as the object.
            // (Yes, this is an abuse of `object`. Hush. I'm making the example simpler.)
    
            let values = nc.publisher(for: name)
                .compactMap { $0.object as? Int }
                .values
    
            // Loop over the notifications... right?
            for await value in values {
                // At this point, nothing is "subscribed" to values, so messages will be dropped until the next loop begins.
    
                // ... Process notification ...
            }
    

    This feels right, but it’s subtly broken and will drop notifications. AsyncPublisher provides no buffer. If nothing is subscribed, then items will be dropped. This makes sense. Imagine if .values did store all of the elements published until they were consumed. Then if I failed to actually consume it, it would leak memory. (We can argue about the precise meaning of “leak” here, but it would still grow memory without bound.) Just creating an AsyncPublisher shouldn’t do that. Nothing else works like that in Combine or in Swift Concurrency. An AsyncStream is a value. You should be able to stick it in a variable to use later without leaking memory. (Ma’moun has made me rethink this. It’s true that this is how it works, but I’m now a bit more torn on whether it should.)

    Similarly, the fact that for-await doesn’t create a subscription makes sense. In what way would it do that? That’s not how AsyncSequence works. Its job is to call makeAsyncIterator() and then repeatedly call next(). It doesn’t know about buffering or Subscriber or any of that. And say makeAsyncIterator() could take buffering parameters. Where would they go in the for-await syntax?
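
    To make that concrete, here is roughly what a for-await loop expands to, written by hand for an `AsyncStream<Int>`. The function name `drain` is made up for this sketch. Note that nothing in it offers a place to pass a subscription or a buffer size.

    ```swift
    // Roughly what `for await value in stream { seen.append(value) }` does.
    // `drain` is a hypothetical name used only for this sketch.
    func drain(_ stream: AsyncStream<Int>) async -> [Int] {
        // One iterator is created up front...
        var iterator = stream.makeAsyncIterator()
        var seen: [Int] = []
        // ...and next() is awaited repeatedly until it returns nil.
        while let value = await iterator.next() {
            seen.append(value)
        }
        return seen
    }
    ```

    Any buffering has to be decided by whoever builds the sequence, before the loop ever sees it.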

    The answer to all of this is that you need a buffer, and it’s your job to configure it. If you want an “infinite” buffer (which is what people usually think they want), then it looks like this:

            let values = nc.publisher(for: name)
                .compactMap { $0.object as? Int }
                .buffer(size: .max, prefetch: .byRequest, whenFull: .dropOldest) // <----
                .values
    

    And IMO this probably is the most sensible way to solve this, even if the syntax is a bit verbose. Obviously we could add a bufferedValues(...) extension to make it a little prettier….
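
    One possible shape for such an extension, as a sketch. `bufferedValues` is not part of Combine; the name comes from the sentence above, and the parameter names and defaults are assumptions that simply mirror the “infinite” buffer configuration already shown.

    ```swift
    import Combine

    extension Publisher where Failure == Never {
        /// Hypothetical helper (not part of Combine): `.values` with a buffer in front of it.
        /// The defaults mirror the "infinite" buffer configuration shown earlier.
        @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
        func bufferedValues(
            size: Int = .max,
            whenFull: Publishers.BufferingStrategy<Never> = .dropOldest
        ) -> AsyncPublisher<Publishers.Buffer<Self>> {
            buffer(size: size, prefetch: .byRequest, whenFull: whenFull).values
        }
    }
    ```

    With that in place, the call site would end in `.compactMap { $0.object as? Int }.bufferedValues()` instead of the explicit `.buffer(...)` followed by `.values`.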

    BUT….

    Yeah, nobody remembers this, even if they’ve heard about it before. .values is just so easy to reach for. And the bug is a subtle race condition that drops messages. And you can’t easily unit test for it. And the compiler probably can’t warn you about it. And this problem exists in any situation where an AsyncSequence “pushes” values, which is basically every observation pattern, even without Combine.

    And so I struggle with whether to encourage for-await. Every time you see it, you need to think pretty hard about what’s going on in this specific case. And unfortunately, that’s kind of true of AsyncSequence generally. I’m not sure what to think about this yet. Most of my bigger projects use Combine for these kinds of things currently, and it “just works” including unsubscribing automatically when the AnyCancellable is deinited (another thing that’s easy to mess up with for-await). I just don’t know yet.
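
    For comparison, a minimal sketch of the Combine pattern described above. `IntListener`, `center`, and `onValue` are names invented for this example. The subscription lives exactly as long as the stored AnyCancellable.

    ```swift
    import Combine
    import Foundation

    // Hypothetical listener: subscribes in init, and the subscription is cancelled
    // automatically when the AnyCancellable is released along with the instance.
    final class IntListener {
        private var cancellable: AnyCancellable?

        init(center: NotificationCenter = .default,
             name: Notification.Name,
             onValue: @escaping (Int) -> Void) {
            cancellable = center.publisher(for: name)
                .compactMap { $0.object as? Int }
                .sink(receiveValue: onValue)
        }
    }
    ```

    There is no loop body during which values can be missed, and dropping the last reference to the listener ends the subscription without an explicit cancel call.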

    ADDENDUM

    I strangely forgot to also write about NotificationCenter.notifications(named:), which goes directly from NotificationCenter to AsyncSequence. It’s a good example of the subtlety. It has the same dropped-messages issue:

            // Also drops messages if they come too quickly, but not as many as an unbuffered `.values`.
            let values = nc.notifications(named: name)
                .compactMap { $0.object as? Int }
    
            for await value in values { ... }
    

    Unlike the Combine version, I don’t know how to fix this one. (Maybe this should be considered a Foundation bug? But maybe it’s just “as designed?”) After experimenting a bit, I believe the buffering policy is .bufferingNewest(8). If more than 8 notifications come in during your processing loop, you’ll miss some. Should you send notifications that fast? Maybe not? I don’t know. But the bugs are definitely subtle if you do.

    Here’s the full code to play with:

    import Combine
    import SwiftUI
    
    @MainActor
    struct ContentView: View {
        let model = Model()
    
        var body: some View {
            Button("Send") { model.sendMessage() }
                .onAppear { model.startListening() }
        }
    }
    
    @MainActor
    class Model {
        var lastSent = 0
        var lastReceived = 0
    
        let nc = NotificationCenter.default
        let name = Notification.Name("MESSAGE")
    
        var listenTask: Task<Void, Error>?
    
        func sendMessage() {
            lastSent += 1
            nc.post(name: name, object: lastSent)
        }
    
        // Set up an infinite for-await loop, listening to notifications until canceled.
        func startListening() {
            listenTask?.cancel()
            listenTask = Task {
                var lastReceived = 0
    
                let values = nc.publisher(for: name).values
                    .compactMap { $0.object as? Int }
    
                for await value in values {
                    // At this point, nothing is "subscribed" to values, so messages will be dropped.
                    let miss = value == lastReceived + 1 ? "" : " (MISS)"
                    print("Received: \(value)\(miss)")
                    lastReceived = value
                    // Sleep to make it easier to see dropped messages.
                    try await Task.sleep(for: .milliseconds(500))
                }
            }
        }
    
        deinit {
            listenTask?.cancel()
        }
    }
    
