Implementing a Concurrent Set in Go – Part III

In the previous part of this series, we studied the Coarse-Grained Synchronization approach to running concurrent operations on our Set. We concluded that it's a correct and reliable implementation, but it can suffer from contention when concurrency levels are high.

In this third part, we'll review the Fine-Grained Synchronization approach. Unlike Coarse-Grained Synchronization, which uses a single lock for the entire list, this approach locks only the nodes that are critical to the current operation. Since the algorithm traverses the list with a current and a predecessor pointer, we can lock just those two nodes, allowing other threads to traverse the rest of the list concurrently instead of being blocked entirely.

Let's add a Mutex to our Node type, plus two methods to lock and unlock a Node, for clearer notation:

type node struct {
	mu   sync.Mutex
	item []byte
	key  uint64
	next *node
}

func (n *node) lock() {
	n.mu.Lock()
}

func (n *node) unlock() {
	n.mu.Unlock()
}

Now we can use these methods in Add() and only lock the Nodes we need to protect:

func (l *list) Add(item []byte) bool {
	key, _ := hashstructure.Hash(item, nil)
	l.head.lock()
	pred := l.head
	curr := pred.next
	curr.lock()
	// Move both locks forward on each iteration.
	// We keep the lock on curr while unlocking pred,
	// so the coupled-lock invariant is maintained.
	for curr.key < key {
		pred.unlock()
		pred = curr
		curr = curr.next
		curr.lock()
	}
	if curr.key == key {
		pred.unlock()
		curr.unlock()
		return false
	}
	n := newNode(item)
	n.next = curr
	pred.next = n
	l.mu.Lock()
	l.size++
	l.mu.Unlock()
	curr.unlock()
	pred.unlock()
	return true
}

In this case, we don't use defer to unlock the nodes. Since we lock and unlock nodes inside a loop, it's clearer to manage the unlocking explicitly.

The algorithm starts by locking head and its successor. Then, during each iteration, we move both locks along the list, unlocking the nodes that were previously locked. We always hold both locks until we decide what to do: if the key is already present, we unlock and return; if not, just as in the other implementations, we insert a new node and then unlock.

This pattern improves concurrency: when we insert a node somewhere in the list, the pair of locks moves forward until it reaches the insertion point, leaving all preceding nodes unlocked for any other thread operating on the Set. In the Coarse-Grained approach, the position didn't matter; the whole list stayed locked until the current operation was done.

The Remove() and Contains() methods are similar, with minor changes:

func (l *list) Remove(item []byte) bool {
	key, _ := hashstructure.Hash(item, nil)
	l.head.lock()
	pred := l.head
	curr := pred.next
	curr.lock()
	for curr.key < key {
		pred.unlock()
		pred = curr
		curr = curr.next
		curr.lock()
	}
	if curr.key == key {
		pred.next = curr.next
		curr.unlock()
		pred.unlock()
		l.mu.Lock()
		l.size--
		l.mu.Unlock()
		return true
	}
	curr.unlock()
	pred.unlock()
	return false
}

func (l *list) Contains(item []byte) bool {
	key, _ := hashstructure.Hash(item, nil)
	l.head.lock()
	pred := l.head
	curr := pred.next
	curr.lock()
	for curr.key < key {
		pred.unlock()
		pred = curr
		curr = curr.next
		curr.lock()
	}
	if curr.key == key {
		pred.unlock()
		curr.unlock()
		return true
	}
	curr.unlock()
	pred.unlock()
	return false
}

For this approach, we also keep the Mutex on the list itself, since we concurrently modify its size every time we add or remove a node. Without that lock, threads adding or removing items could race on the counter.

Even though Fine-Grained Synchronization reduces locking, there are still cases where substantial blocking can occur. For example, a thread adding or removing an item near the beginning of the list holds locks on those nodes, preventing every other thread from traversing past them until the operation finishes.

Last revision: 06-16-2020