Job Management
create
Creates a new batch job with specified resources that runs batch inference on a specified dataset and sends the prediction results to a specified webhook endpoint URL.
Arguments
Attribute | Type | Description |
---|---|---|
name | str | Name of the batch job. |
job_options | JobOptions | Batch job configuration. |
Return
Job object containing the metadata of the newly scheduled batch job, with the following structure:
Job(
    id='batchjob_4e5566d0-6538-441a-9fa5-f3495516646a',
    object='batch_job',
    name='my-batch-job-08-15',
    project_id='proj_ca5fe71e7592bbcf7705ea36e4f29ed4',
    spec=JobSpec(
        start_at_time=1723734419985,
        stop_at_time=1723993619985,
        dataset_id='dataset_652ef566-af5c-4d52-b1e4-ec8ec6dc4b8e',
        result_delivery=ResultDelivery(
            destinations=[Webhook(
                webhook_id='webhook_ed981529-0160-4704-8794-e7702a8de470'
            )]
        ),
        deployment=ExistingDeployment(
            deployment_id='deploy_3286bd71-9a8b-42bb-8ba6-c07549b81367'
        ),
    ),
    status=JobStatus(
        overview='Waiting',
        message='Waiting',
        update_date=1723734509417,
        items=JobItemStatus(
            gathered=50,
            failed_gather=0,
            preprocessed=0,
            failed_process=0,
            predicted=0,
            failed_predict=0,
            delivered=0,
            failed_deliver=0
        ),
        reason=None
    ),
    create_date=1723734419985,
    update_date=1723734509434
)
Attribute | Type | Description |
---|---|---|
id | str | Batch job ID as a string. |
object | str | Object type of the batch job. |
name | str | Name of the batch job. |
project_id | str | Identifier of the project associated with the batch job. |
spec | JobSpec | Configuration object of the batch job. |
status | JobStatus | Status object of the batch job. |
create_date | int | Creation timestamp of the batch job, in milliseconds. |
update_date | int | Last updated timestamp of the batch job, in milliseconds. |
Examples
Create a new batch job with a new deployment:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.create(
    "job_name",
    {
        "dataset_id": "dataset_652ef566-af5c-4d52-b1e4-ec8ec6dc4b8e",
        "webhook_id": "webhook_ed981529-0160-4704-8794-e7702a8de470",
        "artifact_id": "artifact_3286bd71-9a8b-42bb-8ba6-c07549b81367",
        "start_at_time": 1723734419985,
        "stop_at_time": 1723993619985,
    }
)
Create a new batch job with an existing deployment:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.create(
    "job_name",
    {
        "dataset_id": "dataset_652ef566-af5c-4d52-b1e4-ec8ec6dc4b8e",
        "webhook_id": "webhook_ed981529-0160-4704-8794-e7702a8de470",
        "deployment_id": "deploy_3286bd71-9a8b-42bb-8ba6-c07549b81367",
        "start_at_time": 1723734419985,
        "stop_at_time": 1723993619985,
    }
)
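The start_at_time and stop_at_time values in the examples above appear to be Unix timestamps in milliseconds, matching the create_date field. The following is a minimal sketch of one way to derive such values from Python datetime objects. The helper name to_epoch_ms and the three-day window are illustrative assumptions and are not part of the SDK.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer timedelta division avoids floating-point rounding on large timestamps.
    return (moment - EPOCH) // timedelta(milliseconds=1)


# Illustrative window: a fixed start time and a stop time three days later.
start = datetime(2024, 8, 15, 15, 6, 59, 985000, tzinfo=timezone.utc)
stop = start + timedelta(days=3)

job_window = {
    "start_at_time": to_epoch_ms(start),
    "stop_at_time": to_epoch_ms(stop),
}
print(job_window)
```

The resulting dictionary could be merged into the job options passed to create, alongside the dataset, webhook, and deployment or artifact identifiers.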
list
Lists all created batch jobs in the project.
Arguments
Name | Type | Description |
---|---|---|
pagination | dict | Dictionary containing limit, the maximum number of batch jobs returned per page (defaults to 1000), and page, the cursor of the page to retrieve (defaults to the first page). |
Return
PaginationResponse object containing a list of Job objects with page navigation data, with the following structure:
PaginationResponse(
    next_page=None,
    prev_page=None,
    data=[
        Job(
            id='batchjob_4e5566d0-6538-441a-9fa5-f3495516646a',
            object='batch_job',
            name='my-batch-job-08-15',
            project_id='proj_ca5fe71e7592bbcf7705ea36e4f29ed4',
            spec=JobSpec(
                start_at_time=1723734419985,
                stop_at_time=1723993619985,
                dataset_id='dataset_652ef566-af5c-4d52-b1e4-ec8ec6dc4b8e',
                result_delivery=ResultDelivery(
                    destinations=[Webhook(
                        webhook_id='webhook_ed981529-0160-4704-8794-e7702a8de470'
                    )]
                ),
                deployment=ExistingDeployment(
                    deployment_id='deploy_3286bd71-9a8b-42bb-8ba6-c07549b81367'
                ),
            ),
            status=JobStatus(
                overview='Reached',
                message='Finished',
                update_date=1723734509417,
                items=JobItemStatus(
                    gathered=50,
                    failed_gather=0,
                    preprocessed=50,
                    failed_process=0,
                    predicted=50,
                    failed_predict=0,
                    delivered=50,
                    failed_deliver=0
                ),
                reason=None
            ),
            create_date=1723734419985,
            update_date=1723734509434
        )
    ],
)
Attribute | Type | Description |
---|---|---|
next_page | str | Page ID of the next page. |
prev_page | str | Page ID of the previous page. |
data | List[Job] | List of batch job metadata. |
Examples
- Default listing of batch jobs (shows first 1000 batch jobs):
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.list()
- View the next page of results:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
next_page = project.batch.jobs.list()["next_page"]
project.batch.jobs.list({"page": next_page})
- View the previous page of results:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
prev_page = project.batch.jobs.list(
    {"page": "ZjYzYmJkM2FjN2UxOTA4ZmU0ZjE0Yjk5Mg"}
)["prev_page"]
project.batch.jobs.list({"page": prev_page})
- List a specific page, returning at most 2 jobs:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.list({
    "limit": 2,
    "page": "ZjYzYmJkM2FjN2UxOTA4ZmU0ZjE0Yjk5Mg"
})
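The page navigation shown above can be wrapped in a loop that follows next_page until no further page exists. The following is a rough sketch under stated assumptions: the helper iter_all_jobs is hypothetical, the response is read by subscripting as in the examples above, and a dictionary-based stand-in replaces project.batch.jobs.list so the snippet runs without network access.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_all_jobs(
    list_page: Callable[[Dict[str, Any]], Any],
    limit: int = 100,
) -> Iterator[Any]:
    """Yield every job by following next_page cursors until none remains."""
    cursor: Optional[str] = None
    while True:
        pagination: Dict[str, Any] = {"limit": limit}
        if cursor is not None:
            pagination["page"] = cursor
        response = list_page(pagination)
        yield from response["data"]
        cursor = response["next_page"]
        if not cursor:
            break


# Stand-in pages keyed by cursor; real responses would come from the service.
_FAKE_PAGES = {
    None: {"data": ["job-a", "job-b"], "next_page": "cursor-2"},
    "cursor-2": {"data": ["job-c"], "next_page": None},
}


def fake_list(pagination: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical replacement for project.batch.jobs.list."""
    return _FAKE_PAGES[pagination.get("page")]


all_jobs = list(iter_all_jobs(fake_list, limit=2))
print(all_jobs)
```

With a real client, project.batch.jobs.list could be passed as list_page, assuming it accepts the same pagination dictionary used in the examples above.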
get
Retrieves a specific batch job by ID.
Arguments
Name | Type | Description |
---|---|---|
job_id | str | The batch job ID as a string. |
Return
Job object containing the metadata of the retrieved batch job, with the following structure:
Job(
    id='batchjob_4e5566d0-6538-441a-9fa5-f3495516646a',
    object='batch_job',
    name='my-batch-job-08-15',
    project_id='proj_ca5fe71e7592bbcf7705ea36e4f29ed4',
    spec=JobSpec(
        start_at_time=1723734419985,
        stop_at_time=1723993619985,
        dataset_id='dataset_652ef566-af5c-4d52-b1e4-ec8ec6dc4b8e',
        result_delivery=ResultDelivery(
            destinations=[Webhook(
                webhook_id='webhook_ed981529-0160-4704-8794-e7702a8de470'
            )]
        ),
        deployment=ExistingDeployment(
            deployment_id='deploy_3286bd71-9a8b-42bb-8ba6-c07549b81367'
        ),
    ),
    status=JobStatus(
        overview='Reached',
        message='Finished',
        update_date=1723734509417,
        items=JobItemStatus(
            gathered=50,
            failed_gather=0,
            preprocessed=50,
            failed_process=0,
            predicted=50,
            failed_predict=0,
            delivered=50,
            failed_deliver=0
        ),
        reason=None
    ),
    create_date=1723734419985,
    update_date=1723734509434
)
Attribute | Type | Description |
---|---|---|
id | str | Batch job ID as a string. |
object | str | Object type of the batch job. |
name | str | Name of the batch job. |
project_id | str | Identifier of the project associated with the batch job. |
spec | JobSpec | Configuration object of the batch job. |
status | JobStatus | Status object of the batch job. |
create_date | int | Creation timestamp of the batch job, in milliseconds. |
update_date | int | Last updated timestamp of the batch job, in milliseconds. |
Examples
Retrieve batch job by batch job ID:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.get("batchjob_4e5566d0-6538-441a-9fa5-f3495516646a")
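The item counters under status.items can be condensed into a short progress summary. The sketch below is illustrative only: ItemCounts is a hypothetical stand-in that mirrors the JobItemStatus fields shown above, summarize is not an SDK function, and treating gathered as the number of items that entered the pipeline is an assumption.

```python
from dataclasses import dataclass
from typing import Dict, Union


@dataclass
class ItemCounts:
    """Hypothetical stand-in mirroring the JobItemStatus fields shown above."""
    gathered: int
    failed_gather: int
    preprocessed: int
    failed_process: int
    predicted: int
    failed_predict: int
    delivered: int
    failed_deliver: int


def summarize(items: ItemCounts) -> Dict[str, Union[int, float]]:
    """Return the total failure count and the share of gathered items delivered."""
    failed = (
        items.failed_gather
        + items.failed_process
        + items.failed_predict
        + items.failed_deliver
    )
    # Assumption: gathered is the denominator for delivery progress.
    ratio = items.delivered / items.gathered if items.gathered else 0.0
    return {"failed": failed, "delivered_ratio": ratio}


finished = summarize(ItemCounts(50, 0, 50, 0, 50, 0, 50, 0))
waiting = summarize(ItemCounts(50, 0, 0, 0, 0, 0, 0, 0))
print(finished, waiting)
```

If the fields of the returned Job are readable as attributes, the same function could be applied to job.status.items.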
cancel
Cancels a scheduled or running batch job by ID.
Arguments
Name | Type | Description |
---|---|---|
job_id | str | The batch job ID as a string. |
Return
Job object containing the metadata of the cancelled batch job, with the following structure:
Job(
    id='batchjob_4e5566d0-6538-441a-9fa5-f3495516646a',
    object='batch_job',
    name='my-batch-job-08-15',
    project_id='proj_ca5fe71e7592bbcf7705ea36e4f29ed4',
    spec=JobSpec(
        start_at_time=1723734419985,
        stop_at_time=1723993619985,
        dataset_id='dataset_652ef566-af5c-4d52-b1e4-ec8ec6dc4b8e',
        result_delivery=ResultDelivery(
            destinations=[Webhook(
                webhook_id='webhook_ed981529-0160-4704-8794-e7702a8de470'
            )]
        ),
        deployment=ExistingDeployment(
            deployment_id='deploy_3286bd71-9a8b-42bb-8ba6-c07549b81367'
        ),
    ),
    status=JobStatus(
        overview='Failed',
        message='Batch job cancelled',
        update_date=1723734509417,
        items=JobItemStatus(
            gathered=50,
            failed_gather=0,
            preprocessed=50,
            failed_process=0,
            predicted=50,
            failed_predict=0,
            delivered=50,
            failed_deliver=0
        ),
        reason="BatchJob not completed before defined stop time due to requested cancellation at 1724083830669"
    ),
    create_date=1723734419985,
    update_date=1723734509434
)
Attribute | Type | Description |
---|---|---|
id | str | Batch job ID as a string. |
object | str | Object type of the batch job. |
name | str | Name of the batch job. |
project_id | str | Identifier of the project associated with the batch job. |
spec | JobSpec | Configuration object of the batch job. |
status | JobStatus | Status object of the batch job. |
create_date | int | Creation timestamp of the batch job, in milliseconds. |
update_date | int | Last updated timestamp of the batch job, in milliseconds. |
Examples
Cancel a batch job:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.cancel("batchjob_f7d8aec2-7e2b-4d2c-a103-c8dd575c29c7")
delete
Deletes a specific batch job by ID.
Arguments
Name | Type | Description |
---|---|---|
job_id | str | The batch job ID as a string. |
Return
DeleteResponse object that describes the deletion status of the batch job, with the following structure:
DeleteResponse(
    id='batchjob_f7d8aec2-7e2b-4d2c-a103-c8dd575c29c7',
    deleted=True
)
Attribute | Type | Description |
---|---|---|
id | str | The batch job ID as a string. |
deleted | bool | Whether the batch job has been successfully deleted or not. |
Examples
Delete a specified batch job:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.delete("batchjob_f7d8aec2-7e2b-4d2c-a103-c8dd575c29c7")
wait_until_done
Waits for the batch job to be completed.
Arguments
Name | Type | Description |
---|---|---|
job_id | str | The batch job ID as a string. |
Return
Job object containing the metadata of the completed batch job, with the following structure:
Job(
    id='batchjob_4e5566d0-6538-441a-9fa5-f3495516646a',
    object='batch_job',
    name='my-batch-job-08-15',
    project_id='proj_ca5fe71e7592bbcf7705ea36e4f29ed4',
    spec=JobSpec(
        start_at_time=1723734419985,
        stop_at_time=1723993619985,
        dataset_id='dataset_652ef566-af5c-4d52-b1e4-ec8ec6dc4b8e',
        result_delivery=ResultDelivery(
            destinations=[Webhook(
                webhook_id='webhook_ed981529-0160-4704-8794-e7702a8de470'
            )]
        ),
        deployment=ExistingDeployment(
            deployment_id='deploy_3286bd71-9a8b-42bb-8ba6-c07549b81367'
        ),
    ),
    status=JobStatus(
        overview='Reached',
        message='Finished',
        update_date=1723734509417,
        items=JobItemStatus(
            gathered=50,
            failed_gather=0,
            preprocessed=50,
            failed_process=0,
            predicted=50,
            failed_predict=0,
            delivered=50,
            failed_deliver=0
        ),
        reason=None
    ),
    create_date=1723734419985,
    update_date=1723734509434
)
Attribute | Type | Description |
---|---|---|
id | str | Batch job ID as a string. |
object | str | Object type of the batch job. |
name | str | Name of the batch job. |
project_id | str | Identifier of the project associated with the batch job. |
spec | JobSpec | Configuration object of the batch job. |
status | JobStatus | Status object of the batch job. |
create_date | int | Creation timestamp of the batch job, in milliseconds. |
update_date | int | Last updated timestamp of the batch job, in milliseconds. |
Examples
Wait for the batch job to run inference on all dataset items and send the prediction results to the specified webhook endpoint URL:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.wait_until_done("batchjob_652ef566-af5c-4d52-b1e4-ec8ec6dc4b8e")
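This page does not describe a timeout for wait_until_done. A caller who needs one could poll the job status instead, for example through get. The following is a generic polling sketch, not the SDK's implementation: poll_until_done and its parameters are hypothetical, and the set of terminal overview values is inferred from the example outputs on this page, so it may be incomplete.

```python
import time
from typing import Callable

# Terminal overview values inferred from the example outputs on this page;
# the service may define others, so treat this set as an assumption.
TERMINAL_OVERVIEWS = {"Reached", "Failed"}


def poll_until_done(
    fetch_overview: Callable[[], str],
    interval_s: float = 5.0,
    timeout_s: float = 600.0,
) -> str:
    """Call fetch_overview repeatedly until it reports a terminal value or time runs out."""
    deadline = time.monotonic() + timeout_s
    while True:
        overview = fetch_overview()
        if overview in TERMINAL_OVERVIEWS:
            return overview
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {overview!r} after {timeout_s} seconds")
        time.sleep(interval_s)


# Stand-in status source that reports Waiting twice, then Reached.
_states = iter(["Waiting", "Waiting", "Reached"])
result = poll_until_done(lambda: next(_states), interval_s=0.0, timeout_s=5.0)
print(result)
```

With a real client, fetch_overview could wrap a call to get and return the overview field of the job status, assuming that field is accessible on the returned object.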
get_logs
Retrieves item-level logs from a specified batch job, capped at 1000 entries. To retrieve all logs when the total entry count exceeds 1000, use get_all_logs instead.
Arguments
Name | Type | Description |
---|---|---|
job_id | str | The batch job ID as a string. |
logs_filter | LogsFilter | Dictionary to specify conditions for logs filtering. |
Return
JobLogs object containing batch job log entries and statuses, with the following structure:
JobLogs(
    id='batchjoblog_3d561cfa-9a53-4eb3-81c1-4193ae27f4fc',
    object='batch_job_log',
    project_id='proj_ca5fe71e7592bbcf7705ea36e4f29ed4',
    entries=[
        JobLogEntry(
            kind='ItemPredicted',
            create_time=1723080300011,
            id='1e0ad7896a4aa2dd47eda727',
            level='Info',
            item='https://my-endpoint.url/blood_cell/36.jpg',
            status='Delivered',
            time_ms=9819.226,
            reason=''
        )
    ]
)
Attribute | Type | Description |
---|---|---|
id | str | Batch job log ID as a string. |
object | str | Object type of the batch job log. |
project_id | str | Identifier of the project associated with the batch job. |
entries | List[JobLogEntry] | List of log entries for the batch job. |
Examples
- Get logs from a specified batch job (capped at 1000 entries):
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.get_logs("batchjob_4e5566d0-6538-441a-9fa5-f3495516646a")
- Get all logs after a specified time:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.get_logs(
    "batchjob_4e5566d0-6538-441a-9fa5-f3495516646a",
    {"after_time": 1723781869354}
)
- Get the last log entry:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
project.batch.jobs.get_logs(
    "batchjob_4e5566d0-6538-441a-9fa5-f3495516646a",
    {"max_entries": 1}
)
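The after_time filter lends itself to incremental fetching: remember the newest create_time seen so far and pass it as after_time on the next call. The sketch below shows only the bookkeeping. The helper next_logs_filter is hypothetical, entries are represented by plain dictionaries rather than JobLogEntry objects, and whether after_time is inclusive or exclusive is not stated on this page.

```python
from typing import Any, Dict, Iterable, Mapping, Optional


def next_logs_filter(
    entries: Iterable[Mapping[str, Any]],
    previous_after_time: Optional[int] = None,
) -> Dict[str, int]:
    """Build a logs_filter that resumes from the newest entry seen so far."""
    newest = previous_after_time
    for entry in entries:
        created = entry["create_time"]
        if newest is None or created > newest:
            newest = created
    # With no entries and no earlier cursor, return an empty filter (no time bound).
    return {} if newest is None else {"after_time": newest}


# Dictionary stand-ins for log entries, keeping only the field used here.
seen = [{"create_time": 1723080300011}, {"create_time": 1723080305000}]
logs_filter = next_logs_filter(seen)
print(logs_filter)
```

If after_time turns out to be inclusive, the newest entry may be returned again on the next call, so deduplicating by entry id would be a sensible precaution.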
get_all_logs
Retrieves item-level logs from a specified batch job. Similar to get_logs, but returns a Generator that iterates through all logs even when the entry count exceeds 1000.
Arguments
Name | Type | Description |
---|---|---|
job_id | str | The batch job ID as a string. |
logs_filter | LogsFilter | Dictionary to specify conditions for logs filtering. |
Return
Generator object that yields batch job log entries, each with the following structure:
Generator(
    JobLogEntry(
        kind='ItemPredicted',
        create_time=1723080300011,
        id='1e0ad7896a4aa2dd47eda727',
        level='Info',
        item='https://my-endpoint.url/blood_cell/36.jpg',
        status='Delivered',
        time_ms=9819.226,
        reason=''
    )
)
Name | Type | Description |
---|---|---|
kind | str | Type of log entry. |
create_time | int | Creation timestamp of the log entry, in milliseconds. |
id | str | Log entry ID. |
level | str | Severity level of the log entry. |
item | str | Dataset item associated with the log entry. |
status | str | Status of the log entry. |
time_ms | float | Time taken for the event described by the log entry, in milliseconds. |
reason | Optional[str] | Reason for the error in the log entry, if any. Defaults to None. |
Examples
- Get all logs from a specified batch job and count the total number of logs:
from datature.nexus import Client
project = Client("5aa41e8ba........").get_project("proj_b705a........")
count = 0
for entry in project.batch.jobs.get_all_logs("batchjob_4e5566d0-6538-441a-9fa5-f3495516646a"):
    print(entry)
    count += 1
print(f"Total Log Count: {count}")
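Because the entries arrive through a generator, they can be aggregated in a single pass without holding every entry in memory. The following sketch tallies entries by status. LogEntry is a hypothetical stand-in carrying two of the JobLogEntry fields listed above, sample_entries replaces the real generator, and the status value 'Failed' is a placeholder that does not appear on this page.

```python
from collections import Counter
from typing import Iterable, Iterator, NamedTuple


class LogEntry(NamedTuple):
    """Hypothetical stand-in carrying two of the JobLogEntry fields listed above."""
    level: str
    status: str


def tally_by_status(entries: Iterable[LogEntry]) -> Counter:
    """Count entries per status while consuming the iterable once."""
    return Counter(entry.status for entry in entries)


def sample_entries() -> Iterator[LogEntry]:
    """Stand-in generator; 'Failed' is a placeholder status, not a documented value."""
    yield LogEntry("Info", "Delivered")
    yield LogEntry("Info", "Delivered")
    yield LogEntry("Error", "Failed")


counts = tally_by_status(sample_entries())
print(counts.most_common())
```

With a real client, the generator returned by get_all_logs could be passed in place of sample_entries(), assuming each entry exposes status as an attribute.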