Processes
You can implement queues in many ways. For a single machine, the standard library’s multiprocessing module provides a Queue class.

Reference: https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.run

Example:

Suppose that you’re washing dishes. If you’re stuck with the entire job, you need to wash each dish, dry it, and put it away. You can do this in a number of ways. You might wash the first dish, dry it, and then put it away. You then repeat with the second dish, and so on. Or, you might batch operations and wash all the dishes, dry them all, and then put them away; this assumes you have space in your sink and drainer for all the dishes that accumulate at each step. These are all synchronous approaches—one worker, one thing at a time.

Alternatively, you could find a helper or two: you wash, a second person dries, and a third puts the dishes away. But what if you wash faster than the dryer dries? Wet dishes either fall on the floor, or you pile them up between you and the dryer, or you just whistle off-key until the dryer is ready. And if the last person is slower than the dryer, dry dishes can end up falling on the floor, or piling up, or the dryer does the whistling.

Let’s simulate just a single washer and multiple dryer processes (someone can put the dishes away later) and an intermediate dish_queue.
Call this program dishes.py:

 

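import multiprocessing as mp

def washer(dishes, output):
    # Producer: put each washed dish on the queue.
    for dish in dishes:
        print('Washing', dish, 'dish')
        output.put(dish)

def dryer(input):
    # Consumer: loops forever; the daemon flag lets the program exit anyway.
    while True:
        dish = input.get()
        print('Drying', dish, 'dish')
        input.task_done()

# The guard keeps child processes from re-running this block on import.
if __name__ == '__main__':
    dish_queue = mp.JoinableQueue()
    dryer_proc = mp.Process(target=dryer, args=(dish_queue,))
    dryer_proc.daemon = True
    dryer_proc.start()
    dishes = ['salad', 'bread', 'entree', 'dessert']
    washer(dishes, dish_queue)
    dish_queue.join()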

 

Run your new program thusly:

$ python dishes.py
Washing salad dish
Washing bread dish
Washing entree dish
Washing dessert dish
Drying salad dish
Drying bread dish
Drying entree dish
Drying dessert dish
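
The text above mentions multiple dryer processes, while dishes.py starts only one. The following is a minimal sketch of a multi-dryer variant, assuming two dryers that share the same queue. The names run_dishes, dried, and the Dryer-N labels are illustrative and not part of the original program, and the order of the drying lines can vary from run to run.

```python
import multiprocessing as mp

def washer(dishes, output):
    for dish in dishes:
        print('Washing', dish, 'dish')
        output.put(dish)

def dryer(name, input_queue, dried):
    # Every dryer pulls from the same queue, so the work is shared.
    while True:
        dish = input_queue.get()
        print(name, 'drying', dish, 'dish')
        dried.put(dish)              # report back (illustrative addition)
        input_queue.task_done()

def run_dishes(dishes, n_dryers=2):
    dish_queue = mp.JoinableQueue()
    dried = mp.Queue()
    for n in range(n_dryers):
        proc = mp.Process(target=dryer,
                          args=('Dryer-%d' % n, dish_queue, dried),
                          daemon=True)
        proc.start()
    washer(dishes, dish_queue)
    dish_queue.join()                # wait until every dish was task_done()
    # Collect one report per dish; sorted because arrival order is not fixed.
    return sorted(dried.get(timeout=10) for _ in dishes)

if __name__ == '__main__':
    print(run_dishes(['salad', 'bread', 'entree', 'dessert']))
```

Because the dryers are daemonic, they are terminated when the main program exits, just as in the single-dryer version.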

Explanation of the methods

 

class multiprocessing.JoinableQueue([maxsize])
 

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

       
        task_done()
 

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

   
         join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

The Queue, multiprocessing.queues.SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the Queue.Queue class in the standard library.

They differ in that Queue lacks the task_done() and join() methods.

If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
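
The counting behaviour described above can be observed in a single process. This is only a sketch: the function name drain is made up for illustration, and the same process plays both producer and consumer so that no worker is needed.

```python
import multiprocessing as mp

def drain(items):
    q = mp.JoinableQueue()
    for item in items:
        q.put(item)                  # unfinished-task count goes up
    processed = []
    for _ in items:
        processed.append(q.get(timeout=5))
        q.task_done()                # unfinished-task count goes down
    q.join()                         # count is zero, so this does not block
    try:
        q.task_done()                # one call more than there were items
    except ValueError:
        extra_call_rejected = True
    else:
        extra_call_rejected = False
    return processed, extra_call_rejected

if __name__ == '__main__':
    print(drain(['salad', 'bread']))
```

If one of the task_done() calls in the loop were left out, the join() call would block indefinitely, which is the usual symptom of a consumer that forgets to acknowledge an item.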


put(obj[, block[, timeout]])

Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available (no wait), else raise the Queue.Full exception (timeout is ignored in that case).

put_nowait(obj)
 

Equivalent to put(obj, False).

get([block[, timeout]])

Remove and return an item from the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available (no wait), else raise the Queue.Empty exception (timeout is ignored in that case).

get_nowait()

 Equivalent to get(False).
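
As a rough illustration of the blocking and timeout rules for put() and get(), the sketch below uses a queue with a single slot. The function name demo_blocking and the event strings are invented for this example. Note that the text above follows the Python 2 documentation, where the exceptions are named Queue.Full and Queue.Empty; in Python 3 they live in the queue module.

```python
import multiprocessing as mp
import queue  # Python 3 home of the Full and Empty exceptions

def demo_blocking():
    events = []
    q = mp.Queue(maxsize=1)
    q.put('salad')                       # fills the only slot
    try:
        q.put('bread', timeout=0.1)      # waits up to 0.1 s for a free slot
    except queue.Full:
        events.append('put timed out')
    try:
        q.put_nowait('bread')            # same as q.put('bread', False)
    except queue.Full:
        events.append('put_nowait refused')
    events.append(q.get(timeout=5))      # fetches 'salad'
    try:
        q.get(timeout=0.1)               # nothing left to fetch
    except queue.Empty:
        events.append('get timed out')
    try:
        q.get_nowait()                   # same as q.get(False)
    except queue.Empty:
        events.append('get_nowait refused')
    return events

if __name__ == '__main__':
    for event in demo_blocking():
        print(event)
```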

run()

Method representing the process’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

start()

Start the process’s activity.

This must be called at most once per process object. It arranges for the object’s run() method to be invoked in a separate process.

 

join([timeout])
 

Block the calling thread until the process whose join() method is called terminates or until the optional timeout occurs.

If timeout is None then there is no timeout.

A process can be joined many times.

A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.
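
The run(), start(), and join() methods described above fit together roughly as follows. This is a sketch under stated assumptions: the Greeter class, the greet helper, and the results queue are hypothetical names chosen for the example.

```python
import multiprocessing as mp

class Greeter(mp.Process):
    def __init__(self, who, results):
        super().__init__()
        self.who = who
        self.results = results

    def run(self):
        # Overrides the default run(); executes in the child after start().
        self.results.put('hello ' + self.who)

def greet(who):
    results = mp.Queue()
    proc = Greeter(who, results)
    proc.start()                         # call at most once per object
    message = results.get(timeout=10)    # read before joining the child
    proc.join(timeout=10)                # wait for the child to terminate
    return message, proc.exitcode

if __name__ == '__main__':
    print(greet('world'))
```

The message is read from the queue before join() is called, so the child is never waiting on unread queue data when the parent blocks.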

 

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

Process objects represent activity that is run in a separate process. The Process class has equivalents of all the methods of threading.Thread.

The constructor should always be called with keyword arguments.

  • group should always be None; it exists solely for compatibility with threading.Thread.
  • target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called.
  • name is the process name.
  • args is the argument tuple for the target invocation.
  • kwargs is a dictionary of keyword arguments for the target invocation. By default, no arguments are passed to target.

 

daemon

The process’s daemon flag, a Boolean value. This must be set before start() is called.

The initial value is inherited from the creating process.

When a process exits, it attempts to terminate all of its daemonic child processes.

Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits. Additionally, these are not Unix daemons or services, they are normal processes that will be terminated (and not joined) if non-daemonic processes have exited.
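
A short sketch of the constructor arguments and the daemon flag, assuming Python 3.3 or later, where daemon can also be passed to the constructor as a keyword. The dry function, the make_dryer helper, and the name dryer-1 are illustrative only.

```python
import multiprocessing as mp

def dry(dish, towel='cotton'):
    print('Drying', dish, 'dish with a', towel, 'towel')

def make_dryer():
    # Keyword arguments throughout; the daemon flag is fixed before start().
    return mp.Process(target=dry,
                      name='dryer-1',
                      args=('salad',),
                      kwargs={'towel': 'linen'},
                      daemon=True)

if __name__ == '__main__':
    proc = make_dryer()
    print(proc.name, proc.daemon)
    proc.start()
    proc.join()          # join explicitly; daemonic children are not waited for
    print('exit code:', proc.exitcode)
```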

 

Threads

 

The multiprocessing module has a cousin called threading that uses threads instead of processes (actually, multiprocessing was designed later as its process-based counterpart). Let’s redo our process example with threads:

import threading

def do_this(what):
    whoami(what)

def whoami(what):
    print("Thread %s says: %s" % (threading.current_thread(), what))

if __name__ == "__main__":
    whoami("I'm the main program")
    for n in range(4):
        p = threading.Thread(target=do_this,
                             args=("I'm function %s" % n,))
        p.start()

Here is the result:

Thread <_MainThread(MainThread, started 140735207346960)> says: I’m the main program
Thread <Thread(Thread-1, started 4326629376)> says: I’m function 0
Thread <Thread(Thread-2, started 4342157312)> says: I’m function 1
Thread <Thread(Thread-3, started 4347412480)> says: I’m function 2
Thread <Thread(Thread-4, started 4342157312)> says: I’m function 3

Methods explanation:
Reference: https://docs.python.org/3/library/threading.html
threading.current_thread()

Return the current Thread object, corresponding to the caller’s thread of control. If the caller’s thread of control was not created through the threading module, a dummy thread object with limited functionality is returned.

 

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
 

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If not None, daemon explicitly sets whether the thread is daemonic. If None (the default), the daemonic property is inherited from the current thread.

If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

The daemon argument was added in Python 3.3.

start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
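
To make the subclassing rules concrete, here is a minimal sketch of a Thread subclass that calls the base constructor first, overrides run(), and shows that a second start() is rejected. The Squarer class and the square_in_thread helper are hypothetical names for this example.

```python
import threading

class Squarer(threading.Thread):
    def __init__(self, value):
        super().__init__()           # base class constructor comes first
        self.value = value
        self.result = None

    def run(self):
        # Replaces the default run(), which would call the target callable.
        self.result = self.value ** 2

def square_in_thread(value):
    worker = Squarer(value)
    worker.start()                   # arranges for run() in a separate thread
    worker.join()
    try:
        worker.start()               # a second start() on the same object
    except RuntimeError:
        restarted = False
    else:
        restarted = True
    return worker.result, restarted

if __name__ == '__main__':
    print(square_in_thread(7))
```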

join(timeout=None)

Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception –, or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raise the same exception.
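
The timeout rule for join() can be sketched like this. The worker simply waits on an Event, which stands in for a long-running task; the function name join_with_timeout and the variable names are assumptions made for the example.

```python
import threading

def join_with_timeout():
    release = threading.Event()
    worker = threading.Thread(target=release.wait)

    try:
        worker.join()                # the thread has not been started yet
    except RuntimeError:
        early_join_failed = True
    else:
        early_join_failed = False

    worker.start()
    worker.join(timeout=0.1)         # returns None whether or not it timed out
    timed_out = worker.is_alive()    # still alive, so the join timed out

    release.set()                    # let the worker finish
    worker.join()                    # blocks until the thread terminates
    return early_join_failed, timed_out, worker.is_alive()

if __name__ == '__main__':
    print(join_with_timeout())
```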

name

A string used for identification purposes only. It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

One difference between multiprocessing and threading is that threading does not have a terminate() function.

Threads can be dangerous. Like manual memory management in languages such as C and C++, they can cause bugs that are extremely hard to find, let alone fix. To use threads, all the code in the program—and in external libraries that it uses—must be threadsafe. In the preceding example code, the threads didn’t share any global variables, so they could run independently without breaking anything.

Threads can be useful and safe when global data is not involved. In particular, threads are useful for saving time while waiting for some I/O operation to complete.

Sometimes, however, there are good reasons to change shared data. One common reason to launch multiple threads is to let them divide up the work on some data, so a certain degree of change to the data is expected. The usual way to share data safely is to apply a software lock before modifying a variable in a thread.
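
One way to apply such a lock is with threading.Lock, sketched below. The shared counter, the function name count_with_lock, and the thread and iteration counts are illustrative choices, not part of the original text.

```python
import threading

def count_with_lock(n_threads=4, n_increments=10000):
    counter = {'value': 0}           # data shared by all the threads
    lock = threading.Lock()

    def work():
        for _ in range(n_increments):
            with lock:               # only one thread at a time gets past here
                counter['value'] += 1

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter['value']

if __name__ == '__main__':
    print(count_with_lock())
```

The with statement releases the lock even if the body raises an exception, which is why it is usually preferred over calling acquire() and release() by hand.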

So for Python, the recommendations are as follows:
• Use threads for I/O bound problems
• Use processes, networking, or events (discussed in the next section) for CPU-bound problems
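
The first recommendation can be illustrated with a small sketch in which time.sleep() stands in for waiting on a network or disk. The names fake_download and download_all and the 0.2 second delay are assumptions for the example. Each thread writes only to its own dictionary key, so no lock is used here.

```python
import threading
import time

def fake_download(name, delay, results):
    time.sleep(delay)                # placeholder for a blocking I/O call
    results[name] = 'done'

def download_all(names, delay=0.2):
    results = {}
    threads = [threading.Thread(target=fake_download,
                                args=(name, delay, results))
               for name in names]
    started = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    elapsed = time.perf_counter() - started
    return results, elapsed

if __name__ == '__main__':
    results, elapsed = download_all(['a', 'b', 'c', 'd'])
    print(results)
    print('elapsed: %.2f s' % elapsed)
```

Because the threads spend their time waiting rather than computing, the four waits overlap and the total elapsed time should be close to a single delay rather than four of them.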