Chapter 14: Concurrency and Process Management
Python offers powerful tools for running external programs (subprocess) and for executing code in parallel (multiprocessing). This chapter will help you:
- Understand how to call external commands using subprocess
- Run code in parallel using the multiprocessing module
- Share data between processes
- Manage process pools for efficient task distribution
- Build and test an example: A parallel file compressor
14.1 Running External Commands with subprocess
Why Use subprocess?
The subprocess module allows you to spawn new processes, connect to their input/output/error pipes, and obtain their return codes.
Run a Shell Command
import subprocess
# Run a command and capture its output
result = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print("Exit Code:", result.returncode)
print("Output:\n", result.stdout)
- capture_output=True: captures stdout and stderr.
- text=True: returns strings instead of bytes.
Handle Errors
try:
    subprocess.run(["ls", "nonexistent_folder"], check=True)
except subprocess.CalledProcessError as e:
    print("🛑 Failed:", e)
Redirect Output to File
with open("output.txt", "w") as f:
subprocess.run(["echo", "Hello, subprocess!"], stdout=f)
⚠️ subprocess runs external programs; it does not execute Python code directly unless you call python as a command.
Passing Input to a Subprocess
result = subprocess.run(['grep', 'hello'], input='hello world\nbye', text=True, capture_output=True)
print(result.stdout)
Advanced Use: Using Popen
Popen is similar to run but offers more flexibility for complex or uncommon cases.
process = subprocess.Popen(['echo', 'Hello'], stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
print(stdout.decode())
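One case where Popen shines is chaining processes together, piping one command's output into another. A minimal sketch (assuming ls and grep are available on your system):

import subprocess

# Equivalent of the shell pipeline: ls -l | grep ".py"
ls_proc = subprocess.Popen(["ls", "-l"], stdout=subprocess.PIPE)
grep_proc = subprocess.Popen(
    ["grep", ".py"],
    stdin=ls_proc.stdout,   # feed ls output into grep
    stdout=subprocess.PIPE,
    text=True,
)
ls_proc.stdout.close()  # allow ls to receive SIGPIPE if grep exits early
output, _ = grep_proc.communicate()
print(output)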
Use Cases for subprocess
- Running shell commands from Python
- Automating OS-level tasks (e.g., compressing files, invoking Git, calling ffmpeg)
- Logging or interacting with CLI tools
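For instance, a quick way to check a repository's status from Python (a sketch, assuming git is installed and the current directory is a Git repository):

import subprocess

result = subprocess.run(
    ["git", "status", "--short"],
    capture_output=True,
    text=True,
)
if result.returncode == 0:
    print(result.stdout or "Working tree clean.")
else:
    print("Not a Git repository or git is unavailable:", result.stderr.strip())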
Example: Run a Docker Container
Let's look at a more realistic and complex example. We want to run some ad-hoc Python code generated within a Python program. Running this code within the same environment as the parent program may pose security risks. To mitigate them, we can run the generated code inside an isolated Docker container. The following example does just that:
This example assumes:
- You have Docker installed and running.
- You have a Python base image available locally or downloadable (e.g., python:3.10-slim).
- Your local system can run docker commands without sudo.
import subprocess
import os
import tempfile
import textwrap
# Step 1: Define the Python code to be run inside the container
python_code = textwrap.dedent("""
    print("Hello from inside Docker!")
    for i in range(5):
        print(f"Processing item {i+1}")
""")
# Step 2: Create a temporary Python file
temp_dir = tempfile.mkdtemp()
python_file_path = os.path.join(temp_dir, "app.py")
with open(python_file_path, "w") as f:
    f.write(python_code)
print(f"[INFO] Created Python file at {python_file_path}")
# Step 3: Launch a Docker container in detached mode
container_name = "python_subprocess_demo"
print("[INFO] Starting Docker container...")
subprocess.run(
    ["docker", "run", "-d", "--name", container_name, "python:3.10-slim", "sleep", "60"],
    check=True
)
# Step 4: Copy Python file into the container
print("[INFO] Copying file into container...")
subprocess.run(
    ["docker", "cp", python_file_path, f"{container_name}:/app.py"],
    check=True
)
# Step 5: Run the Python program inside the container
print("[INFO] Executing Python program inside container...")
result = subprocess.run(
    ["docker", "exec", container_name, "python", "app.py"],
    capture_output=True,
    text=True
)
print("[INFO] Output from container:\n", result.stdout)
# Step 6: Cleanup - Stop and remove the container
print("[INFO] Stopping and removing container...")
subprocess.run(["docker", "rm", "-f", container_name], check=True)
# Step 7: Remove temporary file
os.remove(python_file_path)
os.rmdir(temp_dir)
print("[INFO] Cleanup complete.")
What This Code Does
- Creates a Python file dynamically in a temporary folder.
- Starts a Docker container from python:3.10-slim in the background.
- Copies the generated Python script into the container.
- Executes the script inside the container using docker exec.
- Captures and prints the container’s output.
- Cleans up by stopping/removing the container and deleting temporary files.
Key Points
- subprocess.run(..., check=True) ensures the program fails immediately if any Docker command fails (a more defensive variant is sketched after this list).
- We decouple the container lifecycle from the main program, meaning the container exists just long enough to run the script.
- This approach could be expanded to deploy AI models, run integration tests in isolation, or simulate production environments.
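Because check=True raises on the first failure, a failed docker cp or docker exec would leave the container running. A minimal sketch of how the same steps could be wrapped so cleanup always happens, assuming the same container_name and python_file_path variables as in the example above:

try:
    subprocess.run(
        ["docker", "run", "-d", "--name", container_name, "python:3.10-slim", "sleep", "60"],
        check=True,
    )
    subprocess.run(["docker", "cp", python_file_path, f"{container_name}:/app.py"], check=True)
    result = subprocess.run(
        ["docker", "exec", container_name, "python", "app.py"],
        capture_output=True,
        text=True,
        check=True,
    )
    print(result.stdout)
finally:
    # Remove the container even if one of the steps above raised.
    subprocess.run(["docker", "rm", "-f", container_name], check=False)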
14.2 Parallel Execution with multiprocessing
Python’s multiprocessing module enables true parallelism using separate processes (unlike threads, which are limited by the Global Interpreter Lock).
Basic Example
from multiprocessing import Process
def greet(name):
    print(f"Hello, {name}")

if __name__ == '__main__':
    p = Process(target=greet, args=('Alice',))
    p.start()
    p.join()
- target=greet → tells the process to run the greet function.
- args=('Alice',) → arguments passed to the function.
Note
- Protecting the main block: if __name__ == '__main__':
  - This is critical on Windows (and recommended for all platforms) when using multiprocessing.
  - Without this guard, the child process might re-import the module, leading to infinite process creation.
- Even if there’s only one argument, the value passed as args must be a tuple (hence the comma after 'Alice').
- If you don’t join(), the main program might exit before the child process finishes running.
Execution Flow: Process
- Main process creates p (a new process object).
- p.start() → a new process begins and executes greet('Alice').
- p.join() → the main process pauses until the greeting process finishes.
- Output is printed from the child process.
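The same pattern scales to several workers: create the Process objects, start them all, then join them all. A minimal sketch building on the greet example above:

from multiprocessing import Process

def greet(name):
    print(f"Hello, {name}")

if __name__ == '__main__':
    names = ['Alice', 'Bob', 'Carol']
    processes = [Process(target=greet, args=(name,)) for name in names]
    for p in processes:
        p.start()   # launch all workers first
    for p in processes:
        p.join()    # then wait for every worker to finish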
Use Cases for multiprocessing
- CPU-bound tasks (e.g., image processing, data transformation)
- Parallel computation
- Scaling out a task across multiple cores
Using a Pool of Workers
Python’s multiprocessing.Pool is used to run the same function on multiple pieces of data in parallel.
from multiprocessing import Pool
def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(square, [1, 2, 3, 4, 5])
        print(results)
- Pool creates a pool of worker processes.
- It automatically handles process creation and joining.
- Instead of manually creating Process objects for each task, Pool lets you easily distribute work across multiple processes.
- with ensures the pool is automatically closed and cleaned up when done (no need to call pool.close() or pool.join() manually).
Distributing tasks with map
results = pool.map(square, [1, 2, 3, 4, 5])
- pool.map() works like the built-in map(), but runs in parallel using multiple processes.
- It splits the list [1, 2, 3, 4, 5] into chunks and assigns them to available workers.
- Each worker runs square(n) on its assigned numbers.
- The results are collected and returned in the same order as the input.
Execution Flow: Pool
- Main process starts.
- Pool of 4 worker processes is created.
- map() distributes list elements across workers.
- Each worker computes the square and sends results back.
- Main process collects results and prints them.
When to use Pool
- You have many items to process and the work is CPU-bound.
- Instead of manually creating and managing many Process objects, you can:
  - Limit the number of workers (avoiding CPU overload).
  - Let Pool handle task distribution and result gathering.
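Pool offers a few related methods beyond map: starmap for functions that take multiple arguments, and imap_unordered when you want results as soon as each worker finishes rather than in input order. A minimal sketch:

from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(4) as pool:
        # starmap unpacks each tuple into power(base, exponent)
        print(pool.starmap(power, [(2, 3), (3, 2), (10, 2)]))  # [8, 9, 100]

        # imap_unordered yields results as workers finish, not in input order
        for result in pool.imap_unordered(square, [1, 2, 3, 4, 5]):
            print(result)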
Sharing State Between Processes
from multiprocessing import Process, Value
def increment(counter):
    for _ in range(1000):
        counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    processes = [Process(target=increment, args=(counter,)) for _ in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
    print(f"Counter value: {counter.value}")
Warning: This is an example of shared state, race conditions, and synchronization in multiprocessing. The code above “works sometimes,” but it’s actually racy: counter.value += 1 is not atomic. Two processes can read the same old value and both write back, losing increments. You might print something less than 4000.
What’s going wrong?
- Value('i', 0) gives an int in shared memory with an associated lock, but the code above never uses that lock.
- += 1 compiles to multiple steps (read → add → write). Without a lock, increments can collide.
Fix 1: Use the built‑in lock around each increment (correct but slower)
from multiprocessing import Value, Process
def increment(counter, n_increments):
    for _ in range(n_increments):
        with counter.get_lock():  # acquire/release per increment
            counter.value += 1

if __name__ == '__main__':
    N_PROC = 4
    N = 1000
    counter = Value('i', 0)  # 'i' = 32-bit signed int
    procs = [Process(target=increment, args=(counter, N)) for _ in range(N_PROC)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)  # reliably 4000
Pros: correct.
Cons: heavy lock contention → poor scalability. If you increase the value of N, execution slows down significantly.
Fix 2: Batch updates (much faster, still correct)
Do the work locally in each process, then add once (or a few times) under the lock.
from multiprocessing import Value, Process
def increment_batched(counter, n_increments):
    # All the work happens locally: no locking here
    local_count = n_increments
    # Single critical section per process
    with counter.get_lock():
        counter.value += local_count

if __name__ == '__main__':
    N_PROC = 4
    N = 1000
    counter = Value('i', 0)
    procs = [Process(target=increment_batched, args=(counter, N)) for _ in range(N_PROC)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)  # 4000, with minimal lock time
If you really need per‑increment semantics, you can chunk (e.g., add every 100 or 1000) to balance accuracy and contention.
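A sketch of that chunked middle ground, assuming the same counter setup as Fix 2 (the batch size of 100 is arbitrary):

from multiprocessing import Value, Process

def increment_chunked(counter, n_increments, batch=100):
    pending = 0
    for _ in range(n_increments):
        pending += 1                       # accumulate locally
        if pending == batch:
            with counter.get_lock():
                counter.value += pending   # flush a whole batch at once
            pending = 0
    if pending:                            # flush any remainder
        with counter.get_lock():
            counter.value += pending

if __name__ == '__main__':
    counter = Value('i', 0)
    procs = [Process(target=increment_chunked, args=(counter, 1000)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)  # 4000, with far fewer lock acquisitions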
Fix 3: Use an explicit Lock
Value comes with its own lock, but you can also share a single explicit Lock across multiple shared values/arrays.
from multiprocessing import Value, Process, Lock
def increment(counter, lock, n):
    for _ in range(n):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    lock = Lock()
    counter = Value('i', 0, lock=False)  # no per-Value lock; we use our own
    procs = [Process(target=increment, args=(counter, lock, 1000)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)  # 4000
This performs similarly to Fix 1, because both versions of increment() acquire the lock once per increment, so the amount of locked work grows linearly with N. Fix 2, by contrast, collapses the per-increment loop into a single addition under the lock, so its cost is effectively constant per process regardless of N.
This is why choosing efficient data structures and algorithms (DSA) matters in parallel and distributed computing.
Note: A discussion of efficient parallel and distributed algorithm design is beyond the scope of this chapter, which focuses on Python and related frameworks.
Using a Manager for Shared Dict or List
multiprocessing.Manager in Python's standard library provides a way to create and share Python objects among multiple processes, enabling inter-process communication (IPC) and synchronization. A Manager creates proxy objects for types such as list, dict, and Queue.
Other processes receive proxy objects for these managed objects. Any operations performed on these proxies are automatically communicated to the original object managed by the server process.
from multiprocessing import Process, Manager
def add_value(shared_list):
    shared_list.append(42)

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.list()
        p = Process(target=add_value, args=(shared,))
        p.start()
        p.join()
        print(shared)
How this Works:
- Manager() starts a server process in the background.
- When you call manager.list(), it doesn’t give you a normal Python list. Instead, it gives you a proxy object to a list that lives inside the Manager’s server process.
- When add_value() calls shared_list.append(42), the proxy sends a message to the Manager server telling it to append 42 to the real list.
- The Manager server modifies the single shared list instance in its own memory space.
- When the parent process later prints shared, the proxy fetches the latest contents from the server.
Note: Managers use IPC (Inter-Process Communication) under the hood (pipes/sockets + pickling). This makes them easy to use but slower than raw shared memory (Value, Array), because of serialization overhead.
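The same pattern works for dictionaries. A minimal sketch in which several workers write their results into a shared manager.dict():

from multiprocessing import Process, Manager

def record_square(shared_results, n):
    # Each write goes through the proxy to the Manager's server process
    shared_results[n] = n * n

if __name__ == '__main__':
    with Manager() as manager:
        results = manager.dict()
        procs = [Process(target=record_square, args=(results, n)) for n in range(4)]
        for p in procs: p.start()
        for p in procs: p.join()
        print(dict(results))  # e.g., {0: 0, 1: 1, 2: 4, 3: 9}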
Practical Example: multiprocessing
Let's build a mini utility that compresses multiple files with gzip in parallel using multiprocessing.
Step 1: compress.py
import gzip
import shutil
from multiprocessing import Pool
from pathlib import Path
def compress_file(file_path):
    compressed_path = f"{file_path}.gz"
    with open(file_path, 'rb') as f_in, gzip.open(compressed_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    return compressed_path

def get_files(directory):
    return [str(p) for p in Path(directory).glob('*.log')]

if __name__ == '__main__':
    files = get_files('data')
    with Pool() as pool:
        results = pool.map(compress_file, files)
    print(f"Compressed: {results}")
Step 2: Create Sample Files sample_data.py
from pathlib import Path
Path("data").mkdir(exist_ok=True)
for i in range(5):
    with open(f"data/sample_{i}.log", 'w') as f:
        f.write("This is some text data.\n" * 1000)
Run It
python sample_data.py
python compress.py
Check the data/ directory; you should see .gz files created in parallel.
14.3 Testing your parallel code
# test_counter.py
from multiprocessing import Value, Process
def increment(counter, n):
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1

def test_multiprocessing_counter():
    counter = Value('q', 0)  # 'q' = 64-bit signed int
    procs = [Process(target=increment, args=(counter, 10_000)) for _ in range(8)]
    for p in procs: p.start()
    for p in procs: p.join()
    assert counter.value == 80_000
The test spawns eight worker processes, has each perform 10,000 locked increments, and asserts that no increments were lost. Run it with pytest test_counter.py.
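The same style of test works for the parallel compressor. A sketch of a pytest test for compress_file, assuming compress.py from the previous section is importable and using pytest's built-in tmp_path fixture:

# test_compress.py
import gzip
from multiprocessing import Pool

from compress import compress_file

def test_parallel_compression(tmp_path):
    # Create a few small log files in a temporary directory
    files = []
    for i in range(3):
        path = tmp_path / f"sample_{i}.log"
        path.write_text("some text data\n" * 100)
        files.append(str(path))

    with Pool(2) as pool:
        results = pool.map(compress_file, files)

    # Each compressed file should decompress back to the original content
    for compressed in results:
        with gzip.open(compressed, "rt") as f:
            assert f.read() == "some text data\n" * 100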
14.4 Chapter Assignment:
Rewrite the Docker Container example with the following changes:
- Encapsulate the logic using OOP paradigms. The refactored code should use an Abstract Base Class (AbstractDockerCodeRunner). Extend the base class to be able to run both Python and JavaScript code.
- Allow the user to run code within a /scripts directory sequentially or simultaneously.
- Log all application and container outputs in a /logs directory. Each log file should be rotated every 24 hours (hint: check out the previous chapter on Python standard libraries).
- Use proper error handling (e.g., if the code fails due to a syntax error).
- Ensure that your code is fully tested.