Job

The Job component is a powerful tool designed to streamline the process of running offline tasks, making it an indispensable part of AI application development. Ideal for batch workloads such as processing video clips, generating embeddings from extensive text sources like Wikipedia, or generating images of your favorite characters, the Job component simplifies complex operations into a manageable, efficient workflow.

Create a Job

You can create a job via the dashboard or the Lepton CLI.

Create a Job via the dashboard

Open the Job page on the dashboard and click the Create Job button; you will see the job creation form.

[Image: job creation form]

Then fill in the fields according to your needs. Here is a reference for the fields:

  • Job Name: The name of the job.

  • Resource Shape: The compute resource shape (CPU, memory, and GPU configuration) allocated to each worker of the job.

  • Worker: The number of workers for the job. For example, if you want to run a job with 4 workers, you can set the worker to 4.

  • Container Image: The container image that specifies the runtime environment for the job. You can use the default container image or specify your own.

  • Run Command: The command to run in the job. Your command will be prepended with /bin/bash -c. For example, if the run command is python3 main.py, the actual command that will be run in the job is /bin/bash -c "python3 main.py".

  • Container Ports: (Optional) The ports to expose within the job. For example, if you have a TensorBoard instance running in the job at port 6006, you can set the container port to 6006 and the host port to 6006 as well. Then you can access TensorBoard via http://your_job_ip:6006.

  • Environment Variables: (Optional) The environment variables that will be injected into the Pod. You can use this to specify the environment variables that your AI application needs. Here is a list of environment variables provided by default (a minimal sketch of using them to shard work across workers is shown after this field list):

    Name                       Description                                Value
    LEPTON_JOB_TOTAL_WORKERS   The total number of workers for the job    1
    LEPTON_JOB_WORKER_INDEX    The index of the worker, starting at 0     0

    If you'd like to run a job with libraries that require specific environment variables, such as MASTER_ADDR and MASTER_PORT in PyTorch, you can load and execute a prepared script that sets them up by adding the following commands to the Run Command field:

    wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh;
    chmod +x init.sh;
    source init.sh;
    
  • File System Mount: (Optional) The file system that will be mounted to the Pod. If specified, you can access the file system from within the Pod.
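
As a minimal sketch of how a worker can use the default environment variables above to pick up its own share of the work (the /mnt/inputs directory and process_one.py below are hypothetical placeholders, not part of Lepton):

#!/bin/bash
echo "worker ${LEPTON_JOB_WORKER_INDEX} of ${LEPTON_JOB_TOTAL_WORKERS}"

# Hypothetical sharding: each worker handles every N-th file under /mnt/inputs,
# so the full input set is covered exactly once across all workers.
i=0
for f in /mnt/inputs/*; do
  if [ $((i % LEPTON_JOB_TOTAL_WORKERS)) -eq "${LEPTON_JOB_WORKER_INDEX}" ]; then
    python3 process_one.py "$f"   # placeholder for your own per-item command
  fi
  i=$((i + 1))
done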

Once you have filled in the fields, click on the "Create" button to create the job.

Create a Job via the Lepton Job CLI

You can also create a job via the Lepton CLI. Here is an example of creating a job via the Lepton Job CLI with a sample job spec file:

wget -O job_spec.json https://raw.githubusercontent.com/leptonai/leptonai/main/leptonai/cli/job_spec_example.json
lep job create -f job_spec.json -n my_job

For more details, please refer to the Lepton CLI reference.

View Jobs

Once the job is created, you can view it in the Lepton Dashboard. Note that a storage-sync job will also be shown here if you have triggered an upload-from-cloud job under the file system.

[Image: job list in the dashboard]

View Job Details

You can view the details of a job by clicking on the job name in the job list. In the job details page, you can view the job status, logs, and other details. You can also access a worker of the job by clicking on the "Terminal" button.

[Image: job details page]

Example: Distributed training with PyTorch

Here is an example of running a distributed PyTorch job across multiple workers, using a Python script train.py and a shell script train.sh:

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributed as dist
from torchvision import datasets, transforms
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

class MNISTModel(nn.Module):
    def __init__(self):
        super(MNISTModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

def train(rank, world_size):
    print(f"Running on rank {rank}.")
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(dataset, batch_size=64, sampler=sampler)

    model = MNISTModel().to(rank)
    model = DDP(model, device_ids=[rank])
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    model.train()
    for epoch in range(1, 11):
        sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()

            if batch_idx % 10 == 0:
                print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

    if rank == 0:
        torch.save(model.module.state_dict(), "mnist_model.pth")
        print("Model saved as mnist_model.pth")

    dist.destroy_process_group()

def main():
    world_size = torch.cuda.device_count()
    torch.multiprocessing.spawn(train, args=(world_size,), nprocs=world_size, join=True)

if __name__ == "__main__":
    main()
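
The job spec below runs a wrapper script train.sh, which is not shown above. The following is a minimal sketch of what it could look like, assuming the container image already provides PyTorch and torchvision and that the environment-setup helper mentioned earlier is used to populate MASTER_ADDR and MASTER_PORT:

#!/bin/bash
set -e

# Map Lepton's job environment variables to the ones PyTorch expects
# (MASTER_ADDR, MASTER_PORT) using the helper script shown earlier.
wget -O /tmp/init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh
chmod +x /tmp/init.sh
source /tmp/init.sh

# Launch the training script uploaded to the file system (mounted at /mnt).
python3 /mnt/train.py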

Save the scripts above as train.py and train.sh respectively, then upload them to the Lepton file system with the following commands:

# upload files to the file system under root directory
lep storage upload train.py /train.py 
lep storage upload train.sh /train.sh

Then create a job with the following job specs:

{
  "resource_shape": "gpu.a10.6xlarge",
  "container": {
    "command": [
      "/bin/bash",
      "-c",
      "chmod +x /mnt/train.sh; /mnt/train.sh"
    ]
  },
  "completions": 3,
  "parallelism": 3,
  "envs": [],
  "mounts": [
    {
      "path": "/",
      "mount_path": "/mnt"
    }
  ],
  "ttl_seconds_after_finished": 259200,
  "intra_job_communication": true
}

Then create the job via the Lepton CLI with the job specs file:

lep job create -f job_spec.json -n pytorch_job

Once the job is created, you can view the job in the Lepton Dashboard. You can view the job details and logs in the job details page. You can also access the worker of the job by clicking on the "Terminal" button.

FAQ

Q: I got an error message saying "Permission denied" in the log. How do I resolve it?

A: Make sure the file (e.g. a bash script) has execute permission; you can check with ls -al | grep your_script.sh. Run chmod +x your_script.sh to add execute permission before uploading the file to the file system.
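
For example (your_script.sh is a placeholder for your own file):

chmod +x your_script.sh                 # add execute permission locally
ls -al | grep your_script.sh            # verify the x bit is present (e.g. -rwxr-xr-x)
lep storage upload your_script.sh /your_script.sh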