Rohan, 16, from Pune, had just finished Class 10. His plant disease classifier took 45 minutes to train on Google Colab's free tier. He asked his older cousin Priya, who works at a Pune AI startup, why it was so slow.
Priya looked at his code and laughed — not unkindly. "Rohan, you're using Python loops inside your data pipeline. NumPy vectorisation would make this 200x faster. And your model function re-loads the tokeniser every call — that's why your inference is slow too."
Rohan spent one week learning advanced Python for AI. His training loop went from 45 minutes to 4 minutes. His inference API went from 800ms per request to 12ms. The model didn't change. The Python did.
Python loops are slow because Python is interpreted and every operation has overhead. NumPy operations run in compiled C — they process entire arrays at once. The difference is dramatic:
❌ SLOW — Python Loop
# Process 1 million pixels, one element at a time (the anti-pattern).
# NOTE(review): `image_data` is not defined in this snippet — presumably an
# iterable of raw pixel values; confirm where it is created.
result = []
for pixel in image_data:
    result.append(pixel / 255.0)  # one interpreter-level division per pixel
# Time: ~2.1 seconds
✅ FAST — NumPy Vectorised
import numpy as np
# A single expression divides every element of the array.
# NOTE(review): this relies on `image_data` being a NumPy array (a plain
# Python list does not support `/`) — confirm where it is created.
result = image_data / 255.0
# Time: ~0.003 seconds
# 700x faster!
The rule: Never loop over NumPy arrays element-by-element. Use broadcasting, universal functions (ufuncs), and vectorised operations instead.
import numpy as np
import time
# Generate 1 million random values
data = np.random.randn(1_000_000).astype(np.float32)

# SLOW: Python loop — the polynomial is evaluated by the interpreter for
# one element at a time.
# time.perf_counter() is the high-resolution monotonic clock intended for
# measuring intervals; time.time() can jump when the wall clock is adjusted.
start = time.perf_counter()
result_slow = [x**2 + 2*x + 1 for x in data]
print(f"Loop: {time.perf_counter()-start:.3f}s")

# FAST: NumPy vectorised (broadcasted polynomial)
start = time.perf_counter()
result_fast = data**2 + 2*data + 1
print(f"NumPy: {time.perf_counter()-start:.4f}s")

# ALTERNATIVES (benchmark before assuming either is faster):
# Horner form needs fewer array passes and temporaries:
# result = (data + 2) * data + 1
# Generic coefficient list (highest power first) via the legacy helper:
# result = np.polyval([1, 2, 1], data)
# Broadcasting example: normalise a (1000, 224, 224, 3) image batch
images = np.random.randint(0, 256, (1000, 224, 224, 3), dtype=np.uint8)
mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)  # ImageNet mean
std = np.array([0.229, 0.224, 0.225], dtype=np.float32)   # ImageNet std
# Broadcasting: (1000,224,224,3) - (3,) = works! NumPy broadcasts automatically
# Memory: `images / 255.0` would promote to float64 (~1.2 GB per temporary,
# with two alive at once). Converting once to float32 (~0.6 GB) and updating
# in place keeps the peak at a single working array.
normalised = images.astype(np.float32)
normalised /= 255.0
normalised -= mean  # (3,) broadcasts across the trailing channel axis
normalised /= std
print(f"normalised shape: {normalised.shape}")  # (1000, 224, 224, 3)
Key NumPy operations every AI developer must know:
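- np.einsum('ij,jk->ik', A, B) — matrix multiply written in Einstein notation; most valuable for more complex tensor contractions (for a plain matrix product, A @ B is usually at least as fast)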
- np.where(condition, x, y) — vectorised if-else
- np.argsort, np.argmax, np.argmin — indices of sorted/extreme values
- np.clip(arr, 0, 1) — clamp values (used in normalisation)
- np.concatenate, np.stack, np.vstack — combining arrays without loops
Decorators let you add functionality to functions without modifying their code. They're used everywhere in AI: caching model outputs, timing functions, validating inputs, logging predictions.
import time
import functools
# ── Decorator 1: Timer ──────────────────────────────────────────
def timer(func):
    """Decorator that prints how long each call to ``func`` takes.

    The elapsed time is measured with ``time.perf_counter`` and printed as
    ``"<name> took <seconds>s"``. The report is emitted from a ``finally``
    clause, so a call that raises is still timed before the exception
    propagates to the caller.

    Args:
        func: The callable to wrap; it must expose ``__name__``.

    Returns:
        A wrapper with the same signature and return value as ``func``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs on both success and failure, so slow failing calls
            # are visible too.
            elapsed = time.perf_counter() - start
            print(f"{func.__name__} took {elapsed:.4f}s")
    return wrapper


@timer
def run_inference(model, image_batch):
    """Return ``model.predict(image_batch)``; the call is timed by ``timer``."""
    return model.predict(image_batch)
# ── Decorator 2: Cache with TTL ─────────────────────────────────
import hashlib, json, time as _time
def cache_result(ttl_seconds=300):
    """Cache function results for ttl_seconds.

    Each decorated function gets its own cache, so one ``cache_result(...)``
    object can safely decorate several functions without their entries
    colliding.

    The cache key is an MD5 digest of the JSON-encoded positional and
    keyword arguments. Arguments JSON cannot encode are converted with
    ``str`` (``default=str``), so two different objects with the same string
    form share an entry. Expired entries are only replaced when the same
    key is requested again; the cache is otherwise unbounded.

    Args:
        ttl_seconds: How long, in seconds, a cached result stays valid.

    Returns:
        A decorator that wraps a function with TTL caching.
    """
    def decorator(func):
        cache = {}  # key -> (result, monotonic timestamp); private to func

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = hashlib.md5(
                json.dumps((args, sorted(kwargs.items())),
                           default=str).encode()
            ).hexdigest()
            entry = cache.get(key)
            if entry is not None:
                result, ts = entry
                # Monotonic clock: entry age is unaffected by wall-clock
                # adjustments (NTP sync, manual changes).
                if _time.monotonic() - ts < ttl_seconds:
                    return result
            result = func(*args, **kwargs)
            cache[key] = (result, _time.monotonic())
            return result
        return wrapper
    return decorator


@cache_result(ttl_seconds=600)
def embed_text(text: str) -> list[float]:
    """Embed text — expensive API call, cached for 10 min."""
    # calls Gemini / OpenAI embedding API
    return [0.1, 0.3]  # placeholder vector; real floats only, matching list[float]
# ── Decorator 3: Input validator ────────────────────────────────
def validate_image(func):
    """Decorator that validates and normalises the first (image) argument.

    The wrapped function receives the image as its first positional
    argument. Arrays whose maximum exceeds 1.0 are divided by 255.0 before
    the call; other arrays are passed through unchanged.

    Raises (from the wrapper):
        TypeError: If the image is not an ``np.ndarray``.
        ValueError: If the array is not 3-D or 4-D, or has no elements.
    """
    @functools.wraps(func)
    def wrapper(image_array, *args, **kwargs):
        import numpy as np
        if not isinstance(image_array, np.ndarray):
            raise TypeError(f"Expected np.ndarray, got {type(image_array)}")
        if image_array.ndim not in (3, 4):
            raise ValueError(f"Expected (H,W,C) or (N,H,W,C), got {image_array.shape}")
        if image_array.size == 0:
            # .max() on a zero-size array fails with an opaque reduction
            # error; report the actual problem instead.
            raise ValueError(f"Empty image array with shape {image_array.shape}")
        if image_array.max() > 1.0:
            image_array = image_array / 255.0  # auto-normalise
        return func(image_array, *args, **kwargs)
    return wrapper


@validate_image
def classify_image(image_array, model):
    """Return ``model.predict`` for a single image or an image batch.

    A 3-D array gets a leading batch axis; a 4-D array is passed as-is,
    since the validator accepts both ranks.
    """
    if image_array.ndim == 3:
        image_array = image_array[np.newaxis, ...]
    return model.predict(image_array)
When your dataset is 50GB of images, you cannot load it all into RAM. Generators produce data one item at a time, keeping memory usage flat regardless of dataset size.
from pathlib import Path
from PIL import Image
import numpy as np
# ── Generator-based image loader ───────────────────────────────
def image_batch_generator(image_dir: str, batch_size: int = 32,
                          image_size: tuple[int, int] = (224, 224)):
    """Yields (images, labels) batches without loading full dataset.

    Finds every ``*.jpg`` under ``image_dir`` (recursively) and uses each
    file's parent folder name as its label. Files are visited in sorted path
    order so runs are reproducible; shuffle separately if training needs it.

    Args:
        image_dir: Root directory to search.
        batch_size: Maximum number of images per yielded batch.
        image_size: ``(width, height)`` each image is resized to.

    Yields:
        ``(images, labels)`` where ``images`` is a float32 array of shape
        ``(n, height, width, 3)`` scaled to [0, 1] and ``labels`` is a list
        of ``n`` folder names.

    Raises:
        ValueError: If ``batch_size`` is less than 1 (raised on first use of
            the generator).
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    # rglob order depends on the filesystem; sorting makes it deterministic.
    paths = sorted(Path(image_dir).rglob("*.jpg"))
    # Extract labels from folder names: dataset/roses/img1.jpg → "roses"
    labels = [p.parent.name for p in paths]
    for i in range(0, len(paths), batch_size):
        batch_paths = paths[i:i+batch_size]
        batch_labels = labels[i:i+batch_size]
        images = []
        for p in batch_paths:
            # Close the file promptly, and force 3 channels so grayscale or
            # RGBA files cannot produce a ragged batch.
            with Image.open(p) as img:
                rgb = img.convert("RGB").resize(image_size)
            images.append(np.asarray(rgb, dtype=np.float32) / 255.0)
        yield np.array(images, dtype=np.float32), batch_labels
# Usage — memory stays flat even for 100k images:
# NOTE: `model` is not defined in this snippet; it must already exist.
for images, labels in image_batch_generator("dataset/", batch_size=32):
    predictions = model.predict(images)  # process one batch, discard
    # `images` is rebound on the next iteration, so the previous batch is
    # no longer referenced here and its memory can be reclaimed.
# ── Generator expression (even simpler) ────────────────────────
squares = (x**2 for x in range(1_000_000))  # lazy: a small fixed-size object (a few hundred bytes at most; varies by Python version)
# vs list comprehension: [x**2 for x in range(1_000_000)] needs ~8MB for the
# list's pointer array alone, plus the int objects it points to
# ── yield from — delegate to sub-generators ────────────────────
def multi_dir_loader(*dirs, batch_size: int = 32):
    """Yield batches from several image directories, one directory after another.

    Args:
        *dirs: Directories passed in turn to ``image_batch_generator``.
        batch_size: Batch size forwarded to ``image_batch_generator``
            (keyword-only; the default matches that function's default).

    Yields:
        Whatever ``image_batch_generator`` yields for each directory.
    """
    for d in dirs:
        # `yield from` forwards every batch of the sub-generator unchanged.
        yield from image_batch_generator(d, batch_size=batch_size)
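Keras and PyTorch data loaders follow the same lazy, batch-at-a-time idea. When you subclass tf.keras.utils.Sequence or torch.utils.data.Dataset, you implement an indexable interface (__len__ and __getitem__) that the framework calls one batch or sample at a time; torch.utils.data.IterableDataset is the variant that is literally a generator interface (it implements __iter__).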
Before optimising, measure. Premature optimisation wastes time. Profiling tells you exactly which lines are slow.
# ── Method 1: cProfile (function-level) ────────────────────────
import cProfile, pstats
# Used as a context manager (Python 3.8+), the profiler is enabled on entry
# and disabled on exit — including when the profiled code raises.
with cProfile.Profile() as profiler:
    # --- code to profile ---
    run_training_loop(model, dataset, epochs=1)
    # -----------------------
stats = pstats.Stats(profiler)
stats.sort_stats(pstats.SortKey.CUMULATIVE)
stats.print_stats(10)  # top 10 functions by cumulative time
# Output example:
# ncalls tottime percall cumtime percall filename:lineno(function)
# 1000 5.231 0.005 5.231 0.005 data_utils.py:42(load_image)
# → load_image is called 1000 times taking 5s total → that is the hotspot:
#   optimise it first (e.g. cache decoded images or load them in parallel)
# ── Method 2: line_profiler (line-by-line) ──────────────────────
# pip install line_profiler
# Add @profile decorator, run: kernprof -l -v your_script.py
# ── Method 3: memory_profiler ──────────────────────────────────
# pip install memory_profiler
from memory_profiler import profile
@profile
def prepare_dataset(path):
    """Build features for the data at ``path`` and return them.

    Calls ``load_all_images(path)``, passes the result to
    ``extract_features``, then deletes the local ``images`` reference before
    returning. Both helpers are defined elsewhere. The memory figures in the
    line comments are illustrative memory_profiler increments.
    """
    images = load_all_images(path)  # line 1: +2.4 GB
    features = extract_features(images)  # line 2: +800 MB
    # Drop the only local reference to the raw images so they can be freed
    # before the function returns.
    del images  # line 3: -2.4 GB ← important!
    return features
# ── Method 4: timeit for micro-benchmarks ──────────────────────
import timeit
# Imports go in `setup`, which runs once and is excluded from the timing, so
# only the statement being compared is measured.
loop_time = timeit.timeit('[x**2 for x in range(1000)]', number=10000)
numpy_time = timeit.timeit('np.arange(1000)**2', setup='import numpy as np', number=10000)
print(f"Loop: {loop_time:.3f}s | NumPy: {numpy_time:.3f}s")
| Tool | Best for | Output |
|---|---|---|
| cProfile | Finding slowest functions | Cumulative time per function |
| line_profiler | Finding slowest lines inside a function | Time per line |
| memory_profiler | Tracking RAM growth | MB increment per line |
| timeit | Comparing two implementations | Seconds for N repetitions |
Type hints make your AI code self-documenting and enable IDE autocomplete. Dataclasses give you structured configuration objects — much better than passing 15 arguments to a function.
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class TrainingConfig:
    """All hyperparameters for a training run.

    Values are validated in ``__post_init__``; an invalid value raises
    ``ValueError`` at construction time rather than partway through training.
    """
    model_name: str = "MobileNetV2"
    num_classes: int = 10
    image_size: int = 224
    batch_size: int = 32
    epochs: int = 20
    learning_rate: float = 1e-4
    dropout_rate: float = 0.3
    weight_decay: float = 1e-5
    augment: bool = True
    checkpoint_dir: str = "checkpoints/"
    # default_factory gives every instance its own list (no shared mutable default)
    tags: list[str] = field(default_factory=list)
    notes: Optional[str] = None

    def __post_init__(self):
        """Validate field values.

        Raises:
            ValueError: If any hyperparameter is outside its allowed range.
        """
        if self.learning_rate <= 0:
            raise ValueError("learning_rate must be positive")
        if self.batch_size not in [8, 16, 32, 64, 128]:
            raise ValueError("batch_size must be a power of 2 between 8–128")
        if not 0.0 <= self.dropout_rate < 1.0:
            raise ValueError("dropout_rate must be in [0, 1)")
        if self.weight_decay < 0:
            raise ValueError("weight_decay must be non-negative")
        for name in ("num_classes", "image_size", "epochs"):
            if getattr(self, name) < 1:
                raise ValueError(f"{name} must be at least 1")
# Clean function signatures with type hints
def train(
    config: TrainingConfig,
    train_dir: str,
    val_dir: str,
) -> dict[str, float]:
    """Returns {val_accuracy, val_loss, best_epoch}.

    Args:
        config: Hyperparameters for the run.
        train_dir: Training data directory.
        val_dir: Validation data directory.

    NOTE: the body below is a stub (``...``); as written the function
    returns None, not the annotated dict.
    """
    ...
# Usage — clear, self-documenting:
cfg = TrainingConfig(
    model_name="EfficientNetB0",
    num_classes=38,
    learning_rate=3e-4,
    tags=["plantvillage", "class10-demo"],
    notes="First run with augmentation"
)
results = train(cfg, "data/train", "data/val")
# NOTE: this lookup only works once train() has a real implementation; with
# the stub above, `results` is None.
print(results["val_accuracy"])  # IDE knows this is float
AI code regularly deals with resources that must be cleaned up: GPU memory, file handles, database connections, MLflow runs. Context managers guarantee cleanup even when exceptions occur.
import contextlib
import mlflow
# ── Custom context manager using contextlib.contextmanager ──────
def _empty_cuda_cache():
    """Call ``torch.cuda.empty_cache()``; do nothing if torch is not installed."""
    try:
        import torch
    except ImportError:
        return
    torch.cuda.empty_cache()


@contextlib.contextmanager
def gpu_memory_guard(name: str):
    """Clears GPU cache before and after a block.

    On exit — including when the block raises — Python garbage collection
    runs first, then the GPU cache is emptied, so memory held only by
    unreachable objects can be released in the same pass. Without torch
    installed, only the garbage collection and the log lines happen.

    Args:
        name: Label printed in the start/done log lines.
    """
    import gc
    _empty_cuda_cache()
    print(f"[GPU] Starting: {name}")
    try:
        yield
    finally:
        # Collect before emptying the cache: blocks freed by the collector
        # can only be returned to the GPU afterwards.
        gc.collect()
        _empty_cuda_cache()
        print(f"[GPU] Done: {name}")
# Usage:
# NOTE: `model`, `large_batch`, `train_ds` and `val_ds` are not defined in
# this snippet; they must already exist.
with gpu_memory_guard("inference_batch_100"):
    predictions = model(large_batch)
# Even if model() raises an exception, GPU cache is cleared
# ── MLflow run as context manager ──────────────────────────────
with mlflow.start_run(run_name="experiment_v3"):
    mlflow.log_param("lr", 3e-4)
    history = model.fit(train_ds, validation_data=val_ds, epochs=20)
    # Log the highest validation accuracy seen across all epochs.
    mlflow.log_metric("val_accuracy", max(history.history["val_accuracy"]))
    mlflow.keras.log_model(model, "model")
# Run ends automatically — even on exception
# ── File management with context manager ───────────────────────
# Write one JSON array per line (JSON Lines), one line per prediction.
# `data_loader` and `model` are expected to exist already.
with open("predictions.jsonl", "w") as out_file:
    for batch in data_loader:
        batch_preds = model.predict(batch)
        out_file.writelines(
            json.dumps(pred.tolist()) + "\n" for pred in batch_preds
        )
# File is closed automatically