Inference Batch Jobs

What are Inference Batch Jobs?

Inference batch jobs automate inference on large datasets. Instead of calling our Inference API sequentially for each item, a batch job operates on an entire dataset and leverages scaling and optimizations to efficiently process predictions for large numbers of images and videos.

Batch jobs can be integrated with webhooks that automatically send prediction results to a specified public HTTPS endpoint. Future integrations will also allow for scheduling cron jobs, setting up notification and alert systems based on batch job events, and more.

Getting Started

To get started with batch jobs, you will first need to satisfy a few requirements, detailed below. Once these requirements are met, you can use our Python SDK or CLI to manage the creation and scheduling of batch jobs and their resources.

With our CLI, you can quickly configure and schedule a batch job by running the following commands:

$ datature projects auth

which authenticates your project on Nexus, followed by:

$ datature batch jobs create

which presents a series of prompts to set up the necessary resources for the batch job.

Dataset Requirements

To provide a dataset for batch inference, you will need to upload a single Newline-Delimited JSON (NDJSON) file. Each line in the file should represent one item to be predicted, in the following format:

{"kind": "Image", "url": "https://signed.url/image1.jpg"}
{"kind": "Image", "url": "https://signed.url/image2.jpg"}
{"kind": "Image", "url": "https://signed.url/image3.jpg"}
...

Currently, only images are supported. If you wish to run an inference batch job on videos, you will first need to split each video into individual frames, with each frame as a single line entry.
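
As a minimal sketch, the snippet below writes such an NDJSON manifest from a list of signed image URLs; the URLs and output filename are placeholder values. If you are working with videos, each extracted frame would be written as its own line in the same way.

import json

# Placeholder signed URLs for the images to be predicted
signed_urls = [
    "https://signed.url/image1.jpg",
    "https://signed.url/image2.jpg",
    "https://signed.url/image3.jpg",
]

# Write one JSON object per line to match the expected NDJSON format
with open("dataset.ndjson", "w", encoding="utf-8") as manifest:
    for url in signed_urls:
        manifest.write(json.dumps({"kind": "Image", "url": url}) + "\n")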

Deployment Requirements

To execute a batch job, you will first need to train a model to be used in a deployment that handles the batch inference. Check out our quick guides to training a model on Nexus.

Compute resources for batch jobs can be allocated based on model requirements and inference traffic demands. The full list of available instance types and resource quantities that you can allocate is listed in our Predefined Instance Types.

Result Delivery Requirements

Prediction results from batch jobs can be delivered to chosen endpoints via webhooks. Each endpoint must use HTTPS and must be a fully-qualified domain name that resolves to an IPv4 address in public address space.

We provide code snippets for setting up endpoints using cloud functions such as AWS Lambda:

import base64
import binascii
import hmac
import json
import logging
import os
from typing import Dict

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def verify_signature(secret_key: str, headers: Dict[str, str], body: str) -> bool:
    """Verify that the webhook payload was signed with the shared secret key."""
    # The webhook secret key is base64-encoded
    secret_key_bytes = base64.b64decode(secret_key)

    # Signature and request metadata headers attached to the webhook request
    received_signature = headers["datature-webhook-signature"].encode("ascii")
    request_id = headers["datature-webhook-request-id"]
    request_time = headers["datature-webhook-request-time"]

    message = (
        f"Datature-Webhook-Request-ID={request_id}&"
        f"Datature-Webhook-Request-Time={request_time}&"
        f"RawBody={body}"
    ).encode("utf-8")

    # Recompute the HMAC-SHA256 signature over the canonical message
    calculated_signature = binascii.hexlify(hmac.digest(secret_key_bytes, message, "sha256"))

    # Constant-time comparison against the signature sent in the request headers
    return hmac.compare_digest(calculated_signature, received_signature)


def lambda_handler(event, context):
    # Get the HTTP method from the API request
    http_method = event["requestContext"]["http"]["method"]

    # Get the path details from the API request
    path = event["requestContext"]["http"]["path"]

    if http_method == "POST":
        headers = event.get("headers", {})
        body = event.get("body", "")

        # Use the generated webhook secret key to validate authenticity of results
        # You can choose to save the webhook secret key as an environment variable,
        # or you can directly read it from a file.
        secret_key = os.getenv("SECRET_KEY")
        # or secret_key = "..."
        
        if verify_signature(secret_key, headers, body):
            # This simply logs the results to AWS CloudWatch
            # You can modify this to perform postprocessing on the results,
            # or save them to a database
            logger.info(json.loads(body))
            
            # Return any 2xx status to indicate successful receipt of results
            return {"statusCode": 204, "body": "Results received successfully!"}
          
        # Return 401 if the provided secret key does not match
        return {"statusCode": 401, "body": "Unauthorized Access: Invalid secret key!"}

    return {"statusCode": 204, "body": "This is a passthrough response for non-POST methods."}