Git Product home page Git Product logo

Comments (12)

dkhalanskyjb avatar dkhalanskyjb commented on June 11, 2024

Interestingly, for me, this test crashes on the JVM as well.

from kotlinx.coroutines.

qwwdfsad avatar qwwdfsad commented on June 11, 2024

It also crashes for me on the JVM. On Android, it's either timing (so Ignored exception is printed) or slightly different executor/dispatcher setup.

The cause is straightforward -- there are active coroutines running on the executor that is concurrently closed.
So, any attempt to (re)dispatch any coroutine on that dispatcher fails. Moreover, often there is no place in the code (or this place might be unrelated) to rethrow such an exception, thus an error. It cannot be reasonably ignored -- it indicates something went really wrong (e.g. it might imply that your finally blocks in suspend functions didn't execute).

So I won't triage it as a bug. We can document it better and maybe (but it's really debatable topic) treat it not as an internal error, but instead raise global exception handler immediately (which, on Android, will crash the app)

from kotlinx.coroutines.

dkhalanskyjb avatar dkhalanskyjb commented on June 11, 2024

So I won't triage it as a bug.

The error message clearly states that any such error is worth reporting to us:

kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(DispatchedContinuation[DarwinGlobalQueueDispatcher@53c0308, Continuation @ 6]){Completed}@79400a0. Please read KDoc to 'handleFatalException' method and report this incident to maintainers

So, I don't think we can ignore it when people do just that.

Also, I think this actually is an issue on our side: since there is no guarantee that dispatch doesn't throw, the places where it throws shouldn't be treated as surprises (that is, internal errors).

from kotlinx.coroutines.

qwwdfsad avatar qwwdfsad commented on June 11, 2024

True, we have to acknowledge that.

since there is no guarantee that dispatch doesn't throw

The documentation (somewhat vaguely, though) states the following:

This method should generally be exception-safe. An exception thrown from this method may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.

We can re-visit the corresponding places again, though there are not that many alternatives -- internal error or immediate crash (handleCoroutineException, which is also invoked by the internal error path).
We have another place like this -- ThreadContextElement (the documentation to handleFatalException mentions it)

from kotlinx.coroutines.

rusmonster avatar rusmonster commented on June 11, 2024

It cannot be reasonably ignored -- it indicates something went really wrong (e.g. it might imply that your finally blocks in suspend functions didn't execute).

So what's correct way to close the dispatcher?

My real use case is simple: I'm implementing a client which setup websocket connection to backend and creates singleThreadDispatcher.

When an event is received from the websocket - the client parses it on singleThreadDispatcher then updates database withContext(Dispatchers.IO) and emitting an onUpdated event to outside.

Also the client has shutdown() method where it closes the websocket connection and the singleThreadDispatcher.
Obviously shutdown could be called at any moment of time.

So my code looks like:

class Client {
    private val dispatcher = newSingleThreadContext("MyDispatcher")
    private val scope = CoroutineScope(dispatcher)
    private var websocket: WebSocket? = null

    val onUpdated = MutableSharedFlow<Event>()

    private fun onWebsocketMessage(message: String) {
        scope.launch {
            val event = parseMessage(message)

            withContext(Dispatchers.IO) {
                updateDB(event)
            }

            processEvent(event)
            onUpdated.emit(event)
        }
    }

    fun shutdown() {
        websocket?.close()
        scope.cancel()
        dispatcher.close()
    }
}

Is there a better approach than just maintain collection of ioDispatcherJobs and joinAll them in the shutdown() method?

Like:

class Client {
    private val dispatcher = newSingleThreadContext("MyDispatcher")
    private val scope = CoroutineScope(dispatcher + SupervisorJob())
    private var websocket: WebSocket? = null

    val onUpdated = MutableSharedFlow<Event>()

    private val ioDispatcherJobs = mutableListOf<Job>()

    private fun onWebsocketMessage(message: String) {
        scope.launch {
            val event = parseMessage(message)

            val job = launch(Dispatchers.IO, CoroutineStart.LAZY) {
                updateDB(event)
            }

            ioDispatcherJobs += job
            job.join()
            ioDispatcherJobs -= job

            processEvent(event)
            onUpdated.emit(event)
        }
    }

    suspend fun shutdown() {
        websocket?.close()

        withContext(dispatcher) {
            ioDispatcherJobs.joinAll()
        }

        scope.cancel()
        dispatcher.close()
    }
}

from kotlinx.coroutines.

dkhalanskyjb avatar dkhalanskyjb commented on June 11, 2024

May I suggest not using newSingleThreadContext at all, instead doing Dispatchers.IO.limitedParallelism(1)? This way, you won't need to close the dispatcher at all.

from kotlinx.coroutines.

rusmonster avatar rusmonster commented on June 11, 2024

Correct me if I'm wrong, but I don't see in documentation thatlimitedParallelism(1) guarantees execution on single thread.
It means that I have to synchronize access to data everywhere, for example in my processEvent method:

val allEvents = mutableListOf<Event>()
val mutex = Mutex()

fun processEvent(event: Event) {
    mutex.withLock { 
        allEvents += event
    }    
}

Correct?

Which is exactly what I'm trying to avoid by using newSingleThreadContext

from kotlinx.coroutines.

dkhalanskyjb avatar dkhalanskyjb commented on June 11, 2024

I don't see in documentation that limitedParallelism(1) guarantees execution on single thread.

It doesn't guarantee that this will always run on the same thread, so if you have things like thread local variables, yes, limitedParallelism on its own won't help you; but it does guarantee that the parallelism will be at most 1, or, in other words, at most one thread at a time will execute the code scheduled on that dispatcher. So no, you don't need mutexes: only one thread at a time (though possibly a different one between calls) can call processEvent.

from kotlinx.coroutines.

rusmonster avatar rusmonster commented on June 11, 2024

Thank you, that make sense. Two more questions regarding limitedParallelism(1) then:

  1. Does it guarantees FIFO order of operations?
  2. As it could be executed on different threads I still have to to use atomic vars, otherwise it's not guaranteed that I read last value set to a var by a different thread. Correct?
var eventCounter by atomic(0)

fun processEvent(event: Event) {
  eventCounter++
}

from kotlinx.coroutines.

dkhalanskyjb avatar dkhalanskyjb commented on June 11, 2024
  1. Yes, it stores a queue internally.
  2. No, the happens-before relationship is guaranteed by the coroutines machinery.

from kotlinx.coroutines.

rusmonster avatar rusmonster commented on June 11, 2024

Thank you so much! I'll try to go with limitedParallelism(1)

Regarding initial issue - I would expect the same behaviour on all platforms. So one of possible solution is to make the test crash on Android the same way how it crashes on iOS and JVM.

In this case common code debugged once on android - will work on other platforms without changes.

from kotlinx.coroutines.

dkhalanskyjb avatar dkhalanskyjb commented on June 11, 2024

@rusmonster, could you please explain why you thought that limitedParellelism was unsuitable? @qwwdfsad found this misleading piece of information: https://github.com/KStateMachine/kstatemachine/blob/master/docs/index.md#use-single-threaded-coroutinescope Are there any other ones?

from kotlinx.coroutines.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.