Consistent Hashing with Kotlin

For the final project of my parallel programming course this semester in grad school, I chose to implement a DynamoDB clone in C using OpenMP and MPI. Dynamo uses a partitioning strategy where the data is spread across multiple nodes. It hashes the key to determine which server to store the item on, i.e. consistent hashing.

Consistent hashing is a powerful technique that is useful to keep in mind when architecting large-scale distributed systems. Let's look at the three levels of this technique.

Level 1: Hash + Modulo

The first level of consistent hashing is to hash your item key with a fast, well-distributed hash function and then take the result modulo the number of nodes in the system. The function should spread the keys uniformly, without any noticeable correlation between similar keys.

Side note: Which hash function to use is a broader discussion on its own. The rule of thumb is to pick something super fast. You generally don't need a cryptographically secure hash, and even a fast cryptographically secure hash may still be too slow here.
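To make the side note concrete, here is a minimal sketch of a non-cryptographic hash. FNV-1a is my own pick for illustration, not something the rest of this post depends on, and the function name fnv1a32 is hypothetical.

```kotlin
// 32-bit FNV-1a: a simple non-cryptographic hash (an illustrative choice only).
fun fnv1a32(key: String): UInt {
  var hash = 2166136261u                      // FNV offset basis
  for (b in key.toByteArray(Charsets.UTF_8)) {
    hash = hash xor (b.toUInt() and 0xFFu)    // mix in one byte
    hash *= 16777619u                         // FNV prime; UInt arithmetic wraps modulo 2^32
  }
  return hash
}

fun main() {
  val numOfServers = 9u
  // UInt is never negative, so the remainder is always a valid server index
  println(fnv1a32("funny_username917") % numOfServers)
}
```

Returning an unsigned value sidesteps the negative remainders you can get from % on a signed Int.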

If you use MD5, your hash function will look something like this:

import java.security.MessageDigest

// Hash the key with MD5 and return the digest as a lowercase hex string
fun keyToMd5Hex(itemKey: String): String {
  val digest = MessageDigest.getInstance("MD5").digest(itemKey.toByteArray())
  val sb = StringBuilder()
  for (b in digest) { sb.append(String.format("%02x", b)) }
  return sb.toString()
}

println(keyToMd5Hex("funny_username917"))
// 262f27f3c4f63712f63328a1440fb808

And if you have nine servers, you would hash the requested item's key using your hash function and then take the result modulo nine, giving you the index of the server that the item is on.

val nodeIndex = hash(itemKey) % numOfServers

This approach is easy to reason about and straightforward to implement. There is one issue, though: scaling the number of nodes. A small change in the number of nodes results in a lot of work to reshuffle keys around the cluster. This becomes harder to deal with as the cluster grows, because roughly numOfServers/(numOfServers+1) of the keys have to move every time you add a node.
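Here is a rough sketch that measures the reshuffle. It assumes the MD5 digest is read as a non-negative 128-bit integer before taking the remainder; the names nodeIndex and movedFraction and the sample keys are made up for this example.

```kotlin
import java.math.BigInteger
import java.security.MessageDigest

// Read the MD5 digest as a non-negative integer and reduce it modulo the node count.
fun nodeIndex(itemKey: String, numOfServers: Int): Int {
  val digest = MessageDigest.getInstance("MD5").digest(itemKey.toByteArray())
  return BigInteger(1, digest).mod(BigInteger.valueOf(numOfServers.toLong())).toInt()
}

// Fraction of keys whose node index changes when the cluster goes from `before` to `after` nodes.
fun movedFraction(keys: List<String>, before: Int, after: Int): Double =
  keys.count { nodeIndex(it, before) != nodeIndex(it, after) }.toDouble() / keys.size

fun main() {
  val keys = (0 until 10_000).map { "user_$it" }   // hypothetical sample keys
  println(movedFraction(keys, 9, 10))              // should land close to 9/10
}
```

Going from nine to ten servers, a key stays put only when both remainders happen to agree, which is why almost everything moves.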

Level 2: Hashing with virtual nodes

Instead of thinking in terms of physical nodes, we can place each node on a ring multiple times, as dots. Each dot is a virtual node that maps to a physical node, and many virtual nodes map onto the same physical resource. The ring represents the range of values that the hash function can take. To look up the node for a given key, hash the key and find the first node hash at or after it on the ring, wrapping around to the start of the ring if there is none.

To model the ring we can use a TreeMap. The following access patterns become quite easy with a TreeMap:

  • Check if a given key exists in a TreeMap
  • Retrieve the entry whose key is just lower than the given key
  • Retrieve the entry whose key is just higher than the given key

The last two come in quite handy when trying to find the next hash in the ring.
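As a quick illustration of those access patterns, here is a toy TreeMap with short made-up keys standing in for hashes. lowerKey, higherKey and ceilingKey are standard java.util.TreeMap methods (lowerEntry and higherEntry return the whole entry instead); sampleRing and the node names are hypothetical.

```kotlin
import java.util.TreeMap

// Hypothetical ring with three positions; the string values stand in for virtual nodes.
fun sampleRing(): TreeMap<String, String> {
  val ring = TreeMap<String, String>()
  ring["20"] = "node-a"
  ring["60"] = "node-b"
  ring["a0"] = "node-c"
  return ring
}

fun main() {
  val ring = sampleRing()
  println(ring.containsKey("60"))   // true
  println(ring.lowerKey("60"))      // 20: greatest key strictly below "60"
  println(ring.higherKey("60"))     // a0: smallest key strictly above "60"
  println(ring.ceilingKey("61"))    // a0: smallest key at or above "61"
  println(ring.ceilingKey("b0"))    // null: nothing at or above, so a ring lookup wraps to firstKey()
}
```

String keys sort lexicographically, which matches numeric order only because the hex strings all have the same length and case.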

// Using hex string as key
val ring = TreeMap<String, VirtualNode>()

VirtualNode is defined as

data class VirtualNode(val node: Node, val replicaIndex: Int) {
  fun getKey(): String {
    return node.host + ":" + node.port + "-" + replicaIndex
  }

  fun isVirtualNodeOf(otherNode: Node): Boolean {
    return node == otherNode
  }

  fun getPhysicalNode(): Node {
    return node
  }
}

and Node is defined as

data class Node(val tag: String, val host: String, val port: Int)

To add a node we can do

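fun addNode(node: Node, replicas: Int) {
  require(replicas >= 0) { "illegal virtual node count $replicas" }
  // Nothing to do if the node already has this many virtual nodes on the ring
  if (getExistingReplicas(node) == replicas) return
  for (i in 0 until replicas) {
    // Each virtual node gets its own ring position, derived from host:port-replicaIndex
    val vNode = VirtualNode(node, i)
    val myHash = keyToMd5Hex(vNode.getKey())
    ring[myHash] = vNode
  }
}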

To look up a node we can do

fun lookupNode(reqKey: String): Node {
  // Entries whose hash is at or after the hash of the requested key
  val tailMap = ring.tailMap(keyToMd5Hex(reqKey))
  val nodeHashVal = when {
    !tailMap.isEmpty() -> tailMap.firstKey()
    else -> ring.firstKey()  // wrap around to the start of the ring
  }
  return ring[nodeHashVal]?.getPhysicalNode() ?: activeNodes.random()
}

This helps a lot. Now if we add a node, only about 1/n of the keys move, which is a lot better than before.
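To sanity check that claim, here is a self-contained sketch that builds a ring before and after adding a tenth node and counts the keys that change owner. It uses plain strings instead of the Node and VirtualNode classes to stay short; md5Hex, buildRing, owner and the node names are all hypothetical.

```kotlin
import java.security.MessageDigest
import java.util.TreeMap

fun md5Hex(s: String): String =
  MessageDigest.getInstance("MD5").digest(s.toByteArray()).joinToString("") { "%02x".format(it) }

// Build a ring that maps virtual-node hashes to physical node names.
fun buildRing(nodes: List<String>, replicas: Int): TreeMap<String, String> {
  val ring = TreeMap<String, String>()
  for (node in nodes) for (i in 0 until replicas) ring[md5Hex("$node-$i")] = node
  return ring
}

// First virtual node at or after the key's hash, wrapping to the start of the ring.
fun owner(ring: TreeMap<String, String>, key: String): String =
  (ring.ceilingEntry(md5Hex(key)) ?: ring.firstEntry()).value

fun main() {
  val nodes = (1..9).map { "node$it" }
  val before = buildRing(nodes, 100)
  val after = buildRing(nodes + "node10", 100)
  val keys = (0 until 10_000).map { "user_$it" }   // hypothetical sample keys
  val moved = keys.filter { owner(before, it) != owner(after, it) }
  println(moved.size.toDouble() / keys.size)            // should be in the neighborhood of 1/10
  println(moved.all { owner(after, it) == "node10" })   // true: keys only ever move to the new node
}
```

The second line is the important one: existing nodes never trade keys with each other, they only hand some over to the newcomer.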

But now we have another issue. Some items are more popular than others, and since each item sits on a single server, a hot item can overload the server that hosts it.

We could split the hot partition, but that only helps to a certain extent. Another option is to add a bit of randomness to the item key before hashing it so that copies of the item sit on multiple servers, i.e. smear the cache. It's the classic storage vs performance tradeoff.
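One way to picture the smearing idea, as a sketch under my own assumptions: reads pick one of a fixed number of salted variants of the key at random, and writes update every variant. The "#" separator, the fanout parameter and both function names are invented for this example.

```kotlin
import kotlin.random.Random

// Read path: pick one of `fanout` salted variants at random so reads spread over several ring positions.
fun saltedKey(itemKey: String, fanout: Int, rng: Random = Random.Default): String =
  "$itemKey#${rng.nextInt(fanout)}"

// Write path: every salted variant has to be stored, which is where the extra storage goes.
fun allSaltedKeys(itemKey: String, fanout: Int): List<String> =
  (0 until fanout).map { "$itemKey#$it" }

fun main() {
  println(allSaltedKeys("hot_item", 3))   // [hot_item#0, hot_item#1, hot_item#2]
  println(saltedKey("hot_item", 3))       // one of the three variants, chosen at random
}
```

Each salted key hashes to its own spot on the ring, so the variants will usually, though not necessarily, land on different servers.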

It would be really great if we could get both: only move 1/n of the keys when scaling nodes while also not overloading a single server. That's where level 3 comes into play.

Level 3: Consistent hashing with bounded loads

This combines the virtual nodes approach with keeping track of server load (add hooks before and after requests) and not letting any server take more than a pre-determined factor 'c' of the average load across servers. If a server is overloaded, then fall back to another server in a deterministic fashion, which means the fallback servers are also always consistent.
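Below is a minimal sketch of the idea, not the algorithm from Google's write-up verbatim. I'm assuming the cap is ceil(c * (total load + 1) / number of nodes) and that the fallback is simply the next virtual node clockwise on the ring; BoundedRing, assign and release are hypothetical names, with assign and release playing the role of the before and after request hooks.

```kotlin
import java.security.MessageDigest
import java.util.TreeMap
import kotlin.math.ceil

private fun md5Hex(s: String): String =
  MessageDigest.getInstance("MD5").digest(s.toByteArray()).joinToString("") { "%02x".format(it) }

class BoundedRing(nodes: List<String>, replicas: Int, private val c: Double) {
  private val ring = TreeMap<String, String>()
  val load: MutableMap<String, Int> = nodes.associateWith { 0 }.toMutableMap()

  init {
    require(nodes.isNotEmpty() && c >= 1.0) { "need at least one node and c >= 1" }
    for (n in nodes) for (i in 0 until replicas) ring[md5Hex("$n-$i")] = n
  }

  // "Before request" hook: walk clockwise from the key's hash, take the first node under the cap.
  fun assign(key: String): String {
    val cap = ceil(c * (load.values.sum() + 1) / load.size).toInt()
    val h = md5Hex(key)
    val clockwise = ring.tailMap(h).values.asSequence() + ring.headMap(h).values.asSequence()
    // With c >= 1 at least one node is always below the cap, so first() finds a match
    val node = clockwise.first { load.getValue(it) < cap }
    load[node] = load.getValue(node) + 1
    return node
  }

  // "After request" hook: the request finished, so the node's load drops by one.
  fun release(node: String) { load[node] = load.getValue(node) - 1 }
}

fun main() {
  val ring = BoundedRing(listOf("a", "b", "c", "d"), replicas = 50, c = 1.25)
  repeat(100) { ring.assign("hot_item") }   // every request targets the same key
  println(ring.load)                        // no node ends up above ceil(1.25 * 100 / 4) = 32
}
```

Because the walk order depends only on the key's hash and the ring, two clients that see the same loads pick the same fallback server.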

Here's the post by Google on it: ai.googleblog.com/2017/04/consistent-hashin..

Consistent hashing is a great technique to have. You can use it for better cache hits, for routing certain requests to certain nodes (consistent routing), or for object storage. If you are in a Kubernetes environment, proxies such as Nginx and Envoy use consistent hashing to provide consistent routing to your pods.
