Parallel requests in Kotlin vs Golang


Background context

Both Kotlin and Go have the concept of coroutines [or goroutines]. While Kotlin's coroutines are primarily a convenience layer over callback- and future-based APIs and interfaces, goroutines exist mainly to enable simplified concurrency. Interestingly enough, the two are implemented slightly differently as well. You can read more about the differences in this Stack Overflow answer by Roman Elizarov, the team lead for Kotlin coroutines at JetBrains.

It is a common task for developers to fetch multiple things from a backend or third-party service and join them together before sending the response back, e.g. taking a list of user IDs and getting the user data associated with each from a user service. Although a simple 'loop over each ID and make the request' approach would work, situations like these are prime candidates for concurrency: make all the requests in parallel instead of waiting on the response of one before sending out the next.

In this post let's look at how the code turns out when tackling this task in Go and Kotlin. We will use https://reqres.in/ as our backend. The endpoint we will be calling is https://reqres.in/api/users/:id where id is in [1, 12]; it returns the details of the user whose id == :id.

// response for https://reqres.in/api/users/1
{
  "data": {
    "id": 1,
    "email": "george.bluth@reqres.in",
    "first_name": "George",
    "last_name": "Bluth",
    "avatar": "https://s3.amazonaws.com/uifaces/faces/twitter/calebogden/128.jpg"
  },
  "ad": {
    "company": "StatusCode Weekly",
    "url": "http://statuscode.org/",
    "text": "A weekly newsletter focusing on software development, infrastructure, the server, performance, and the stack end of things."
  }
}

Golang

Starting with Golang - first we need a Go struct to decode the JSON response body into. Thanks to https://mholt.github.io/json-to-go/ we can simply paste the JSON response above and get the struct we need.

type APIResponse struct {
    Data struct {
        ID        int    `json:"id"`
        Email     string `json:"email"`
        FirstName string `json:"first_name"`
        LastName  string `json:"last_name"`
        Avatar    string `json:"avatar"`
    } `json:"data"`
    Ad struct {
        Company string `json:"company"`
        URL     string `json:"url"`
        Text    string `json:"text"`
    } `json:"ad"`
}

How to make the request synchronously in Golang?

func main() {
    // create an array for holding the responses
    userDataList := [12]*APIResponse{}
    startTime := time.Now()
    // for each user id
    for i := 1; i <= 12; i++ {
        // make the http request
        endpoint := fmt.Sprintf("https://reqres.in/api/users/%d", i)
        res, _ := http.Get(endpoint)

        // decode resp body to the APIResponse struct
        apiResp := &APIResponse{}
        json.NewDecoder(res.Body).Decode(apiResp)

        // close the response body; deferring inside a loop would
        // keep all 12 bodies open until main returns
        res.Body.Close()

        // add to our response array
        userDataList[i-1] = apiResp
    }
    // calculate how long it took
    elapsedTime := time.Since(startTime)
    fmt.Println(elapsedTime)
    fmt.Println(len(userDataList)) // prints 12
}

Not too bad. Let's see how long it takes to do the 12 requests sequentially. Running the code above three times gives me:

// iteration 1
854.980516ms
12
// iteration 2
722.843678ms
12
// iteration 3
651.220378ms
12

Ouch. 854ms response times are tough to swallow. Your users will not be happy if they have to wait almost a second before their page loads.

Parallel requests in Golang

Now let's convert the above code to make the requests in parallel, see how the code looks, and hopefully also reduce the time it takes.

func main() {
    // parallel with mutex and waitgroup
    // create an array for holding the responses
    userDataList := [12]*APIResponse{}
    // create a wait group
    wg := &sync.WaitGroup{}
    // init a mutex
    mutex := &sync.Mutex{}
    startTime := time.Now()
    // for each user id
    for i := 1; i <= 12; i++ {
        // add one to the waitgroup for each goroutine launched
        wg.Add(1)
        go func(userID int) {
            // reduce the waitgroup count once this goroutine finishes
            defer wg.Done()

            // make the http request
            endpoint := fmt.Sprintf("https://reqres.in/api/users/%d", userID)
            res, _ := http.Get(endpoint)

            // close response body
            defer res.Body.Close()

            // decode resp body to the APIResponse struct
            apiResp := &APIResponse{}
            json.NewDecoder(res.Body).Decode(apiResp)

            // use mutex to prevent concurrent access to shared mutable state
            mutex.Lock()
            // add to our response array
            userDataList[userID-1] = apiResp
            mutex.Unlock()
        }(i)
    }
    // wait till waitgroup count goes to 0 i.e. wait till all goroutines have called Done
    wg.Wait()
    elapsedTime := time.Since(startTime)
    fmt.Println(elapsedTime)
    fmt.Println(len(userDataList))
}

What is a wait group?

A wait group is a synchronisation primitive that lets you 'hang' your program until its counter goes down to 0. It is similar to CountDownLatch in JVM land. If you launch a bunch of goroutines without a wait group, your function or program may finish and exit before the work is actually done. To combat the program exiting early, you create a wait group, call Add(1) for each goroutine that you launch, and call Done() once the work is finished; the Wait() call then hangs until every Done() has been called and the counter reaches 0.

There is also errgroup in Go - but that's for some other time.

What is a mutex?

A mutex [short for mutual exclusion] is a synchronisation primitive that allows safe access to shared mutable state. If you have a shared piece of data, it is unsafe to write to it concurrently from multiple threads. A mutex makes sure that only one goroutine touches the data at a time.

Running it three times on my machine gives me:

// iteration 1
45.037761ms
12
// iteration 2
119.491611ms
12
// iteration 3
42.831061ms
12

Not bad at all! 45ms to get all 12 responses.

Parallel requests in Kotlin

Now let's look at dealing with the same situation in Kotlin.

How to make the request synchronously in Kotlin?

suspend fun main() {
  // create a vertx instance
  val vertx = Vertx.vertx()
  // create an http client
  val webClient = WebClient.create(vertx)
  val userIds = IntRange(1, 12)
  // response list
  val userDataList = mutableListOf<UserData>()

  repeat(3) {
    val timeMillis = measureTimeMillis {
      // for each user id
      userIds.forEach {
        // make the API request
        val apiData = webClient.getAbs("https://reqres.in/api/users/${it}")
          .`as`(BodyCodec.jsonObject()) // convert response buffer to JsonObject
          .sendAwait() // make the request
          .body() // retrieve body [jsonObject]
        // add the response to the response list
        userDataList.add(UserData(it, apiData))
      }
    }
    println("iteration $it sync took $timeMillis ms")
  }
}

iteration 0 sync took 1869 ms
iteration 1 sync took 465 ms
iteration 2 sync took 449 ms

We don't need to do the same elapsedTime := endTime - startTime dance in Kotlin to measure the time of a code block. Kotlin provides measureTimeMillis, which takes a lambda and returns the milliseconds it took to execute it. The first iteration takes a long time because of JVM warmup. repeat is my favourite way of looping in Kotlin - it takes a code block and executes it n times.

Parallel with mutex in Kotlin

// parallel with mutex
// init a mutex
val mutex = Mutex()
repeat(3) {
  val timeMillis = measureTimeMillis {
    // start a coroutine scope to practice structured concurrency
    coroutineScope {
      // for each user Id
      userIds.forEach {
        // launch a coroutine
        launch {
          // that gets the data from the api
          val apiData = webClient.getAbs("https://reqres.in/api/users/${it}")
            .`as`(BodyCodec.jsonObject())
            .sendAwait()
            .body()
          // add the response to the response protecting access via the mutex
          mutex.withLock { userDataList.add(UserData(it, apiData)) }
        }
      }
    }
  }
  println("iteration $it took $timeMillis ms")
}

iteration 0 took 376 ms
iteration 1 took 121 ms
iteration 2 took 108 ms

We can use a mutex in Kotlin as well. The Kotlin coroutines library has a non-blocking Mutex [it suspends instead of blocking the thread] that we can safely use in coroutines and other suspending functions.

What is a Coroutine scope?

CoroutineScope is a way of managing the life cycle of coroutines in Kotlin. It enables structured concurrency: it makes sure that all launched coroutines have finished before proceeding to the next line, and that cleanup and cancellation happen in case any of the coroutines fail or throw an exception. You wrap your launch/async calls in a coroutineScope { } block that establishes the boundary of the operations. The scope becomes the parent and manages cancellation and cleanup if any of its child coroutines fail. You can read more here and here

Roman Elizarov suggests that mutexes are a low-level construct and that we can accomplish this task without them, using pure Kotlin functions.

Parallel without mutex in Kotlin

// parallel without mutex
repeat(3) {
  val timeMillis = measureTimeMillis {
    coroutineScope {
      // create a list of Deferred UserData objects
      val jobs = mutableListOf<Deferred<UserData>>()
      // for each user id
      userIds.forEach {
        // launch an async coroutine that returns the API response we want
        // and add the deferred to the list above
        jobs += async {
          val apiData = webClient.getAbs("https://reqres.in/api/users/${it}")
            .`as`(BodyCodec.jsonObject())
            .sendAwait()
            .body()
          UserData(it, apiData) // this is the return value of async
        }
      }
      // awaitAll is a suspending function that suspends until all jobs finish
      // userDataResponses is now a list of UserData
      val userDataResponses = jobs.awaitAll()
    }
  }
  println("iteration $it took $timeMillis ms")
}

iteration 0 took 131 ms
iteration 1 took 116 ms
iteration 2 took 124 ms

Look Ma, no mutexes! In this case we use async instead of launch, which returns a Deferred - Kotlin's version of a Future or Task. A Deferred is something that will hold a value at some point in the future, once the asynchronous work completes - in this case our response object.

Conclusion

Being well versed in Kotlin and Go has helped me immensely in my understanding of concurrent and asynchronous programming. Seeing how coroutines are used and implemented in two distinct languages really helps you appreciate the breadth of the concept. Sometimes the best way to learn a concept in one language is tinkering with it in another.
