
Chapter 14: Concurrency and Process Management
Chapter Outline
Python offers powerful tools for running external programs (subprocess) and for executing code in parallel (multiprocessing). This chapter will help you:
- Understand how to call external commands using subprocess
- Run code in parallel using the multiprocessing module
- Share data between processes
- Manage process pools for efficient task distribution
- Build and test an example: a parallel file compressor
14.1 Running External Commands with subprocess
Why Use subprocess?
The subprocess module allows you to spawn new processes, connect to their input/output/error pipes, and obtain their return codes.
Run a Shell Command
```python
import subprocess

# Run a command and capture its output
result = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print("Exit Code:", result.returncode)
print("Output:\n", result.stdout)
```
- capture_output=True: captures stdout and stderr.
- text=True: returns strings instead of bytes.
Handle Errors
```python
try:
    subprocess.run(["ls", "nonexistent_folder"], check=True)
except subprocess.CalledProcessError as e:
    print("🛑 Failed:", e)
```
Redirect Output to File
```python
with open("output.txt", "w") as f:
    subprocess.run(["echo", "Hello, subprocess!"], stdout=f)
```
subprocess runs external programs; it does not execute Python code directly unless you call python itself as the external command (a sketch of this follows).
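One common pattern, then, is to make the Python interpreter itself the external program. Below is a minimal sketch, assuming you want to run a short snippet in a fresh interpreter; sys.executable and the -c flag are standard, while the snippet itself is purely illustrative:

```python
import subprocess
import sys

# Illustrative snippet to execute in a separate interpreter process
snippet = "print(2 + 2)"

# sys.executable is the path of the interpreter running this script
result = subprocess.run(
    [sys.executable, "-c", snippet],
    capture_output=True,
    text=True,
)
print(result.stdout.strip())  # expected output: 4
```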
Passing Input to a Subprocess
```python
result = subprocess.run(['grep', 'hello'], input='hello world\nbye', text=True, capture_output=True)
print(result.stdout)
```
Advanced Use: Using Popen
Popen is similar to run but offers more flexibility for complex or uncommon cases.
```python
process = subprocess.Popen(['echo', 'Hello'], stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
print(stdout.decode())
```
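One thing Popen enables that run does not is streaming output while the child is still running, since run waits for the process to finish. Here is a minimal sketch, assuming a Unix-like system where ping -c 3 prints a few lines over a few seconds; the command is only a stand-in for any long-running tool:

```python
import subprocess

# Stream output line by line as it is produced
process = subprocess.Popen(
    ["ping", "-c", "3", "localhost"],  # stand-in for any long-running command
    stdout=subprocess.PIPE,
    text=True,
)
for line in process.stdout:            # yields lines as they arrive
    print("LIVE:", line.rstrip())
process.wait()                         # reap the process once it exits
```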
Use Cases for subprocess
- Running shell commands from Python
- Automating OS-level tasks (e.g., compressing files, invoking Git, calling ffmpeg)
- Logging or interacting with CLI tools (see the sketch below)
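As a small illustration of the last point, a script can probe for a CLI tool before relying on it. The sketch below assumes git may or may not be installed; --version is just a cheap command that proves the tool is runnable:

```python
import subprocess

try:
    result = subprocess.run(
        ["git", "--version"],      # cheap command that confirms the tool works
        capture_output=True,
        text=True,
        check=True,
    )
    print("Found:", result.stdout.strip())
except FileNotFoundError:
    print("git is not installed or not on PATH")
except subprocess.CalledProcessError as e:
    print("git exists but the command failed:", e)
```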
Example: Run a Docker Container
Let's look at a more realistic and complex example. We want to run some ad-hoc Python code generated within a Python program. Running this code in the same environment as the parent program may pose security risks. To mitigate such risks, we can run the generated code inside an isolated Docker container. The following example does just that:
This example assumes:
- You have Docker installed and running.
- You have a Python base image available locally or downloadable (e.g., python:3.10-slim).
- Your local system can run docker commands without sudo.
```python
import subprocess
import os
import tempfile
import textwrap

# Step 1: Define the Python code to be run inside the container
python_code = textwrap.dedent("""
    print("Hello from inside Docker!")
    for i in range(5):
        print(f"Processing item {i+1}")
""")

# Step 2: Create a temporary Python file
temp_dir = tempfile.mkdtemp()
python_file_path = os.path.join(temp_dir, "app.py")

with open(python_file_path, "w") as f:
    f.write(python_code)

print(f"[INFO] Created Python file at {python_file_path}")

# Step 3: Launch a Docker container in detached mode
container_name = "python_subprocess_demo"

print("[INFO] Starting Docker container...")
subprocess.run(
    ["docker", "run", "-d", "--name", container_name, "python:3.10-slim", "sleep", "60"],
    check=True
)

# Step 4: Copy Python file into the container
print("[INFO] Copying file into container...")
subprocess.run(
    ["docker", "cp", python_file_path, f"{container_name}:/app.py"],
    check=True
)

# Step 5: Run the Python program inside the container
print("[INFO] Executing Python program inside container...")
result = subprocess.run(
    ["docker", "exec", container_name, "python", "app.py"],
    capture_output=True,
    text=True
)

print("[INFO] Output from container:\n", result.stdout)

# Step 6: Cleanup - Stop and remove the container
print("[INFO] Stopping and removing container...")
subprocess.run(["docker", "rm", "-f", container_name], check=True)

# Step 7: Remove temporary file
os.remove(python_file_path)
os.rmdir(temp_dir)
print("[INFO] Cleanup complete.")
```
What This Code Does
- Creates a Python file dynamically in a temporary folder.
- Starts a Docker container from python:3.10-slim in the background.
- Copies the generated Python script into the container.
- Executes the script inside the container using docker exec.
- Captures and prints the container’s output.
- Cleans up by stopping/removing the container and deleting temporary files.
Key Points
- subprocess.run(..., check=True) ensures the program fails immediately if any Docker command fails.
- We decouple the container lifecycle from the main program: the container exists just long enough to run the script.
- This approach could be expanded to deploy AI models, run integration tests in isolation, or simulate production environments.
14.2 Parallel Execution with multiprocessing
Python’s multiprocessing module enables true parallelism using separate processes (unlike threads, which are limited by the Global Interpreter Lock).
Basic Example
```python
from multiprocessing import Process

def greet(name):
    print(f"Hello, {name}")

if __name__ == '__main__':
    p = Process(target=greet, args=('Alice',))
    p.start()
    p.join()
```
- target=greet tells the process to run the greet function.
- args=('Alice',) holds the arguments passed to the function.
Note
- Protecting the main block with if __name__ == '__main__':
  - This is critical on Windows (and recommended for all platforms) when using multiprocessing.
  - Without this guard, the child process might re-import the module, leading to infinite process creation.
- Even if there’s only one argument, the value passed as args must be a tuple (hence the comma after 'Alice').
- If you don’t join(), the main program might exit before the child process finishes running.
Execution Flow: Process
- Main process creates p (a new process object).
- p.start() starts the new process, which executes greet('Alice').
- p.join() pauses the main process until the greeting process finishes.
- Output is printed from the child process.
Use Cases for multiprocessing
- CPU-bound tasks (e.g., image processing, data transformation)
- Parallel computation
- Scaling out a task across multiple cores (see the timing sketch below)
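To see the effect on a CPU-bound task, you can time a sequential run against a pool of workers. The sketch below is illustrative: cpu_heavy and the workload sizes are made up, and the speedup you observe depends on your core count (Pool itself is covered in detail just below).

```python
import time
from multiprocessing import Pool, cpu_count

def cpu_heavy(n):
    # Illustrative CPU-bound work: sum of squares up to n
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    jobs = [2_000_000] * 8

    start = time.perf_counter()
    serial = [cpu_heavy(n) for n in jobs]
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    with Pool(cpu_count()) as pool:
        parallel = pool.map(cpu_heavy, jobs)
    print(f"Parallel:   {time.perf_counter() - start:.2f}s")

    assert serial == parallel   # same results, computed on multiple cores
```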
Using a Pool of Workers
Python’s multiprocessing.Pool is used to run the same function on multiple pieces of data in parallel.
```python
from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(square, [1, 2, 3, 4, 5])
        print(results)
```
- Pool creates a pool of worker processes.
- It automatically handles process creation and joining.
- Instead of manually creating Process objects for each task, Pool lets you easily distribute work across multiple processes.
- with ensures the pool is automatically closed and cleaned up when done (no need to call pool.close() or pool.join() manually).
Distributing tasks with map
```python
results = pool.map(square, [1, 2, 3, 4, 5])
```
- pool.map() works like the built-in map(), but runs in parallel using multiple processes.
- It splits the list [1, 2, 3, 4, 5] into chunks and assigns them to available workers.
- Each worker runs square(n) on its assigned numbers.
- The results are collected and returned in the same order as the input (related options are sketched below).
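Two related options are worth knowing about: map accepts a chunksize argument that controls how many items each worker receives at a time, and imap_unordered yields results as soon as workers finish rather than in input order. A minimal sketch (the chunksize value here is arbitrary):

```python
from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == '__main__':
    data = list(range(20))

    with Pool(4) as pool:
        # Explicit chunk size: each worker receives batches of 5 items
        ordered = pool.map(square, data, chunksize=5)

        # imap_unordered yields results as workers finish, not in input order
        for result in pool.imap_unordered(square, data):
            print(result, end=" ")

    print()
    print(ordered)   # results in input order
```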
Execution Flow: Pool
- Main process starts.
- Pool of 4 worker processes is created.
- map() distributes list elements across workers.
- Each worker computes the square and sends results back.
- Main process collects results and prints them.
When to use Pool
- You have many items to process and the work is CPU-bound.
- Instead of manually creating and managing many Process objects, you can:
  - Limit the number of workers (avoiding CPU overload).
  - Let Pool handle task distribution and result gathering.
Sharing State Between Processes
```python
from multiprocessing import Process, Value

def increment(counter):
    for _ in range(1000):
        counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    processes = [Process(target=increment, args=(counter,)) for _ in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
    print(f"Counter value: {counter.value}")
```
This is an example of shared state, race conditions, and synchronization in multiprocessing. The code above “works sometimes,” but it is actually racy: counter.value += 1 is not atomic. Two processes can read the same old value and both write it back, losing increments, so you might print something less than 4000.
What’s going wrong?
- Value('i', 0) gives an int in shared memory with an associated lock, but you’re not using the lock.
- += 1 compiles to multiple steps (read → add → write). Without a lock, increments can collide.
Fix 1: Use the built‑in lock around each increment (correct but slower)
```python
from multiprocessing import Value, Process

def increment(counter, n_increments):
    for _ in range(n_increments):
        with counter.get_lock():   # acquire/release per increment
            counter.value += 1

if __name__ == '__main__':
    N_PROC = 4
    N = 1000
    counter = Value('i', 0)        # 'i' = 32-bit signed int
    procs = [Process(target=increment, args=(counter, N)) for _ in range(N_PROC)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)           # reliably 4000
```
Pros: correct.
Cons: heavy lock contention → poor scalability. If you increase the value of N, program execution slows down significantly.
Fix 2: Batch updates (much faster, still correct)
Do the work locally in each process, then add once (or a few times) under the lock.
```python
from multiprocessing import Value, Process

def increment_batched(counter, n_increments, batch=1000):
    # all local work, no locking here
    local_count = n_increments

    # single critical section
    with counter.get_lock():
        counter.value += local_count

if __name__ == '__main__':
    N_PROC = 4
    N = 1000
    counter = Value('i', 0)
    procs = [Process(target=increment_batched, args=(counter, N)) for _ in range(N_PROC)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)   # 4000, with minimal lock time
```
If you really need per‑increment semantics, you can chunk (e.g., add every 100 or 1000) to balance accuracy and contention.
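A minimal sketch of that chunked approach; the batch size and the flush-on-remainder logic are illustrative choices, not part of the earlier example:

```python
from multiprocessing import Value, Process

def increment_chunked(counter, n_increments, batch=100):
    local = 0
    for _ in range(n_increments):
        local += 1                       # local work, no lock held
        if local == batch:
            with counter.get_lock():     # flush every `batch` increments
                counter.value += local
            local = 0
    if local:                            # flush any remainder
        with counter.get_lock():
            counter.value += local

if __name__ == '__main__':
    counter = Value('i', 0)
    procs = [Process(target=increment_chunked, args=(counter, 1000)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)   # 4000, with far fewer lock acquisitions than Fix 1
```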
Fix 3: Use an explicit Lock
Value comes with its own lock by default, but you can also create a single explicit Lock and share it across multiple shared values/arrays.
```python
from multiprocessing import Value, Process, Lock

def increment(counter, lock, n):
    for _ in range(n):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    lock = Lock()
    counter = Value('i', 0, lock=False)   # no per-Value lock; we use our own
    procs = [Process(target=increment, args=(counter, lock, 1000)) for _ in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(counter.value)   # 4000
```
This performs similarly to Fix 1, because both versions of increment() acquire the lock once per increment, so the locked work grows linearly with the number of increments. Fix 2 does a constant amount of locked work because it collapses the whole iteration into a single addition per process, with the local counting spread across the 4 separate processes.
This is why choosing efficient data structures and algorithms (DSA) matters in parallel and distributed computing.
A discussion of efficient parallel and distributed algorithms design is beyond the scope of this chapter, which focuses on Python and related frameworks.
Using a Manager for Shared Dict or List
multiprocessing.Manager in Python's standard library provides a way to create and share Python objects among multiple processes, enabling inter-process communication (IPC) and synchronization. Manager creates proxy objects such as list, dict, and Queue.
Other processes receive proxy objects for these managed objects. Any operations performed on these proxies are automatically communicated to the original object managed by the server process.
```python
from multiprocessing import Process, Manager

def add_value(shared_list):
    shared_list.append(42)

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.list()
        p = Process(target=add_value, args=(shared,))
        p.start()
        p.join()
        print(shared)
```
How this Works:
- Manager() starts a server process in the background.
- When you call manager.list(), it doesn’t give you a normal Python list. Instead, it gives you a proxy object to a list that lives inside the Manager’s server process.
- When add_value() calls shared_list.append(42), the proxy sends a message to the Manager server telling it to append 42 to the real list.
- The Manager server modifies the single shared list instance in its own memory space.
- When the parent process later prints shared, the proxy fetches the latest contents from the server.
Managers use IPC (Inter-Process Communication) under the hood (pipes/sockets plus pickling). This makes them easy to use but slower than raw shared memory (Value, Array), because of serialization overhead.
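The same pattern works for dictionaries. Below is a minimal sketch using manager.dict(); the record_square worker and the keys are illustrative:

```python
from multiprocessing import Process, Manager

def record_square(shared_results, n):
    # Each worker writes its result under its own key in the shared dict
    shared_results[n] = n * n

if __name__ == '__main__':
    with Manager() as manager:
        results = manager.dict()
        procs = [Process(target=record_square, args=(results, n)) for n in range(4)]
        for p in procs: p.start()
        for p in procs: p.join()
        print(dict(results))   # e.g. {0: 0, 1: 1, 2: 4, 3: 9}
```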
Practical Example: multiprocessing
Let's build a mini utility that compresses multiple files using gzip in parallel using multiprocessing.
Step 1: compress.py
```python
# compress.py
import gzip
import shutil
from multiprocessing import Pool
from pathlib import Path

def compress_file(file_path):
    compressed_path = f"{file_path}.gz"
    with open(file_path, 'rb') as f_in, gzip.open(compressed_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    return compressed_path

def get_files(directory):
    return [str(p) for p in Path(directory).glob('*.log')]

if __name__ == '__main__':
    files = get_files('data')
    with Pool() as pool:
        results = pool.map(compress_file, files)
    print(f"Compressed: {results}")
```
Step 2: Create Sample Files (sample_data.py)
```python
# sample_data.py
from pathlib import Path

Path("data").mkdir(exist_ok=True)

for i in range(5):
    with open(f"data/sample_{i}.log", 'w') as f:
        f.write("This is some text data.\n" * 1000)
```
Run It
```bash
python sample_data.py   # create the sample .log files first
python compress.py
```
Check the data/ directory — you should see .gz files created in parallel.
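To confirm the output is valid, you can read one of the compressed files back and compare it with the original. A minimal sketch, assuming sample_data.py was run first so data/sample_0.log exists:

```python
import gzip

# Read the original and the compressed copy, then compare byte for byte
with open("data/sample_0.log", "rb") as f:
    original = f.read()

with gzip.open("data/sample_0.log.gz", "rb") as f:
    decompressed = f.read()

assert original == decompressed
print("Round-trip OK:", len(original), "bytes")
```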
14.3 Testing Your Parallel Code
```python
# test_counter.py
from multiprocessing import Value, Process

def increment(counter, n):
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1

def test_multiprocessing_counter():
    counter = Value('q', 0)  # 64-bit
    procs = [Process(target=increment, args=(counter, 10_000)) for _ in range(8)]
    for p in procs: p.start()
    for p in procs: p.join()
    assert counter.value == 80_000
```
The test function scaffolds the same setup that the main block would perform (creating, starting, and joining the processes) and then asserts on the final counter value. A test runner such as pytest will discover and run test_multiprocessing_counter automatically. A similar test for the parallel compressor is sketched below.
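The same idea extends to the parallel compressor. Below is a hedged sketch of a pytest-style test; it assumes pytest (for the tmp_path fixture) and that compress_file is importable from the compress.py module shown earlier:

```python
# test_compress.py (sketch; assumes pytest and the compress.py module above)
import gzip
from multiprocessing import Pool

from compress import compress_file

def test_parallel_compression(tmp_path):
    # Create a few sample files in a temporary directory
    files = []
    for i in range(3):
        path = tmp_path / f"sample_{i}.log"
        path.write_text("some text data\n" * 100)
        files.append(str(path))

    # Compress them in parallel, just as compress.py does
    with Pool(2) as pool:
        compressed = pool.map(compress_file, files)

    # Each compressed file should decompress back to the original content
    for original, gz_path in zip(files, compressed):
        with open(original, "rb") as f_in, gzip.open(gz_path, "rb") as f_out:
            assert f_in.read() == f_out.read()
```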
14.4 Chapter Assignment
Rewrite the Docker Container example with the following changes:
- Encapsulate the logic using OOP paradigms. The refactored code should use an Abstract Base Class (AbstractDockerCodeRunner). Extend the base class to be able to run both Python and JavaScript code.
- Allow the user to run code within a /scripts directory sequentially or simultaneously.
- Log all application and container outputs in a /logs directory. Each log file should be rotated every 24 hours (hint: check out the previous chapter on Python Standard Libraries).
- Use proper error handling (e.g., if the code fails due to a syntax error).
- Ensure that your code is fully tested.
Check your understanding
Test your knowledge of Concurrency and Process Management in Python