Showing content from https://speakerdeck.com/elizarov/structured-concurrency below:

Structured Concurrency - Speaker Deck

  • Structured Concurrency. Presented at Hydra Distributed Computing Conference, 2019.

    Roman Elizarov (elizarov, @relizarov)

  • Speaker: Roman Elizarov

      • Professional developer since 2000
      • Previously developed high-perf trading software @ Devexperts
      • Teach concurrent & distributed programming @ St. Petersburg ITMO University
      • Chief judge @ Northern Eurasia Contest / ICPC
      • Now team lead in Kotlin Libraries @ JetBrains

  • A short story of Kotlin Coroutines design
  • Inspired by async/await

        async Task PostItem(Item item) {
            var token = await RequestToken();
            var post = await CreatePost(token, item);
            ProcessPost(post);
        }

    C#. Annotations: async modifier; returns a future; (1) awaits a future; (2) awaits a future; other async functions return a future. Suspension points! But what about generators with yield return?

  • Kotlin DSL: Initial prototype

        fun postItem(item: Item) = async {
            val token = await(requestToken())
            val post = await(createPost(token, item))
            processPost(post)
        }

    Kotlin. Annotations: regular function; coroutine builder; coroutine scope; await function; suspending functions. async / await → future.

  • Kotlin DSL: Initial prototype

        fun postItem(item: Item) = generate {
            val token = yield(requestToken())
            val post = yield(createPost(token, item))
            processPost(post)
        }

    Kotlin. async / await → future; generate / yield → sequence.

  • Suspending functions?

        fun postItem(item: Item) = async {
            val token = await(requestToken())
            val post = await(createPost(token, item))
            processPost(post)
        }

    Kotlin.

  • Suspending functions everywhere!

        fun postItem(item: Item) = async {
            val token = requestToken()
            val post = createPost(token, item)
            processPost(post)
        }

    Kotlin. Annotation: returns a future.

  • Suspending functions everywhere!

        suspend fun postItem(item: Item) {
            val token = requestToken()
            val post = createPost(token, item)
            processPost(post)
        }

    Kotlin. Annotation: suspending function modifier.

  • Suspending functions everywhere!

        func postItem(item Item) {
            token := requestToken()
            post := createPost(token, item)
            processPost(post)
        }

    Go.

  • Prototyping libraries. DSL for concurrency
  • https://tour.golang.org/concurrency/1
  • A Tour of Go Concurrency #1

        package main

        import (
            "fmt"
            "time"
        )

        func say(s string) {
            for i := 0; i < 5; i++ {
                time.Sleep(100 * time.Millisecond)
                fmt.Println(s)
            }
        }

        func main() {
            go say("world") // runs concurrently in a new goroutine
            say("hello")
        }

    Go. https://tour.golang.org/concurrency/1

  • A DSL for Concurrency: Prototype

        suspend fun say(s: String) {
            for (i in 0..4) {
                delay(100)
                println(s)
            }
        }

        fun main() = mainBlocking {
            go { say("world") }
            say("hello")
        }

    Kotlin. Annotations: suspending function modifier; suspending function; coroutine builder; another builder.

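  • A DSL for Concurrency: Now

        import kotlinx.coroutines.delay
        import kotlinx.coroutines.launch
        import kotlinx.coroutines.runBlocking

        suspend fun say(s: String) {
            for (i in 0..4) {
                delay(100)
                println(s)
            }
        }

        fun main() = runBlocking {
            launch { say("world") } // child coroutine of runBlocking
            say("hello")
        }

    Kotlin.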

  • A Tour of Go Concurrency #5

        func fibonacci(c, quit chan int) {
            x, y := 0, 1
            for {
                select {
                case c <- x:
                    x, y = y, x+y
                case <-quit:
                    fmt.Println("quit")
                    return
                }
            }
        }

    Go. https://tour.golang.org/concurrency/5

  • A DSL for Concurrency: Prototype

        suspend fun fib(c: SendChannel<Int>, quit: ReceiveChannel<Int>) {
            var x = 0
            var y = 1
            whileSelect {
                c.onSend(x) {
                    val next = x + y
                    x = y
                    y = next
                    true // continue while loop
                }
                quit.onReceive {
                    println("quit")
                    false // break while loop
                }
            }
        }

    Kotlin. Annotations: library types; select DSL.

  • What more to wish? Is concurrency support a solved problem?

  • Thread-bound UI programming

        launchUI {
            try {
                // suspend while asynchronously making request
                val result = makeRequest()
                // display result in UI
                display(result)
            } catch (exception: Throwable) {
                // process exception in UI
            }
        }

  • Thread-bound UI programming

        launch(UI) {
            try {
                // suspend while asynchronously making request
                val result = makeRequest()
                // display result in UI
                display(result)
            } catch (exception: Throwable) {
                // process exception in UI
            }
        }

    Annotation: coroutine context.

  • Thread-bound UI programming

        launch(UI) {
            try {
                // suspend while asynchronously making request
                val result = makeRequest()
                // display result in UI
                run { display(result) }
            } catch (exception: Throwable) {
                // process exception in UI
            }
        }

    Annotations: coroutine context; higher-order function.

  • await higher-order function

        await Run(() => {
            Display(result);
        });

        // the same call, split into two steps
        var task = Run(() => {
            Display(result);
        });
        await task;

    C#. Annotations: lambda; (1) function called first; (2) then await; a call to regular function.

  • await higher-order function

        run {
            display(result)
        }

    Kotlin. Annotations: a call to suspending function; context is passed along the suspend callstack.

  • Cancellation. The stumbling block of concurrency design
  • A Tour of Go Concurrency

        func fibonacci(c, quit chan int) {
            x, y := 0, 1
            for {
                select {
                case c <- x:
                    x, y = y, x+y
                case <-quit:
                    fmt.Println("quit")
                    return
                }
            }
        }

    Go. https://tour.golang.org/concurrency/5 Annotations: a CancellationToken; a boilerplate.

  • quit chan
  • Pervasive cancellation context?

        type Context interface {
            // …
            // Done returns a channel that's closed when work done on
            // behalf of this context should be canceled.
            // …
            Done() <-chan struct{}
        }

    Go. https://golang.org/pkg/context/

    At Google, we require that Go programmers pass a Context parameter as the first argument to every function on the call path between incoming and outgoing requests.

  • Lifetime prototype

        interface Lifetime : CoroutineContext.Element {
            fun cancel(reason: Throwable? = null): Boolean
            fun onCompletion(handler: CompletionHandler): Registration
        }

    Kotlin.

  • Job prototype

        interface Job : CoroutineContext.Element {
            fun cancel(reason: Throwable? = null): Boolean
            fun onCompletion(handler: CompletionHandler): Registration
        }

    Kotlin.

  • Job prototype: explicit cancel

        val job = launch {
            say("world")
        }
        job.cancel()

    Kotlin. Annotation: delay(…) throws CancellationException.
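
A minimal sketch of the explicit-cancel slide against the current kotlinx.coroutines API (assumed to be on the classpath). The function name `cancelDemo` and the `events` list are hypothetical, introduced only to make the behaviour observable: the worker sees the cancellation at its suspension point inside `delay`.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: starts a worker, cancels it, and reports what happened.
fun cancelDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            events += "started"
            delay(10_000) // suspension point: cancellation is delivered here
            events += "finished" // not reached in this sketch
        } catch (e: CancellationException) {
            events += "cancelled"
            throw e // rethrow so the job completes as cancelled
        }
    }
    delay(100) // give the worker time to start
    job.cancelAndJoin() // request cancellation and wait for completion
    events += "isCancelled=${job.isCancelled}"
    events
}

fun main() {
    println(cancelDemo())
}
```

Everything here runs on the single `runBlocking` thread, so the plain mutable list needs no synchronization in this sketch.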

  • Job prototype: higher-order operators

        withTimeout(duration) {
            doSomething()
        }

    Kotlin. Diagram labels: job1, job2. Prototype worked like a charm.
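
As a rough illustration of the higher-order operator on this slide, the sketch below uses `withTimeout` from the current kotlinx.coroutines library (assumed available). The name `timeoutDemo` and the durations are hypothetical; the slow `delay` stands in for the slide's `doSomething()`.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical demo: the block is cancelled once the timeout elapses.
fun timeoutDemo(): String = runBlocking {
    try {
        withTimeout(100) {
            delay(10_000) // stands in for doSomething(); far slower than the timeout
            "done"
        }
    } catch (e: TimeoutCancellationException) {
        "timed out"
    }
}

fun main() {
    println(timeoutDemo())
}
```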

  • Children coroutines. Nesting concurrent computations
  • Concurrent decomposition

        val job1 = launch { say("hello") }
        val job2 = launch { say("world") }
        val jobs = CompositeJob()
        jobs.add(job1)
        jobs.add(job2)
        jobs.cancel()

    Kotlin. Annotation: these jobs are resources.

  • Job prototype. Lifetime pattern

        interface Job : CoroutineContext.Element {
            fun cancel(reason: Throwable? = null): Boolean
            fun onCompletion(handler: CompletionHandler): Registration
        }

    Kotlin.

  • Concurrent decomposition

        val jobs = CompositeJob()

    Kotlin.

  • Concurrent decomposition

        val job = Job()

    Kotlin.
  • Concurrent decomposition

        val job = Job()
        launch(job) { say("hello") }
        launch(job) { say("world") }
        job.cancel()

    Kotlin. Annotations: Job is a coroutine context! Job becomes a CancellationToken.
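
The slide's prototype calls a top-level `launch(job)`; in the current kotlinx.coroutines library `launch` is an extension on `CoroutineScope`, so the sketch below runs inside `runBlocking`. It is only an approximation of the idea that a shared `Job` acts as a cancellation token; `sharedJobDemo` and `token` are hypothetical names.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: one Job in the context cancels every coroutine launched with it.
fun sharedJobDemo(): Pair<Boolean, Boolean> = runBlocking {
    val token = Job() // stand-alone job, not a child of runBlocking
    val hello = launch(token) { delay(10_000) }
    val world = launch(token) { delay(10_000) }
    delay(50) // let both coroutines start
    token.cancelAndJoin() // cancels both children and waits for them
    hello.isCancelled to world.isCancelled
}

fun main() {
    println(sharedJobDemo())
}
```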

  • Context propagation

        suspend fun doSomething() {
            launch(coroutineContext) { say("hello") }
            launch(coroutineContext) { say("world") }
        }

    Kotlin.

  • Context propagation

        val job = launch {
            launch(coroutineContext) { say("hello") }
            launch(coroutineContext) { say("world") }
        }

    Kotlin.

  • Context propagation

        val job = launch(UI) {
            launch(coroutineContext) { say("hello") }
            launch(coroutineContext) { say("world") }
        }

    Kotlin.

  • Error propagation?

        val job = launch {
            launch(coroutineContext) { say("hello") }
            launch(coroutineContext) { say("world") }
        }

    Kotlin.

  • Error propagation

        val job = launch {
            say("hello")
            say("world")
        }

    Kotlin. Annotation: it can fail.

  • Error propagation

        val job = launch {
            launch(coroutineContext) { say("hello") }
            launch(coroutineContext) { say("world") }
        }

    Kotlin. Annotations: it can fail; it can fail; they can fail concurrently; cancel(ex); cancel(ex). Success or failure of this composite job can be known only when all children complete.

  • Job: the real thing. https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
  • Scope

        val job = launch {
            launch(coroutineContext) { say("hello") }
            launch(coroutineContext) { say("world") }
        }

    Kotlin.

  • Abstracting concurrent decomposition

        val job = launch {
            sayHelloWorld()
        }

        suspend fun sayHelloWorld() {
            launch(coroutineContext) { say("hello") }
            launch(coroutineContext) { say("world") }
        }

    Kotlin. Annotations: it can fail. But call returns normally!

  • Scoping concurrency: prototype

        val job = launch {
            sayHelloWorld()
        }

        suspend fun sayHelloWorld() {
            withScope { // new job in the context
                launch(coroutineContext) { say("hello") }
                launch(coroutineContext) { say("world") }
            }
        }

    Kotlin. Annotations: throws exception on failure; encapsulated concurrent decomposition.

  • Scoping concurrency: prototype problems

        val job = launch {
            sayHelloWorld()
        }

        suspend fun sayHelloWorld() {
            withScope {
                launch(coroutineContext) { say("hello") }
                launch(coroutineContext) { say("world") }
            }
        }

    Kotlin. Annotations: error-prone; verbose.

  • Scoping concurrency: solution prototype

        val job = launch {
            sayHelloWorld()
        }

        suspend fun sayHelloWorld() {
            withScope { // this: CoroutineScope
                launch { say("hello") }
                launch { say("world") }
            }
        }

    Kotlin. Annotation: extension function.

  • Scoping concurrency: solution

        val job = launch {
            sayHelloWorld()
        }

        suspend fun sayHelloWorld() {
            coroutineScope { // this: CoroutineScope
                launch { say("hello") }
                launch { say("world") }
            }
        }

    Kotlin. Annotation: extension function.
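
To make the "throws exception on failure" property of the scoped solution concrete, here is a small sketch using `coroutineScope` from the current kotlinx.coroutines library (assumed available). The helper names `failingPair` and `scopeDemo`, the log strings, and the use of `IOException` are all hypothetical: one child fails, its sibling is cancelled, and the caller sees the exception instead of a normal return.

```kotlin
import java.io.IOException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: two children in one scope, the second one fails.
suspend fun failingPair(log: MutableList<String>) {
    coroutineScope {
        launch {
            try {
                delay(10_000)
                log += "sibling finished" // not reached in this sketch
            } catch (e: CancellationException) {
                log += "sibling cancelled"
                throw e
            }
        }
        launch {
            delay(50)
            throw IOException("boom") // failure of one child fails the scope
        }
    }
}

// Hypothetical demo: the caller observes the failure as an ordinary exception.
fun scopeDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        failingPair(log)
        log += "returned normally"
    } catch (e: IOException) {
        log += "caller saw: ${e.message}"
    }
    log
}

fun main() {
    println(scopeDemo())
}
```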

  • Scoping concurrency: solution

        suspend fun sayHelloWorld() {
            coroutineScope { // this: CoroutineScope
                for (w in listOf("hello", "world")) {
                    launch { say(w) }
                }
            }
        }

  • Scoping concurrency: solution

        suspend fun sayHelloWorld() {
            coroutineScope { // this: CoroutineScope
                for (w in listOf("hello", "world")) {
                    launchSay(w)
                }
            }
        }

        fun CoroutineScope.launchSay(w: String) = launch {
            say(w)
        }

    But what is the name for all of it?

  • https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
  • Control flow with goto
  • Structured Programming
  • Control flow with go
  • Structured Concurrency
  • Structured Concurrency

    Diagram labels: coroutineScope { }, launch { … }

  • Structured Concurrency

    Parent always waits for children completion

      • Resource cleanup
      • Never lose a working coroutine
      • Error propagation
      • Never lose an exception
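
The "parent always waits" rule can be checked with a short sketch against the current kotlinx.coroutines library (assumed available). The name `waitDemo` and the log strings are hypothetical: the scope body finishes first, yet control does not leave `coroutineScope` until the child is done.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: records the order in which body, child and caller proceed.
fun waitDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    coroutineScope {
        launch {
            delay(100)
            log += "child done"
        }
        log += "scope body done" // the body ends before the child does
    }
    log += "after scope" // reached only once the child has completed
    log
}

fun main() {
    println(waitDemo())
}
```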

  • Structured concurrency everywhere? Similar problems, similar solutions
  • Structured concurrency everywhere

        async with trio.open_nursery() as nursery:
            # …

    Python. https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ Annotation: CoroutineScope / Job.

  • Structured concurrency everywhere

        async with trio.open_nursery() as nursery:
            while True:
                incoming_connection = await server_socket.accept()
                nursery.start_soon(connection_handler, incoming_connection)

    Python. Annotation: launch.

  • Structured concurrency everywhere

        ServerSocket listener = ...
        try (var scope = FiberScope.cancellable()) {
            while (...) {
                Socket s = listener.accept();
                scope.schedule(() -> handle(s));
            }
        }

    Java. https://trio.discourse.group/t/project-loom-lightweight-concurrency-for-the-jvm/97 Annotations: CoroutineScope / Job; launch.

  • Structured concurrency everywhere

        var g errgroup.Group

    Go. https://godoc.org/golang.org/x/sync/errgroup Annotation: CoroutineScope / Job.

  • Structured concurrency everywhere

        var g errgroup.Group
        for _, url := range urls {
            // Launch a goroutine to fetch the URL.
            url := url // https://golang.org/doc/faq#closures_and_goroutines
            g.Go(func() error {
                resp, err := http.Get(url)
                if err == nil {
                    resp.Body.Close()
                }
                return err
            })
        }
        // Wait for all HTTP fetches to complete.
        if err := g.Wait(); err == nil {
            fmt.Println("Successfully fetched all URLs.")
        }

    Go. Annotations: launch; scope completion.

  • Next steps? Get rid of unstructured concurrency

        launch { … }

  • Next steps? Get rid of unstructured concurrency

        GlobalScope.launch { … }
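
A hedged sketch of why `GlobalScope.launch` counts as unstructured, written against a recent kotlinx.coroutines (1.5 or later is assumed, where `GlobalScope` requires an opt-in to `DelicateCoroutinesApi`). The name `globalScopeDemo` and the `started` list are hypothetical: cancelling the parent reaches the structured child but not the coroutine started in `GlobalScope`.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (structured child cancelled?, global coroutine cancelled?).
@OptIn(DelicateCoroutinesApi::class)
fun globalScopeDemo(): Pair<Boolean, Boolean> = runBlocking {
    val started = mutableListOf<Job>()
    val parent = launch {
        started += launch { delay(10_000) }             // child of parent
        started += GlobalScope.launch { delay(10_000) } // has no parent job
        delay(10_000)
    }
    delay(50) // let parent start both coroutines
    parent.cancelAndJoin() // cancels parent and its structured child only
    val result = started[0].isCancelled to started[1].isCancelled
    started[1].cancel() // manual cleanup of the coroutine that escaped
    result
}

fun main() {
    println(globalScopeDemo())
}
```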

  • https://medium.com/@elizarov/the-reason-to-avoid-globalscope-835337445abc
  • More reading

      • Coroutines design document: https://github.com/Kotlin/KEEP/blob/master/proposals/coroutines.md
      • Library guide: https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html
      • Library source and issues: https://github.com/Kotlin/kotlinx.coroutines
      • Ongoing improvement work!

  • Thank you. Want to learn more? Questions?

    Roman Elizarov (elizarov, @relizarov)

