Source code for qcportal.internal_jobs.models

import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any

from dateutil.parser import parse as date_parser
from pydantic import BaseModel, ConfigDict, field_validator, PrivateAttr
from tqdm import tqdm

from qcportal.base_models import QueryProjModelBase
from qcportal.utils import seconds_to_hms
from ..base_models import QueryIteratorBase


class InternalJobStatusEnum(str, Enum):
    """
    The state of an internal job. The states which are available are a finite set.
    """

    complete = "complete"
    waiting = "waiting"
    running = "running"
    error = "error"
    cancelled = "cancelled"

    @classmethod
    def _missing_(cls, name):
        """Attempts to find the correct status in a case-insensitive way

        If a string being converted to an InternalJobStatusEnum is missing, then this function
        will convert the case and try to find the appropriate status.

        Returns None when nothing matches (or when the value is not a string), which
        makes the Enum machinery raise its usual ValueError.
        """

        # Only strings can be case-folded. Bail out early for anything else so the
        # caller sees a ValueError instead of an AttributeError from `.lower()`.
        if not isinstance(name, str):
            return None

        name = name.lower()

        # Search this way rather than doing 'in' since we are comparing
        # a string to an enum
        for status in cls:
            if name == status.value:
                return status

        return None
class InternalJob(BaseModel):
    """
    Client-side model of a single internal job

    Besides the data fields, an instance may hold a reference to a client (and
    optionally a URL) which :meth:`refresh` and :meth:`watch` use to re-fetch the
    job from the server.
    """

    model_config = ConfigDict(extra="forbid")

    id: int
    name: str
    status: InternalJobStatusEnum
    added_date: datetime
    scheduled_date: datetime
    started_date: datetime | None
    last_updated: datetime | None
    ended_date: datetime | None
    runner_hostname: str | None
    runner_uuid: str | None
    repeat_delay: int | None
    serial_group: str | None
    progress: int  # shown by watch() on a progress bar whose total is 100
    progress_description: str | None = None
    function: str
    kwargs: dict[str, Any]
    after_function: str | None
    after_function_kwargs: dict[str, Any] | None
    result: Any
    user: str | None

    # Private attributes: not model fields, so they are set after validation in __init__
    _client: Any = PrivateAttr(None)
    _refresh_url: str | None = PrivateAttr(None)

    def __init__(self, client=None, refresh_url=None, **kwargs):
        """
        Construct the model from field data

        Parameters
        ----------
        client
            Object used by :meth:`refresh` to contact the server. May be None, in
            which case refreshing is not possible.
        refresh_url
            If given, :meth:`refresh` requests this URL rather than looking the
            job up by its id.
        **kwargs
            Field values for the model
        """
        BaseModel.__init__(self, **kwargs)
        self._client = client
        self._refresh_url = refresh_url

    def refresh(self):
        """
        Updates the data of this object with information from the server

        Raises
        ------
        RuntimeError
            If this object was constructed without a client
        """

        if self._client is None:
            raise RuntimeError("Client is not set")

        if self._refresh_url is None:
            server_data = self._client.get_internal_job(self.id)
        else:
            server_data = self._client.make_request("get", self._refresh_url, InternalJob)

        # Copy every (field name, value) pair from the fresh copy onto this instance
        for k, v in server_data:
            setattr(self, k, v)

    def watch(self, interval: float = 2.0, timeout: float | None = None):
        """
        Watch an internal job for completion

        Will poll every `interval` seconds until the job is finished (complete, error, cancelled, etc).
        Returns immediately if the job is not waiting or running to begin with.

        Parameters
        ----------
        interval
            Time (in seconds) between polls on the server
        timeout
            Max amount of time (in seconds) to wait. If None, will wait forever.

        Raises
        ------
        TimeoutError
            If the job is still waiting or running once `timeout` seconds have passed
        """

        active_states = (InternalJobStatusEnum.waiting, InternalJobStatusEnum.running)

        if self.status not in active_states:
            return

        begin_time = time.time()
        end_time = None if timeout is None else begin_time + timeout

        # The context manager closes the progress bar on every exit path,
        # including the TimeoutError below and errors raised by refresh()
        with tqdm(initial=self.progress, total=100, desc=self.progress_description) as pbar:
            while True:
                t = time.time()

                self.refresh()
                pbar.update(self.progress - pbar.n)
                pbar.set_description(self.progress_description)

                # Look at the fresh status before the deadline, so a job that finished
                # on the final poll is reported as finished rather than as a timeout
                if self.status == InternalJobStatusEnum.error:
                    print("Internal job resulted in an error:")
                    print(self.result)
                    break
                elif self.status not in active_states:
                    print(f"Internal job final status: {self.status.value}")
                    break

                if end_time is not None and t >= end_time:
                    raise TimeoutError("Timed out waiting for job to complete")

                curtime = time.time()

                if end_time is not None:
                    # sleep the normal interval, or up to the timeout time.
                    # Clamp at zero: a slow refresh() can push curtime past the
                    # deadline, and time.sleep() rejects negative values
                    time.sleep(max(0.0, min(interval, end_time - curtime + 0.1)))
                else:
                    time.sleep(interval)

    @property
    def duration(self) -> timedelta:
        """
        Time elapsed between the start and the end of the job

        Returns a zero timedelta if the job has not started. If it has started
        but not ended, the elapsed time up to now is returned.
        """

        if self.started_date is None:
            return timedelta()

        if self.ended_date is None:
            # Use the same timezone-awareness as started_date; subtracting an aware
            # datetime from a naive one raises TypeError. With tzinfo None this is
            # the plain naive local time.
            return datetime.now(tz=self.started_date.tzinfo) - self.started_date

        return self.ended_date - self.started_date

    @property
    def duration_str(self) -> str:
        """The duration formatted by ``seconds_to_hms``"""
        return seconds_to_hms(self.duration.total_seconds())
class InternalJobQueryFilters(QueryProjModelBase):
    """
    Filter fields for querying internal jobs

    Every field is optional and defaults to None.
    """

    job_id: list[int] | None = None
    name: list[str] | None = None
    user: list[int | str] | None = None
    runner_hostname: list[str] | None = None
    status: list[InternalJobStatusEnum] | None = None

    # Date bounds. String values are converted to datetime by parse_dates below
    last_updated_before: datetime | None = None
    last_updated_after: datetime | None = None
    added_before: datetime | None = None
    added_after: datetime | None = None
    scheduled_before: datetime | None = None
    scheduled_after: datetime | None = None

    @field_validator(
        "last_updated_before",
        "last_updated_after",
        "added_before",
        "added_after",
        "scheduled_before",
        "scheduled_after",
        mode="before",
    )
    @classmethod
    def parse_dates(cls, v):
        """
        Convert string date values with the dateutil parser

        Anything that is not a string is handed back unchanged.
        """
        return date_parser(v) if isinstance(v, str) else v
class InternalJobQueryIterator(QueryIteratorBase[InternalJob]):
    """
    Iterator for internal job queries

    This iterator transparently handles batching and pagination over the results
    of an internal job query.
    """

    def __init__(self, client, query_filters: InternalJobQueryFilters):
        """
        Construct an iterator

        Parameters
        ----------
        client
            QCPortal client object used to contact/retrieve data from the server
        query_filters
            The actual query information to send to the server
        """

        # The batch size is a quarter of the server-advertised limit for this endpoint
        batch_limit = client.api_limits["get_internal_jobs"] // 4
        QueryIteratorBase.__init__(self, client, query_filters, batch_limit)

    def _request(self) -> list[InternalJob]:
        """
        Fetch one batch of internal jobs from the server

        Returns
        -------
        :
            The jobs in this batch, each attached to the client so that they
            can be refreshed or watched later
        """

        ij_dicts = self._client.make_request(
            "post",
            "api/v1/internal_jobs/query",
            list[dict[str, Any]],
            body=self._query_filters,
        )

        # Hand each job the real client, not this iterator. InternalJob.refresh()
        # calls get_internal_job()/make_request() on whatever it is given.
        return [InternalJob(client=self._client, **ij_dict) for ij_dict in ij_dicts]