Job

The Job component is a powerful tool designed to streamline the process of running offline tasks, making it an indispensable part for AI application development. Ideal for batch processing tasks such as video clips processing, generating embeddings from extensive text sources like Wikipedia, or generate images of your favorate charactors, the Job component simplifies complex operations into manageable, efficient tool.

Create a Job

You can create a job via the dashboard or the Lepton CLI.

Create a Job via the dashboard

Open the Job page on dashbaord and click on the Create Job button, you will see the job creation form.

Image

Then fill in the fields according to your needs. Here is a references for the fields:

  • Job Name: The name of the job.
  • Resource Shape: The resource shape of the job.
  • Worker: The number of workers for the job. For example, if you want to run a job with 4 workers, you can set the worker to 4.
  • Node Groups: (Optional)The node groups to run the job. You can specify the node groups to run the job. If you don't specify the node groups, the job will run on the on-demand capacity.
  • Container Image: The container image to sepcify the runtime environment for the job. You can use the default container image or specify your own container image. If you are using private image, you'll need to specify the private image registry auth as well.
  • Private Image Registry Auth: (Optional)The private image registry auth to pull the container image from the private image registry. You can use this to pull the container image from the private image registry.
  • Run Command: The command to run in the job. Your command will be prepended with /bin/bash -c. For example, if the run command is python3 main.py, the actual command that will be run in the job is /bin/bash -c "python3 main.py".
  • Container Ports: (Optional)The ports to expose within the job. For example, if you have a tensorboard running in the job at port 6006, you can set the container ports to 6006. And set the host ports to 6006 as well. Then you can access the tensorboard via http://your_job_ip:6006.

Advanced Options

There are some advanced options that you can apply on the job to meet customized scheduling requirements:

  • Environment Variables: (Optional)The environment variables that will be injected into the Pod. You can use this to specify the environment variables that your AI application needs. Here is a list of environment variables provided by default:

    NameDescriptionValue
    LEPTON_JOB_TOTAL_WORKERSThe total number of workers for the job1
    LEPTON_JOB_WORKER_INDEXThe index of the worker, starting at 00

    If you'd like to run a job with certain libraries where it requires specific environment variables such as MASTER_ADDR and MASTER_PORT in pytorch, you can load and execute prepared script to setup the environment variables by adding the following command in the Run Command field:

    wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh;
    chmod +x init.sh;
    source init.sh;
    
  • File System Mount: (Optiona)the file system that will be mounted to the Pod. You can access the file system in the Pod if specified.

  • Shared Memory Size: (Optional)The shared memory size for the job. You can use this to specify the shared memory size for the job.

  • Max Replica Faliure Retry: (Optional)The maximum number of replica failure retries for the job. You can use this to specify the maximum number of replica failure retries for the job. Each replica(worker) will be retried for the maximum number of times if it fails.

  • Max Job Faliure Retry: (Optional)The maximum number of job failure retries for the job. You can use this to specify the maximum number of job failure retries for the job. The job will be retried for the maximum number of times if it fails.

Once you have filled in the fields, click on the "Create" button to create the job.

Create a Job via the Lepton Job CLI

You can also create a job via the Lepton CLI. Here is an example of creating a job via the Lepton Job CLI with a sample job sepcs file:

wget -O job_spec.json https://raw.githubusercontent.com/leptonai/leptonai/main/leptonai/cli/job_spec_example.json
lep job create -f job_spec.json -n my_job

For more details, please refer to the Lepton CLI reference.

View Jobs

Once the job is created, you can view the job in the Lepton Dashboard. Do notice a storage-sync job will be shown here as well if you've triggered upload from cloud job under the file system.

Image

View Job Details

You can view the details of a job by clicking on the job name in the job list. In the job details page, you can view the job status, logs, and other details. You can also acces the worker of the job by clicking on the "Terminal" button.

Image

Example: Distributed training with Pytorch

Here is an example of running a distributed pytorch job with 2 workers with a python file train.py and a shell script train.sh:

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributed as dist
from torchvision import datasets, transforms
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

class MNISTModel(nn.Module):
    def __init__(self):
        super(MNISTModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

def train(rank, world_size):
    print(f"Running on rank {rank}.")
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(dataset, batch_size=64, sampler=sampler)

    model = MNISTModel().to(rank)
    model = DDP(model, device_ids=[rank])
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    model.train()
    for epoch in range(1, 11):
        sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()

            if batch_idx % 10 == 0:
                print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

    if rank == 0:
        torch.save(model.module.state_dict(), "mnist_model.pth")
        print("Model saved as mnist_model.pth")

    dist.destroy_process_group()

def main():
    world_size = torch.cuda.device_count()
    torch.multiprocessing.spawn(train, args=(world_size,), nprocs=world_size, join=True)

if __name__ == "__main__":
    main()

Save the above script as train.py and train.sh. Then upload it to the Lepton file system via command:

# upload files to the file system under root directory
lep storage upload train.py /train.py
lep storage upload train.sh /train.sh

Then create a job with the following job specs:

{
  "resource_shape": "gpu.a10.6xlarge",
  "container": {
    "command": [
      "/bin/bash",
      "-c",
      "chmod +x /mnt/train.sh; /mnt/train.sh"
    ]
  },
  "completions": 3,
  "parallelism": 3,
  "envs": [],
  "mounts": [
    {
      "path": "/",
      "mount_path": "/mnt"
    }
  ],
  "ttl_seconds_after_finished": 259200,
  "intra_job_communication": true
}

Then create the job via the Lepton CLI with the job specs file:

lep job create -f job_spec.json -n pytorch_job

Once the job is created, you can view the job in the Lepton Dashboard. You can view the job details and logs in the job details page. You can also access the worker of the job by clicking on the "Terminal" button.

Example: Running jobs with conda environment

Conda is known for its ability to create isolated environments for different projects. Here is an example of running a job with conda environment management:

Let's say we have a Pod running with a conda installed image and file system mounted at /mnt. We can create a conda environment with pytorch and pack it to the file system.

# Create a conda environment with pytorch
conda create -n foo python=3.10.12
conda activate foo
conda install pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia

# Pack the conda environment to file system
pip install conda-pack
conda pack -n foo -o /mnt/foo.tar.gz

The foo.tar.gz file contains the conda environment. You can load the conda environment in the job by adding the following command in the Run Command field during job creation. Making sure conda is installed in the container image.

# Load the conda environment
mkdir -p foo
cp /mnt/foo.tar.gz ./
tar -xzf foo.tar.gz -C foo

# Activate the conda environment
source foo/bin/activate

# Verify that the environment was installed
conda list

Once the job is created, you can view the job in the Web UI.

FAQ

Q: I got error message saying "Permission denied" from the log, how to resolved it? A: Make sure the file(eg. a bash script) has the execute permission by ls -al | grep your_script.sh. You can do chmod +x your_script.sh to add the execute permission to the file before uploading it to the file system.