# onepanel/db/yaml/workflows/pytorch-mnist-training/20201221194344.yaml
metadata:
name: "PyTorch Training"
kind: Workflow
version: 20201221194344
action: update
source: "https://github.com/onepanelio/templates/blob/master/workflows/pytorch-mnist-training/"
labels:
"created-by": "system"
framework: pytorch
spec:
arguments:
parameters:
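# Number of training epochs, substituted into the training script below via {{workflow.parameters.epochs}}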
- name: epochs
value: '10'
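# Node pool for the training task; the select.nodepool type surfaces the cluster's node pools as options in the UI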
- displayName: Node pool
hint: Name of node pool or group to run this workflow task
type: select.nodepool
name: sys-node-pool
value: "{{.DefaultNodePoolOption}}"
visibility: public
required: true
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: train-model
template: train-model
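# Template that runs the training script and its TensorBoard sidecar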
- name: train-model
# Files written to /mnt/output are uploaded to object storage as the task's output artifact
outputs:
artifacts:
- name: output
path: /mnt/output
optional: true
script:
image: onepanel/dl:0.17.0
command:
- python
- '-u'
source: |
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
from torch.utils.tensorboard import SummaryWriter
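# Simple CNN for MNIST: two convolutional layers, dropout, and two fully connected layers with log-softmax output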
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
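# Run one epoch of training, printing progress every 10 batches and logging the loss to TensorBoard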
def train(model, device, train_loader, optimizer, epoch, batch_size, writer):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 10 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()))
# Use a global step so each logged batch gets its own point in TensorBoard
writer.add_scalar('training loss', loss.item(), (epoch - 1) * len(train_loader) + batch_idx)
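# Evaluate on the test set and write accuracy/loss to /tmp/sys-metrics.json, the metrics file stored for this task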
def test(model, device, test_loader, epoch, writer):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
loss = test_loss / len(test_loader.dataset)
accuracy = correct / len(test_loader.dataset)
print('\nTest set: Average loss: {}, Accuracy: {}\n'.format(
loss, accuracy))
# Store metrics for this task
metrics = [
{'name': 'accuracy', 'value': accuracy},
{'name': 'loss', 'value': loss}
]
with open('/tmp/sys-metrics.json', 'w') as f:
json.dump(metrics, f)
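# Set up data loaders, model, optimizer, and LR scheduler, then train and evaluate for the requested number of epochs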
def main(params):
writer = SummaryWriter(log_dir='/mnt/output/tensorboard')
use_cuda = torch.cuda.is_available()
torch.manual_seed(params['seed'])
device = torch.device('cuda' if use_cuda else 'cpu')
train_kwargs = {'batch_size': params['batch_size']}
test_kwargs = {'batch_size': params['test_batch_size']}
if use_cuda:
cuda_kwargs = {'num_workers': 1,
'pin_memory': True,
'shuffle': True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
dataset1 = datasets.MNIST('/mnt/data', train=True, download=True,
transform=transform)
dataset2 = datasets.MNIST('/mnt/data', train=False,
transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=params['lr'])
scheduler = StepLR(optimizer, step_size=1, gamma=params['gamma'])
for epoch in range(1, params['epochs'] + 1):
train(model, device, train_loader, optimizer, epoch, params['batch_size'], writer)
test(model, device, test_loader, epoch, writer)
scheduler.step()
# Save model
torch.save(model.state_dict(), '/mnt/output/model.pt')
writer.close()
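# {{workflow.parameters.epochs}} below is substituted by Argo before the script runs, so it arrives as a literal number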
if __name__ == '__main__':
params = {
'seed': 1,
'batch_size': 64,
'test_batch_size': 1000,
'epochs': {{workflow.parameters.epochs}},
'lr': 0.001,
'gamma': 0.7,
}
main(params)
volumeMounts:
# These volumes are mounted into the training container and shared automatically with the TensorBoard sidecar
# The `data` volume holds the downloaded MNIST dataset
# The `output` volume holds the saved model and the TensorBoard logs
- name: data
mountPath: /mnt/data
- name: output
mountPath: /mnt/output
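# Run this task on the node pool chosen through the sys-node-pool parameter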
nodeSelector:
beta.kubernetes.io/instance-type: '{{workflow.parameters.sys-node-pool}}'
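# TensorBoard sidecar: reads logs from the shared output volume and serves on port 6006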
sidecars:
- name: tensorboard
image: tensorflow/tensorflow:2.3.0
command:
- sh
- '-c'
env:
- name: ONEPANEL_INTERACTIVE_SIDECAR
value: 'true'
args:
# Read TensorBoard logs from /mnt/output/tensorboard; the output volume is auto-mounted into the sidecar
- tensorboard --logdir /mnt/output/tensorboard
ports:
- containerPort: 6006
name: tensorboard
volumeClaimTemplates:
# Provision volumes for storing data and output
- metadata:
name: data
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 2Gi
- metadata:
name: output
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 2Gi