# see also:
# NOTE(review): this line was paste-mangled — the leading "#" turned every
# import and the device setup into a comment, so nothing here ever executed.
# Reconstructed as the separate statements they were meant to be.

# Import PyTorch
import torch
from torch import nn

# Import torchvision
import torchvision
# from torchvision import datasets
from torchvision import transforms
from torchvision.transforms import ToTensor

# Import matplotlib for visualization
import matplotlib.pyplot as plt

# Create device-agnostic code: use the GPU when one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
# print(device)
from timeit import default_timer as timer
def print_train_time(start: float,
                     end: float,
                     device: torch.device = None):
    """Print and return the elapsed time between ``start`` and ``end``.

    Args:
        start: Start timestamp (e.g. from ``timeit.default_timer``).
        end: End timestamp.
        device: Device the timing refers to; used only in the printout.

    Returns:
        float: The elapsed time in seconds (``end - start``).
    """
    elapsed = end - start
    print(f"Train time on {device}: {elapsed:.3f} seconds")
    return elapsed
# Import tqdm for progress bar
from tqdm.auto import tqdm
# Calculate accuracy (a classification metric)
def accuracy_fn(y_true, y_pred):
    """Calculates accuracy between truth labels and predictions.

    Args:
        y_true (torch.Tensor): Truth labels for predictions.
        y_pred (torch.Tensor): Predictions to be compared to the truth labels.

    Returns:
        [torch.float]: Accuracy value between y_true and y_pred, e.g. 78.45
    """
    # Element-wise equality, summed to a Python int.
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc
# NOTE(review): paste-mangling had trapped this statement inside a comment, so
# loss_fn — used by every train/test call below — was never defined.
# Setup loss function
loss_fn = nn.CrossEntropyLoss()
# One could set the optimizer here too, but as it takes the model as an input,
# it is created inside the batch-step section, which also takes a model parameter.
# create a generic batch training loop
def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               #optimizer: torch.optim.Optimizer,
               accuracy_fn,
               device: torch.device = device):
    """Performs one training epoch with model trying to learn on data_loader.

    Args:
        model: The model to train (moved to ``device`` in place).
        data_loader: Batches of (inputs, labels) to train on.
        loss_fn: Loss to minimise (expects raw logits, e.g. CrossEntropyLoss).
        accuracy_fn: Callable ``(y_true, y_pred) -> float`` accuracy in percent.
        device: Device to run the computation on.
    """
    train_loss, train_acc = 0, 0
    torch.manual_seed(42)
    torch.cuda.manual_seed(42)
    # Optimizer is (re)created here because this function's callers do not
    # pass one in; plain SGD holds no state, so this is equivalent per epoch.
    optimizer = torch.optim.SGD(params=model.parameters(),
                                lr=0.1)
    model.to(device)
    # Put model into training mode
    model.train()
    # Loop through the training batches
    for batch, (X, y) in enumerate(data_loader):
        # Put data on target device
        X, y = X.to(device), y.to(device)
        # 1. Forward pass (outputs the raw logits from the model)
        y_pred = model(X)
        # 2. Calculate loss and accuracy (per batch)
        loss = loss_fn(y_pred, y)
        # FIX: accumulate the Python float, not the tensor — summing loss
        # tensors keeps every batch's autograd graph alive and grows memory.
        train_loss += loss.item()
        train_acc += accuracy_fn(y_true=y,
                                 y_pred=y_pred.argmax(dim=1))  # logits -> prediction labels
        # 3. Optimizer zero grad
        optimizer.zero_grad()
        # 4. Loss backward
        loss.backward()
        # 5. Optimizer step (update the model's parameters once *per batch*)
        optimizer.step()
    # Divide total train loss and acc by length of train dataloader
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    print(f"Train loss: {train_loss:.5f} | Train acc: {train_acc:.2f}%")
# create a generic batch testing loop
def test_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              accuracy_fn,
              device: torch.device = device):
    """Runs one evaluation pass of ``model`` over ``data_loader``.

    Accumulates loss and accuracy per batch, averages them over the number
    of batches, and prints the result. Gradients are never computed.
    """
    test_loss, test_acc = 0, 0
    torch.manual_seed(42)
    torch.cuda.manual_seed(42)
    # Evaluation mode disables layers such as dropout/batch-norm updates.
    model.eval()
    # inference_mode disables autograd bookkeeping entirely.
    with torch.inference_mode():
        for X, y in data_loader:
            X, y = X.to(device), y.to(device)
            # Forward pass yields raw logits.
            test_pred = model(X)
            test_loss += loss_fn(test_pred, y)
            # argmax over the class dimension: logits -> prediction labels.
            test_acc += accuracy_fn(y_true=y, y_pred=test_pred.argmax(dim=1))
        # Average the accumulated metrics over the batch count and report.
        test_loss /= len(data_loader)
        test_acc /= len(data_loader)
        print(f"Test loss: {test_loss:.5f} | Test acc: {test_acc:.2f}%\n")
# Create the image datasets.
from torchvision import datasets

# FashionMNIST training split (downloaded to ./data on first run).
train_data = datasets.FashionMNIST(
    root="data",            # where to download data to
    train=True,             # training split
    download=True,          # fetch if not already on disk
    transform=ToTensor(),   # PIL image -> float tensor
    target_transform=None,  # labels stay as-is
)

# FashionMNIST test split.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
    target_transform=None,
)
# NOTE(review): this line was paste-mangled — several statements fused into
# one, which is a syntax error. Reconstructed below; the bare inspection
# expressions are kept as comments since they have no effect in a script.
# len(train_data), len(test_data)
class_names = train_data.classes        # index -> human-readable label
class_to_idx = train_data.class_to_idx  # label -> index mapping
# NOTE(review): paste-mangling had trapped this whole plotting snippet inside
# a single comment line; reconstructed as code.
# Plot some images
# torch.manual_seed(42)
fig = plt.figure(figsize=(9, 9))
rows, cols = 4, 4
for i in range(1, rows * cols + 1):
    random_idx = torch.randint(0, len(train_data), size=[1]).item()
    img, label = train_data[random_idx]
    fig.add_subplot(rows, cols, i)
    plt.imshow(img.squeeze(), cmap="gray")
    plt.title(class_names[label])
    plt.axis(False)
# Prepare DataLoaders so the datasets are served as mini-batches.
from torch.utils.data import DataLoader

# Batch size hyperparameter.
BATCH_SIZE = 32

# Shuffle training batches in case the images are stored in class order.
train_dataloader = DataLoader(dataset=train_data,
                              batch_size=BATCH_SIZE,
                              shuffle=True)
# No shuffling needed when only evaluating.
test_dataloader = DataLoader(dataset=test_data,
                             batch_size=BATCH_SIZE,
                             shuffle=False)

train_dataloader, test_dataloader
print(f"DataLoaders: {train_dataloader, test_dataloader}")
print(f"Length of train_dataloader: {len(train_dataloader)} batches of {BATCH_SIZE}...")
print(f"Length of test_dataloader: {len(test_dataloader)} batches of {BATCH_SIZE}...")
# NOTE(review): two statements were fused onto one line (syntax error);
# reconstructed. Grab one batch to inspect its shapes.
train_features_batch, train_labels_batch = next(iter(train_dataloader))
train_features_batch.shape, train_labels_batch.shape
# Show a single sample drawn at random from the inspected batch.
# torch.manual_seed(42)
sample_idx = torch.randint(0, len(train_features_batch), size=[1]).item()
img = train_features_batch[sample_idx]
label = train_labels_batch[sample_idx]
plt.imshow(img.squeeze(), cmap="gray")
plt.title(class_names[label])
plt.axis(False)
print(f"Image size: {img.shape}")
print(f"Label: {label}, label size: {label.shape}")
#Now train this model
# Set the seed and start the timer
def train_and_test_model(model: torch.nn.Module,
                         epochs: int):
    """Train ``model`` for ``epochs`` epochs and evaluate after each one.

    Uses the module-level ``train_dataloader``/``test_dataloader``,
    ``loss_fn``, ``accuracy_fn`` and ``device``.

    Args:
        model: Model to train in place.
        epochs: Number of epochs to run.

    Returns:
        float: Total training time in seconds (also printed), so callers can
        record per-model training times for later comparison.
    """
    torch.manual_seed(42)
    torch.cuda.manual_seed(42)
    train_time_start = timer()
    # Optimization and evaluation loop using train_step() and test_step()
    for epoch in tqdm(range(epochs)):
        print(f"Epoch: {epoch}\n----------")
        train_step(model=model,
                   data_loader=train_dataloader,
                   loss_fn=loss_fn,
                   #optimizer=optimizer,
                   accuracy_fn=accuracy_fn,
                   device=device)
        test_step(model=model,
                  data_loader=test_dataloader,
                  loss_fn=loss_fn,
                  accuracy_fn=accuracy_fn,
                  device=device)
    train_time_end = timer()
    total_train_time = print_train_time(start=train_time_start,
                                        end=train_time_end,
                                        device=str(next(model.parameters()).device))
    # FIX: previously computed and discarded; the comparison table at the end
    # of the file needs per-model training times.
    return total_train_time
# Create a convolutional neural network
class FashionMNISTModelV2(nn.Module):
    """CNN replicating the TinyVGG architecture from the CNN Explainer website.

    Two identical convolutional blocks (conv -> ReLU -> conv -> ReLU -> maxpool)
    feed a linear classifier over the flattened feature map.
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()

        def make_conv_block(in_channels: int) -> nn.Sequential:
            # kernel_size/stride/padding keep the spatial size unchanged;
            # the trailing 2x2 max-pool halves each spatial dimension, so the
            # output covers a quarter of the input image area.
            return nn.Sequential(
                nn.Conv2d(in_channels=in_channels,
                          out_channels=hidden_units,
                          kernel_size=3,
                          stride=1,
                          padding=1),
                nn.ReLU(),
                nn.Conv2d(in_channels=hidden_units,
                          out_channels=hidden_units,
                          kernel_size=3,
                          stride=1,
                          padding=1),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2),
            )

        self.conv_block_1 = make_conv_block(input_shape)
        self.conv_block_2 = make_conv_block(hidden_units)

        # Trick for these values: they equal conv_block_2's output shape,
        # [1, hidden_units, 7, 7] for 28x28 input (halved twice by the pools).
        # You can derive them analytically or run a dummy image through and
        # print the shapes in forward(). The Linear layer's in_features must
        # match that flattened size (hidden_units * 7 * 7).
        finalheight = 7
        finalwidth = 7
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=hidden_units * finalheight * finalwidth,
                      out_features=output_shape),
        )

    def forward(self, x):
        # print(f"Shapes: {self.conv_block_1(x).shape}") can help debug sizing.
        return self.classifier(self.conv_block_2(self.conv_block_1(x)))
#torch.manual_seed(42)
# Instantiate the CNN: one colour channel in, 10 hidden units per layer,
# one output neuron per clothing class.
CNNmodel = FashionMNISTModelV2(
    input_shape=1,
    hidden_units=10,
    output_shape=len(class_names),
).to(device)
# More epochs generally improve accuracy at the cost of training time.
train_and_test_model(CNNmodel, epochs=3)
#create a model performance dictionary
def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn,
               device: torch.device = None):
    """Returns a dictionary containing the results of model predicting on data_loader.

    Args:
        model: Model to evaluate (moved to ``device`` in place).
        data_loader: Batches of (inputs, labels) to evaluate on.
        loss_fn: Loss function applied to the model's logits.
        accuracy_fn: Callable ``(y_true, y_pred) -> float`` accuracy in percent.
        device: Device to run on. Defaults to CUDA when available (the same
            device-agnostic choice made elsewhere in this file), keeping the
            signature consistent with train_step()/test_step().

    Returns:
        dict: ``model_name`` / ``model_loss`` / ``model_acc`` averaged per batch.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    loss, acc = 0, 0
    torch.manual_seed(42)
    torch.cuda.manual_seed(42)
    model.to(device)
    model.eval()
    with torch.inference_mode():
        for X, y in tqdm(data_loader):
            # Put data on target device
            X, y = X.to(device), y.to(device)
            # Make predictions
            y_pred = model(X)
            # Accumulate the loss and acc values per batch
            loss += loss_fn(y_pred, y)
            acc += accuracy_fn(y_true=y,
                               y_pred=y_pred.argmax(dim=1))
        # Scale loss and acc to find the average loss/acc per batch
        loss /= len(data_loader)
        acc /= len(data_loader)
    return {"model_name": model.__class__.__name__,  # only works when model was created with a class
            "model_loss": loss.item(),
            "model_acc": acc}
# Example usage — calculate a baseline model's results on the test dataset:
# torch.manual_seed(42)  # optional
# model_0_results = eval_model(model=model_0,
#                              data_loader=test_dataloader,
#                              loss_fn=loss_fn,
#                              accuracy_fn=accuracy_fn)
# model_0_results

# Evaluate the trained CNN on the test dataset.
CNNModel_results = eval_model(
    model=CNNmodel,
    data_loader=test_dataloader,
    loss_fn=loss_fn,
    accuracy_fn=accuracy_fn,
)
CNNModel_results
# 1. Make predictions with trained model
def make_predictions(model: torch.nn.Module,
                     data_loader: torch.utils.data.DataLoader):
    """Returns a y_pred tensor containing the results of model predicting on data_loader.

    Args:
        model: Trained model used for inference.
        data_loader: Batches of (inputs, labels); labels are ignored beyond
            being moved to the device.

    Returns:
        torch.Tensor: 1-D tensor of predicted class indices, on the CPU, in
        data_loader order.
    """
    y_preds = []
    model.eval()
    with torch.inference_mode():
        for X, y in tqdm(data_loader, desc="Making predictions..."):
            # Send the data and targets to target device
            X, y = X.to(device), y.to(device)
            # Do the forward pass
            y_logit = model(X)
            # Turn logits -> prediction probabilities -> prediction labels.
            # FIX: softmax must run over the class dimension (dim=1); the old
            # dim=0 normalised across the batch, which can change the argmax.
            y_pred = torch.softmax(y_logit, dim=1).argmax(dim=1)
            # Put prediction on CPU for evaluation
            y_preds.append(y_pred.cpu())
    # Concatenate list of per-batch predictions into a single tensor
    return torch.cat(y_preds)
y_pred_tensor = make_predictions(CNNmodel, test_dataloader)

import torchmetrics, mlxtend
from torchmetrics import ConfusionMatrix
from mlxtend.plotting import plot_confusion_matrix

# 2. Set up the confusion-matrix metric and compare predictions to targets.
confmat = ConfusionMatrix(task="multiclass", num_classes=len(class_names))
confmat_tensor = confmat(preds=y_pred_tensor, target=test_data.targets)

# 3. Plot the confusion matrix (matplotlib likes working with numpy).
fig, ax = plot_confusion_matrix(
    conf_mat=confmat_tensor.numpy(),
    class_names=class_names,
    figsize=(10, 7),
)
from pathlib import Path

# Directory that holds saved models (created if missing).
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, exist_ok=True)

# Full path of this model's weights file.
MODEL_NAME = "CNNmodel.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

# Persist only the model's learned parameters (its state dict).
print(f"Saving model to: {MODEL_SAVE_PATH}")
torch.save(obj=CNNmodel.state_dict(), f=MODEL_SAVE_PATH)
# Load the saved model back in.
torch.manual_seed(42)
# A fresh instance with the same architecture as the saved model is required
# before its state_dict can be restored into it.
loaded_model_2 = FashionMNISTModelV2(
    input_shape=1,
    hidden_units=10,
    output_shape=len(class_names),
)
loaded_model_2.load_state_dict(torch.load(f=MODEL_SAVE_PATH))
# Send the restored model to the target device.
loaded_model_2.to(device)
import pandas as pd

# Compare the evaluation dictionaries of all three models.
# NOTE(review): model_0_results / model_1_results / model_2_results and the
# *_training_time values are not defined in this file — they must come from
# earlier eval_model() / train_and_test_model() calls; confirm before running.
# FIX: the dicts must be passed as one list — pd.DataFrame(d0, d1, d2) would
# treat d1/d2 as the index and columns arguments and fail.
compare_results = pd.DataFrame([model_0_results,
                                model_1_results,
                                model_2_results])

# Add in the training times.
compare_results["training_time"] = (model_0_training_time,
                                    model_1_training_time,
                                    model_2_training_time)
compare_results

# Visualise model accuracy in a horizontal bar chart.
compare_results.set_index("model_name")["model_acc"].plot(kind="barh")
# FIX: plt.xlabel/plt.ylabel are functions; the old `plt.xlabel = "..."`
# assignments rebound them instead of labelling the axes.
plt.xlabel("accuracy (%)")
plt.ylabel("model")