Concurrency In Python

An Introduction To Concurrency

When a program can interleave statements from multiple workflows, we say the program is concurrent. This can be done with processes and threads. The main difference between the two is that a process has its own address space and cannot access the memory of other processes directly. A thread, on the other hand, runs within a process and can access the entire address space of the process it belongs to.

Here I will mostly talk about threads...

Correctness Properties

Safety. We want our programs to be safe, so that having multiple threads access shared resources does not cause errors, and so that all our threads can keep running.

Liveness. We want our programs to produce something useful! I.e., each process is allocated the resources it needs to do its job.

What Is A "Critical Region"

A critical region is a section of code which must only be executed by one thread at any one time. Let's take, for example, a shared counter. Let's say that there are N threads, each of which increments the counter when some event occurs. The pseudo code that increments the counter is:

var = counter value
var = var + 1
counter value = var

Without any protection it is quite possible to get this interleaving:

Counter value is 5
Thread A: var = 5
Thread B: var = 5
Thread B: var = var + 1 = 6
Thread B: counter value = 6
Thread A: var = var + 1 = 6
Thread A: counter value = 6
Counter value is 6 (oops!)

The value of the counter at the end of this sequence is wrong. It should have been incremented twice, once by thread A and once by thread B, giving it a new value of 7. As it stands it ends up being 6!

Clearly from the point just before a thread reads the counter to just after it stores the incremented value, a thread must not be interrupted by another thread that is also trying to increment the counter. If we make our pseudo code this:

ENTER CRITICAL REGION
var = counter value
var = var + 1
counter value = var
LEAVE CRITICAL REGION

Then the malfunctioning interleaving we saw above cannot happen because we have placed this "guard" around the access to the counter. This guard stops more than one thread being in this section of code at once. Thus, our previous sequence would now look like this:

Counter value is 5
Thread A: Enters critical region
Thread A: var = 5
Thread B: Tries to enter critical region but cannot. Blocks!
Thread A: var = var + 1 = 6
Thread A: counter value = 6
Thread A: Leaves critical region
Thread B: Unblocks and enters critical region
Thread B: var = 6
Thread B: var = var + 1 = 7
Thread B: counter value = 7
Thread B: Leaves critical region
Counter value is 7 (yay!)

So, how do we define critical regions in Python? We use Locks, Recursive Locks, Semaphores, and Condition Objects, which we will come to later...
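As a preview, here is a minimal sketch of the guarded counter from the pseudo code above, using a threading.Lock as the guard. The names counter, counter_lock and increment, and the thread and iteration counts, are made up for illustration.

```python
import threading

counter = 0
counter_lock = threading.Lock()  # guards every access to `counter`

def increment(times):
    global counter
    for _ in range(times):
        with counter_lock:       # ENTER CRITICAL REGION
            var = counter
            var = var + 1
            counter = var
        # LEAVE CRITICAL REGION: the lock is released when the with block exits

threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 4 threads x 10000 increments = 40000
```

Because only one thread at a time can hold the lock, no increment is lost, however the threads are scheduled.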

The Advantages Of Threads

So, why use threads? Because programs often spend time waiting and doing nothing. With more than one thread, whilst one thread is waiting on, say, an internet query, another thread can be getting on with useful work. Essentially, you can get more work done in the same time. It can also be more efficient to have multiple threads in one process rather than many processes.
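To make the "waiting" argument concrete, here is a rough sketch that compares running five blocking calls one after another against running them in five threads. The function fake_query is a hypothetical stand-in for a network request (it just sleeps), and the delay of 0.1 seconds is arbitrary.

```python
import time
from threading import Thread

def fake_query(delay):
    # Stand-in for a blocking network call: the thread just waits.
    time.sleep(delay)

def run_sequential(n, delay):
    start = time.perf_counter()
    for _ in range(n):
        fake_query(delay)
    return time.perf_counter() - start

def run_threaded(n, delay):
    start = time.perf_counter()
    threads = [Thread(target=fake_query, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

sequential = run_sequential(5, 0.1)  # waits happen one after another
threaded = run_threaded(5, 0.1)      # waits overlap
print("sequential: {:.2f}s, threaded: {:.2f}s".format(sequential, threaded))
```

The exact timings depend on the machine, but the threaded run should take roughly as long as one wait rather than five.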

So... to give myself some structure writing this, and to revise a little, I looked back at my uni notes, which were for Java and made almost 15 years ago now! The principles haven't changed, and rather than always looking up the docs and coding as I go, I thought writing this would cement it in my head for Python. So here goes...

Create A Thread

The Python Standard Library provides the class Thread, which can either have a callable passed into the constructor or be subclassed (in which case only override __init__() and run()).

Passing A Callable To The Thread Constructor

Create a thread by passing a callable (usually a function) to the Thread constructor as demonstrated below.

from threading import Thread

def my_thread(arg1):
   print("Have arg1 = {}".format(arg1))
   for i in range(10):
       print(i)

threads = []
for i in range(5):
   thread = Thread(
      target = my_thread,
      name = "my_thread_{}".format(i), args=(5-i,))
   threads.append(thread)
   thread.start()

for thread in threads:
   print(
      "Waiting for thread {} with name '{}'".format(
         thread.ident, thread.name))
   thread.join() # Block until this thread has finished

The above example starts 5 threads, which don't do anything particularly interesting except print out the argument they were passed and then a sequence of 10 numbers. The output should be enough to convince you that the separate flows of execution are running concurrently, however.

To Thread we can pass an args option which has to be a tuple. Each member of that tuple is passed as an argument to the callable my_thread. The Thread is told to run the function my_thread through the use of the target keyword.

The thread's name keyword argument isn't used by the thread, but it can be retrieved using the thread.name property. You can set it to anything you like and it does not have to be unique.

Once each thread is created it is immediately started using thread.start(). As soon as this call completes the thread is active and can be scheduled at any time.

Once a thread is active it is given a unique identifier, which we print out in the "Waiting for thread..." message using the property thread.ident. Note that although the identifier is unique among live threads, as soon as a thread dies its ident may be reused by new threads.
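A small sketch of the name and ident properties described above. The function work and the name "worker" are placeholders. It also shows that ident is None until the thread has been started.

```python
from threading import Thread

def work():
    pass  # nothing to do; we only care about the thread's metadata

t = Thread(target=work, name="worker")
ident_before = t.ident  # None: the thread has not been started yet
t.start()
ident_after = t.ident   # an integer once the thread has been started
t.join()

print(t.name, ident_before, ident_after)
```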

Subclassing Thread

If you subclass Thread you must override the run() method and optionally the constructor.

from threading import Thread

class MyThread(Thread):
  def __init__(self, arg1):
     super(MyThread, self).__init__()
     self._arg1 = arg1

  def run(self):
     print("Have arg1 = {}".format(self._arg1))
     for i in range(10):
        print(i)

threads = []
for i in range(5):
   thread = MyThread(5-i)
   threads.append(thread)
   thread.start()

for thread in threads:
   print(
      "Waiting for thread {} with name '{}'".format(
         thread.ident, thread.name))
   thread.join() # Block until this thread has finished

I don't think there is much difference between giving Thread a target and just subclassing it. The latter just has a more object oriented feel about it, I guess.

Waiting For A Thread To End

In the above examples you will have seen the call thread.join(). The join() method belongs to the Thread object and causes the calling thread of execution to block until the thread it was called on has terminated.
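The following sketch shows join() blocking until a thread ends. It also uses the optional timeout argument of join(), which is not covered in the text above: with a timeout, join() returns once the time is up even if the thread is still running, so is_alive() is needed to tell the two cases apart. The Event named release and the function worker are made up for the example.

```python
import threading

release = threading.Event()

def worker():
    release.wait()  # block until the main thread says go

t = threading.Thread(target=worker)
t.start()

t.join(timeout=0.05)          # gives up after 0.05s; worker is still blocked
still_running = t.is_alive()  # True: the join timed out

release.set()                 # let the worker finish
t.join()                      # now blocks only until the worker has ended
finished = not t.is_alive()   # True: the thread has terminated

print(still_running, finished)
```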

Create A Semaphore

The following snippet creates a semaphore with an initial value of 2. This means that up to 2 threads can be in the critical section that this semaphore can establish at any one time. If a 3rd thread were to try and enter it would block until 1 or more of the existing threads had left the CR.

import threading

num_resources = 2
my_sem = threading.Semaphore(num_resources)

To use the semaphore to define a critical region you can do this:

with my_sem:
    pass  # do something critical
# get to here and you're outside the with block and therefore
# no longer in the critical region

This is a nice and safe way of using a semaphore. At the start of the with block, the semaphore will have its acquire() function called automatically for you. Then the code in the with block is executed. When the with block is exited, for any reason, be it exception or otherwise, the semaphore will have its release() method called.
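To see the semaphore limit in action, here is a sketch in which six threads compete for a semaphore with an initial value of 2. The bookkeeping variables inside and max_inside, and the extra state_lock that protects them, are additions for the demonstration only.

```python
import threading
import time

num_resources = 2
my_sem = threading.Semaphore(num_resources)

state_lock = threading.Lock()  # protects the two counters below
inside = 0                     # threads currently in the critical region
max_inside = 0                 # highest value `inside` ever reached

def worker():
    global inside, max_inside
    with my_sem:                   # at most num_resources threads get past here
        with state_lock:
            inside += 1
            max_inside = max(max_inside, inside)
        time.sleep(0.01)           # pretend to use the resource
        with state_lock:
            inside -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(max_inside)  # never more than num_resources
```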

You can, of course, call the acquire() and release() methods yourself. You might do it like this:

my_sem.acquire()
try:
    pass  # do some work
finally:
    my_sem.release()

Notice that we have used a try block to ensure that no matter what happens, the semaphore will be released, whether you return from within the try block or throw an exception etc.

Sometimes you will write things like this:

my_sem.acquire()
if some condition:
    my_sem.release()
    return some error
# do critical work
my_sem.release()

This is quite error prone because you have to remember to release the semaphore wherever you end the normal flow of execution. You are much much much better off using a with block, or failing that the try/finally block structures shown above.

Create A (Recursive) Lock

A lock is like a semaphore with an initial value of 1. It is effectively a mutex. It allows one and only one thread access to the resource or critical section that it defines.

You create a lock as follows:

my_lock = threading.Lock() #< Created unlocked

Just like the semaphore the lock has methods acquire() and release(). The lock can also be used with the with block in the same way.

A thread cannot re-take a lock that it has already acquired. If it tries to, it will block:

my_lock.acquire()
my_lock.acquire() #< This WILL block!

If you want a thread to be able to take a lock and then take it again before releasing it (this is called recursive locking), then you need to use an RLock:

my_rlock = threading.RLock()
my_rlock.acquire()
my_rlock.acquire() #< Will NOT block
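The difference can be checked without deadlocking by passing blocking=False to acquire(), which returns True or False straight away instead of waiting. This is a small sketch; the variable names second_try and second_rtry are made up.

```python
import threading

my_lock = threading.Lock()
my_lock.acquire()
second_try = my_lock.acquire(blocking=False)     # False: already held
my_lock.release()

my_rlock = threading.RLock()
my_rlock.acquire()
second_rtry = my_rlock.acquire(blocking=False)   # True: same thread may re-take it
my_rlock.release()  # an RLock must be released once per acquire
my_rlock.release()

print(second_try, second_rtry)
```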

The Bounded Buffer Problem: Asynchronous Message Passing

In this scenario there is a thread that is creating a load of data. It could be a thread on your server accepting client connections, reading data from the client and handling the higher level protocol, and then passing the received data on to another thread in your application that will process the data.

The thread that gets the data and pumps it into your program's analysis algorithm thread is called the producer. The algorithm thread is said to be the consumer because it is consuming the data.

If we didn't have a queue in between these two threads they would have to operate in lock-step. The producer would get data over the network. It would then have to wait for the analysis (consumer) thread to be available so it could pass the data through to it. If the analysis thread was busy it would have to wait and so it could miss some incoming data. The same for the analysis thread. It will have to wait until data is available. If there is a sudden load on the network and there is a large delay in receiving data, the analysis thread will sit idle until suddenly a glut of data is received.

What we want to do is to decouple these two threads so that if the producer is busy, there is still enough data available to occupy the consumer. If the consumer is busy, the producer has somewhere to stash the data quickly so it can get back to its task. This is done by placing a buffer between the two threads.

We need to make sure of two things. The consumer must not read from an empty buffer (underflow) and the producer must not write to a full buffer (overflow).

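Now, in Python, the standard library already gives us thread-safe building blocks for this. One is collections.deque, of which the docs say: Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction. Note that a deque does not block when it is empty or full, so for a producer/consumer buffer the ready-made choice is queue.Queue, whose put() and get() methods block as needed.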
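For comparison with the hand-rolled versions, here is a minimal sketch of a producer and a consumer decoupled by queue.Queue, the standard library's blocking FIFO queue. The names work_queue, SENTINEL and results, the capacity of 3 and the "doubling" work are all made up for illustration.

```python
import queue
import threading

work_queue = queue.Queue(maxsize=3)  # bounded: put() blocks when it is full
SENTINEL = None                      # tells the consumer there is no more data
results = []

def producer():
    for i in range(10):
        work_queue.put(i)            # blocks while the queue is full
    work_queue.put(SENTINEL)

def consumer():
    while True:
        item = work_queue.get()      # blocks while the queue is empty
        if item is SENTINEL:
            break
        results.append(item * 2)     # stand-in for the real analysis

threads = [threading.Thread(target=producer),
           threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)
```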

Although, therefore, we would never need to implement this ourselves, doing so is a nice exercise for learning about and practising with the threading library.

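This can be implemented with two counting semaphores, spaces (initialised to the capacity of the buffer) and elements (initialised to zero), using this pseudo code: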

put_data:
    spaces.acquire()
    write value into queue
    elements.release()

get_data:
    elements.acquire()
    read value from queue
    spaces.release()
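The pseudo code above could be turned into Python along these lines. This is a sketch, not a definitive implementation: the class name BoundedBuffer, the capacity of 3 and the extra _mutex lock (which protects the underlying deque when several producers or consumers are active at once) are assumptions added here.

```python
import threading
from collections import deque

class BoundedBuffer:
    def __init__(self, capacity):
        self._items = deque()
        self._mutex = threading.Lock()                # guards self._items
        self._spaces = threading.Semaphore(capacity)  # free slots remaining
        self._elements = threading.Semaphore(0)       # items ready to read

    def put_data(self, value):
        self._spaces.acquire()        # blocks while the buffer is full
        with self._mutex:
            self._items.append(value)
        self._elements.release()      # signal: one more item available

    def get_data(self):
        self._elements.acquire()      # blocks while the buffer is empty
        with self._mutex:
            value = self._items.popleft()
        self._spaces.release()        # signal: one more free slot
        return value

buffer = BoundedBuffer(capacity=3)
received = []

def producer():
    for i in range(10):
        buffer.put_data(i)

def consumer():
    for _ in range(10):
        received.append(buffer.get_data())

threads = [threading.Thread(target=producer),
           threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(received)  # [0, 1, 2, ..., 9]: one producer, one consumer, FIFO order
```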

But, in Python we can use condition variables. Here I replicate, with added comments, the example from the Python docs. We can see that only the one condition variable is required, rather than a pair of semaphores, which makes the implementation a little cleaner.

# Consume one item
cv.acquire()                      #< Lock is acquired
while not an_item_is_available(): #< Therefore this executes in the CR
    cv.wait()                     #< Lock is released and thread sleeps
                                  #  until cv.notify() or cv.notify_all()
                                  #  is called.
# When cv.wait() unblocks it reacquires the lock so at this point
# we are back inside the CR
get_an_available_item()
cv.release()

# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
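Here is one way the bounded buffer might look with a single threading.Condition, written as a runnable sketch. The class name ConditionBuffer, the capacity of 3 and the choice of notify_all() are assumptions made here: producers and consumers wait on the same condition, so waking every waiter and letting each re-check its own while loop avoids waking only a thread of the wrong kind.

```python
import threading
from collections import deque

class ConditionBuffer:
    def __init__(self, capacity):
        self._items = deque()
        self._capacity = capacity
        self._cv = threading.Condition()

    def put_data(self, value):
        with self._cv:                                  # lock acquired
            while len(self._items) >= self._capacity:   # full: wait for space
                self._cv.wait()
            self._items.append(value)
            self._cv.notify_all()                       # wake any waiters

    def get_data(self):
        with self._cv:
            while not self._items:                      # empty: wait for data
                self._cv.wait()
            value = self._items.popleft()
            self._cv.notify_all()
            return value

buffer = ConditionBuffer(capacity=3)
received = []

def producer(start):
    for i in range(start, start + 5):
        buffer.put_data(i)

def consumer():
    for _ in range(10):
        received.append(buffer.get_data())

threads = [threading.Thread(target=producer, args=(0,)),
           threading.Thread(target=producer, args=(100,)),
           threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(received))
```

With two producers the arrival order depends on scheduling, which is why the result is sorted before printing.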
    

The Dining Philosophers Problem: Sharing Resources

The Readers And Writers Problem: Conditional Forms Of Mutual Exclusion

The Handshaking Problem: Synchronous Message Passing