Queue backend abstraction manager.

import json
import logging
import sched
import socket
import time
import uuid
from typing import Any, Callable, Dict, List, Optional, Union

from pydantic import BaseModel, validator

import qcengine as qcng
from qcfractal.extras import get_information

from import get_molecule
from .adapters import build_queue_adapter
from .compress import compress_results

__all__ = ["QueueManager"]

class QueueStatistics(BaseModel):
    Queue Manager Job statistics

    # Dynamic quantities
    total_successful_tasks: int = 0
    total_failed_tasks: int = 0
    total_worker_walltime: float = 0.0
    total_task_walltime: float = 0.0
    maximum_possible_walltime: float = 0.0  # maximum_workers * time_delta, experimental
    active_task_slots: int = 0

    # Static Quantities
    max_concurrent_tasks: int = 0
    cores_per_task: int = 0
    memory_per_task: float = 0.0
    last_update_time: float = None

    def __init__(self, **kwargs):
        if kwargs.get("last_update_time", None) is None:
            kwargs["last_update_time"] = time.time()

    def total_completed_tasks(self) -> int:
        return self.total_successful_tasks + self.total_failed_tasks

    def theoretical_max_consumption(self) -> float:
        """In Core Hours"""
        return self.max_concurrent_tasks * self.cores_per_task * (time.time() - self.last_update_time) / 3600

    def active_cores(self) -> int:
        return self.active_task_slots * self.cores_per_task

    def active_memory(self) -> float:
        return self.active_task_slots * self.memory_per_task

    @validator("cores_per_task", pre=True)
    def cores_per_tasks_none(cls, v):
        if v is None:
            v = 1
        return v

[docs]class QueueManager: """ This object maintains a computational queue and watches for finished tasks for different queue backends. Finished tasks are added to the database and removed from the queue. Attributes ---------- client : FractalClient A FractalClient connected to a server. queue_adapter : QueueAdapter The DBAdapter class for queue abstraction errors : dict A dictionary of current errors logger : logging.logger. Optional, Default: None A logger for the QueueManager """ def __init__( self, client: "FractalClient", queue_client: "BaseAdapter", logger: Optional[logging.Logger] = None, max_tasks: int = 200, queue_tag: Optional[Union[str, List[str]]] = None, manager_name: str = "unlabeled", update_frequency: Union[int, float] = 2, verbose: bool = True, server_error_retries: Optional[int] = 1, stale_update_limit: Optional[int] = 10, cores_per_task: Optional[int] = None, memory_per_task: Optional[float] = None, nodes_per_task: Optional[int] = None, cores_per_rank: Optional[int] = 1, scratch_directory: Optional[str] = None, retries: Optional[int] = 2, configuration: Optional[Dict[str, Any]] = None, ): """ Parameters ---------- client : FractalClient A FractalClient connected to a server queue_client : BaseAdapter The DBAdapter class for queue abstraction logger : Optional[logging.Logger], optional A logger for the QueueManager max_tasks : int, optional The maximum number of tasks to hold at any given time queue_tag : str, optional Allows managers to pull from specific tags manager_name : str, optional The cluster the manager belongs to update_frequency : Union[int, float], optional The frequency to check for new tasks in seconds verbose : bool, optional Whether or not to have the manager be verbose (logger level debug and up) server_error_retries : Optional[int], optional How many times finished jobs are attempted to be pushed to the server in in the event of a server communication error. After number of attempts, the failed jobs are dropped from this manager and considered "stale" Set to `None` to keep retrying stale_update_limit : Optional[int], optional Number of stale update attempts to keep around If this limit is ever hit, the server initiates as shutdown as best it can since communication with the server has gone wrong too many times. Set to `None` for unlimited cores_per_task : Optional[int], optional How many CPU cores per computation task to allocate for QCEngine None indicates "use however many you can detect" memory_per_task : Optional[float], optional How much memory, in GiB, per computation task to allocate for QCEngine None indicates "use however much you can consume" nodes_per_task : Optional[int], optional How many nodes to use per task. Used only for node-parallel tasks cores_per_rank: Optional[int], optional How many CPUs per rank of an MPI application. Used only for node-parallel tasks scratch_directory : Optional[str], optional Scratch directory location to do QCEngine compute None indicates "wherever the system default is"' retries : Optional[int], optional Number of retries that QCEngine will attempt for RandomErrors detected when running its computations. After this many attempts (or on any other type of error), the error will be raised. configuration : Optional[Dict[str, Any]], optional A JSON description of the settings used to create this object for the database. """ # Setup logging if logger: self.logger = logger else: self.logger = logging.getLogger("QueueManager") self.name_data = {"cluster": manager_name, "hostname": socket.gethostname(), "uuid": str(uuid.uuid4())} self._name = self.name_data["cluster"] + "-" + self.name_data["hostname"] + "-" + self.name_data["uuid"] self.client = client self.cores_per_task = cores_per_task self.memory_per_task = memory_per_task self.nodes_per_task = nodes_per_task or 1 self.scratch_directory = scratch_directory self.retries = retries self.cores_per_rank = cores_per_rank self.configuration = configuration self.queue_adapter = build_queue_adapter( queue_client, logger=self.logger, cores_per_task=self.cores_per_task, memory_per_task=self.memory_per_task, nodes_per_task=self.nodes_per_task, scratch_directory=self.scratch_directory, cores_per_rank=self.cores_per_rank, retries=self.retries, verbose=verbose, ) self.max_tasks = max_tasks self.queue_tag = queue_tag self.verbose = verbose self.statistics = QueueStatistics( max_concurrent_tasks=self.max_tasks, cores_per_task=(cores_per_task or 0), memory_per_task=(memory_per_task or 0), update_frequency=update_frequency, ) self.scheduler = None self.update_frequency = update_frequency self.periodic = {} = 0 self.exit_callbacks = [] # Server response/stale job handling self.server_error_retries = server_error_retries self.stale_update_limit = stale_update_limit self._stale_updates_tracked = 0 self._stale_payload_tracking = [] self.n_stale_jobs = 0 # QCEngine data self.available_programs = qcng.list_available_programs() self.available_procedures = qcng.list_available_procedures() # Display a warning if there are non-node-parallel programs and >1 node_per_task if self.nodes_per_task > 1: for name in self.available_programs: program = qcng.get_program(name) if not program.node_parallel: self.logger.warning( "Program {} is not node parallel," " but manager will use >1 node per task".format(name) ) # Print out configuration"QueueManager:")" Version: {}\n".format(get_information("version"))) if self.verbose:" Name Information:")" Cluster: {}".format(self.name_data["cluster"]))" Hostname: {}".format(self.name_data["hostname"]))" UUID: {}\n".format(self.name_data["uuid"]))" Queue Adapter:")" {}\n".format(self.queue_adapter)) if self.verbose:" QCEngine:")" Version: {}".format(qcng.__version__))" Task Cores: {}".format(self.cores_per_task))" Task Mem: {}".format(self.memory_per_task))" Task Nodes: {}".format(self.nodes_per_task))" Cores per Rank: {}".format(self.cores_per_rank))" Scratch Dir: {}".format(self.scratch_directory))" Programs: {}".format(self.available_programs))" Procedures: {}\n".format(self.available_procedures)) # DGAS Note: Note super happy about how this if/else turned out. Looking for alternatives. if self.connected(): # Pull server info self.server_info = client.server_information() self.server_name = self.server_info["name"] self.server_version = self.server_info["version"] self.server_query_limit = self.server_info["query_limit"] if self.max_tasks > self.server_query_limit: self.max_tasks = self.server_query_limit self.logger.warning( "Max tasks was larger than server query limit of {}, reducing to match query limit.".format( self.server_query_limit ) ) self.heartbeat_frequency = self.server_info["heartbeat_frequency"] # Tell the server we are up and running payload = self._payload_template() payload["data"]["operation"] = "startup" payload["data"]["configuration"] = self.configuration self.client._automodel_request("queue_manager", "put", payload) if self.verbose:" Connected:")" Version: {}".format(self.server_version))" Address: {}".format(self.client.address))" Name: {}".format(self.server_name))" Queue tag: {}".format(self.queue_tag))" Username: {}\n".format(self.client.username)) else:" QCFractal server information:")" Not connected, some actions will not be available") def _payload_template(self): meta = { **self.name_data.copy(), # Version info "qcengine_version": qcng.__version__, "manager_version": get_information("version"), # User info "username": self.client.username, # Pull info "programs": self.available_programs, "procedures": self.available_procedures, "tag": self.queue_tag, # Statistics "total_worker_walltime": self.statistics.total_worker_walltime, "total_task_walltime": self.statistics.total_task_walltime, "active_tasks": self.statistics.active_task_slots, "active_cores": self.statistics.active_cores, "active_memory": self.statistics.active_memory, } return {"meta": meta, "data": {}} ## Accessors
[docs] def name(self) -> str: """ Returns the Managers full name. """ return self._name
[docs] def connected(self) -> bool: """ Checks the connection to the server. """ return self.client is not None
[docs] def assert_connected(self) -> None: """ Raises an error for functions that require a server connection. """ if self.connected() is False: raise AttributeError("Manager is not connected to a server, this operations is not available.")
## Start/stop functionality
[docs] def start(self) -> None: """ Starts up all IOLoops and processes. """ self.assert_connected() self.scheduler = sched.scheduler(time.time, time.sleep) heartbeat_time = int(0.4 * self.heartbeat_frequency) def scheduler_update(): self.update() self.scheduler.enter(self.update_frequency, 1, scheduler_update) def scheduler_heartbeat(): self.heartbeat() self.scheduler.enter(heartbeat_time, 1, scheduler_heartbeat)"QueueManager successfully started.\n") self.scheduler.enter(0, 1, scheduler_update) self.scheduler.enter(0, 2, scheduler_heartbeat)
[docs] def stop(self, signame="Not provided", signum=None, stack=None) -> None: """ Shuts down all IOLoops and periodic updates. """"QueueManager received shutdown signal: {}.\n".format(signame)) # Cancel all events if self.scheduler is not None: for event in self.scheduler.queue: self.scheduler.cancel(event) # Push data back to the server self.shutdown() # Close down the adapter self.close_adapter() # Call exit callbacks for func, args, kwargs in self.exit_callbacks: func(*args, **kwargs)"QueueManager stopping gracefully.\n")
[docs] def close_adapter(self) -> bool: """ Closes down the underlying adapter. """ return self.queue_adapter.close()
## Queue Manager functions
[docs] def heartbeat(self) -> None: """ Provides a heartbeat to the connected Server. """ self.assert_connected() payload = self._payload_template() payload["data"]["operation"] = "heartbeat" try: self.client._automodel_request("queue_manager", "put", payload) self.logger.debug("Heartbeat was successful.") except IOError: self.logger.warning("Heartbeat was not successful.")
[docs] def shutdown(self) -> Dict[str, Any]: """ Shutdown the manager and returns tasks to queue. """ self.assert_connected() self.update(new_tasks=False, allow_shutdown=False) payload = self._payload_template() payload["data"]["operation"] = "shutdown" try: response = self.client._automodel_request("queue_manager", "put", payload, timeout=5) response["success"] = True shutdown_string = "Shutdown was successful, {} tasks returned to master queue." except IOError: # TODO something as we didnt successfully add the data self.logger.warning("Shutdown was not successful. This may delay queued tasks.") response = {"nshutdown": 0, "success": False} shutdown_string = "Shutdown was not successful, {} tasks not returned." nshutdown = response["nshutdown"] if self.n_stale_jobs: shutdown_string = shutdown_string.format( f"{min(0, nshutdown-self.n_stale_jobs)} active and {nshutdown} stale" ) else: shutdown_string = shutdown_string.format(nshutdown) response["info"] = shutdown_string return response
[docs] def add_exit_callback(self, callback: Callable, *args: List[Any], **kwargs: Dict[Any, Any]) -> None: """Adds additional callbacks to perform when closing down the server. Parameters ---------- callback : callable The function to call at exit *args Arguments to call with the function. **kwargs Kwargs to call with the function. """ self.exit_callbacks.append((callback, args, kwargs))
def _post_update(self, payload_data, allow_shutdown=True): """Internal function to post payload update""" payload = self._payload_template() # Update with data payload["data"] = payload_data try: self.client._automodel_request("queue_manager", "post", payload, full_return=True) except IOError: # Trapped behavior elsewhere raise except Exception as fatal: # Non IOError, something has gone very wrong self.logger.error( "An error was detected which was not an expected requests-type error. The manager " "will attempt shutdown as best it can. Please report this error to the QCFractal " "developers as this block should not be " "seen outside of debugging modes. Error is as follows\n{}".format(fatal) ) try: if allow_shutdown: self.shutdown() finally: raise fatal def _update_stale_jobs(self, allow_shutdown=True): """ Attempt to post the previous payload failures """ clear_indices = [] for index, (results, attempts) in enumerate(self._stale_payload_tracking): try: self._post_update(results)"Successfully pushed jobs from {attempts+1} updates ago")"Tasks pushed: " + str(list(results.keys()))) clear_indices.append(index) except IOError: # Tried and failed attempts += 1 # Case: Still within the retry limit if self.server_error_retries is None or self.server_error_retries > attempts: self._stale_payload_tracking[index][-1] = attempts self.logger.warning(f"Could not post jobs from {attempts} updates ago, will retry on next update.") # Case: Over limit else: self.logger.warning( f"Could not post jobs from {attempts} ago and over attempt limit, marking " f"jobs as stale." ) self.n_stale_jobs += len(results) clear_indices.append(index) self._stale_updates_tracked += 1 # Cleanup clear indices for index in clear_indices[::-1]: self._stale_payload_tracking.pop(index) # Check stale limiters if ( self.stale_update_limit is not None and (len(self._stale_payload_tracking) + self._stale_updates_tracked) > self.stale_update_limit ): self.logger.error("Exceeded number of stale updates allowed! Attempting to shutdown gracefully...") # Log all not-quite stale jobs to stale for (results, _) in self._stale_payload_tracking: self.n_stale_jobs += len(results) try: if allow_shutdown: self.shutdown() finally: raise RuntimeError("Exceeded number of stale updates allowed!")
[docs] def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: """Examines the queue for completed tasks and adds successful completions to the database while unsuccessful are logged for future inspection. Parameters ---------- new_tasks: bool, optional, Default: True Try to get new tasks from the server allow_shutdown: bool, optional, Default: True Allow function to attempt graceful shutdowns in the case of stale job or fatal error limits. Does not prevent errors from being raise, but mostly used to prevent infinite loops when update is called from `shutdown` itself """ self.assert_connected() self._update_stale_jobs(allow_shutdown=allow_shutdown) results = self.queue_adapter.acquire_complete() # Compress the stdout/stderr/error outputs results = compress_results(results) # Stats fetching for running tasks, as close to the time we got the jobs as we can last_time = self.statistics.last_update_time now = self.statistics.last_update_time = time.time() time_delta_seconds = now - last_time try: self.statistics.active_task_slots = self.queue_adapter.count_active_task_slots() log_efficiency = True except NotImplementedError: log_efficiency = False timedelta_worker_walltime = time_delta_seconds * self.statistics.active_cores / 3600 timedelta_maximum_walltime = ( time_delta_seconds * self.statistics.max_concurrent_tasks * self.statistics.cores_per_task / 3600 ) self.statistics.total_worker_walltime += timedelta_worker_walltime self.statistics.maximum_possible_walltime += timedelta_maximum_walltime # Process jobs n_success = 0 n_fail = 0 n_result = len(results) task_cpu_hours = 0 error_payload = [] if n_result: # For logging failure_messages = {} try: self._post_update(results, allow_shutdown=allow_shutdown) task_status = {k: "sent" for k in results.keys()} except IOError: if self.server_error_retries is None or self.server_error_retries > 0: self.logger.warning("Post complete tasks was not successful. Attempting again on next update.") self._stale_payload_tracking.append([results, 0]) task_status = {k: "deferred" for k in results.keys()} else: self.logger.warning("Post complete tasks was not successful. Data may be lost.") self.n_stale_jobs += len(results) task_status = {k: "unknown_error" for k in results.keys()} -= n_result for key, result in results.items(): wall_time_seconds = 0 if result.success: n_success += 1 if hasattr(result.provenance, "wall_time"): wall_time_seconds = float(result.provenance.wall_time) task_status[key] += " / success" else: task_status[key] += f" / failed: {result.error.error_type}" failure_messages[key] = result.error # Try to get the wall time in the most fault-tolerant way try: wall_time_seconds = float(result.input_data.get("provenance", {}).get("wall_time", 0)) except AttributeError: # Trap the result.input_data is None, but let other attribute errors go if result.input_data is None: wall_time_seconds = 0 else: raise except TypeError: # Trap wall time corruption, e.g. float(None) # Other Result corruptions will raise an error correctly wall_time_seconds = 0 task_cpu_hours += wall_time_seconds * self.statistics.cores_per_task / 3600 n_fail = n_result - n_success # Now print out all the info"Processed {len(results)} tasks: {n_success} succeeded / {n_fail} failed).")"Task ids, submission status, calculation status below") for task_id, status_msg in task_status.items():" Task {task_id} : {status_msg}") if n_fail:"The following tasks failed with the errors:") for task_id, error_info in failure_messages.items():"Error message for task id {task_id}")" Error type: " + str(error_info.error_type))" Backtrace: \n" + str(error_info.error_message)) open_slots = max(0, self.max_tasks - # Crunch Statistics self.statistics.total_failed_tasks += n_fail self.statistics.total_successful_tasks += n_success self.statistics.total_task_walltime += task_cpu_hours na_format = "" float_format = ",.2f" if self.statistics.total_completed_tasks == 0: task_stats_str = "Task statistics unavailable until first tasks return" worker_stats_str = None else: success_rate = self.statistics.total_successful_tasks / self.statistics.total_completed_tasks * 100 success_format = float_format task_stats_str = ( f"Task Stats: Processed={self.statistics.total_completed_tasks}, " f"Failed={self.statistics.total_failed_tasks}, " f"Success={success_rate:{success_format}}%" ) worker_stats_str = ( f"Worker Stats (est.): Core Hours Used={self.statistics.total_worker_walltime:{float_format}}" ) # Handle efficiency calculations if log_efficiency: # Efficiency calculated as: # sum_task(task_wall_time * nthread / task) # ------------------------------------------------------------- if self.statistics.total_task_walltime == 0 or self.statistics.maximum_possible_walltime == 0: efficiency_of_running = "(N/A yet)" efficiency_of_potential = "(N/A yet)" efficiency_format = na_format else: efficiency_of_running = ( self.statistics.total_task_walltime / self.statistics.total_worker_walltime * 100 ) efficiency_of_potential = ( self.statistics.total_worker_walltime / self.statistics.maximum_possible_walltime * 100 ) efficiency_format = float_format worker_stats_str += f", Core Usage Efficiency: {efficiency_of_running:{efficiency_format}}%" if self.verbose: worker_stats_str += ( f", Core Usage vs. Max Resources Requested: " f"{efficiency_of_potential:{efficiency_format}}%" ) if worker_stats_str is not None: if (new_tasks is False) or (open_slots == 0): return True # Get new tasks payload = self._payload_template() payload["data"]["limit"] = open_slots try: new_tasks = self.client._automodel_request("queue_manager", "get", payload) except IOError: # TODO something as we didnt successfully get data self.logger.warning("Acquisition of new tasks was not successful.") return False"Acquired {} new tasks.".format(len(new_tasks))) # Add new tasks to queue self.queue_adapter.submit_tasks(new_tasks) += len(new_tasks) return True
[docs] def await_results(self) -> bool: """A synchronous method for testing or small launches that awaits task completion. Returns ------- bool Return True if the operation completed successfully """ self.assert_connected() self.update() self.queue_adapter.await_results() self.update(new_tasks=False) return True
[docs] def list_current_tasks(self) -> List[Any]: """Provides a list of tasks currently in the queue along with the associated keys. Returns ------- ret : list of tuples All tasks currently still in the database """ return self.queue_adapter.list_tasks()
[docs] def test(self, n=1) -> bool: """ Tests all known programs with simple inputs to check if the Adapter is correctly instantiated. """ from qcfractal import testing"Testing requested, generating tasks") task_base = json.dumps( { "spec": { "function": "qcengine.compute", "args": [ { "molecule": get_molecule("hooh.json").dict(encoding="json"), "driver": "energy", "model": {}, "keywords": {}, }, "program", ], "kwargs": {}, }, "parser": "single", } ) programs = { "rdkit": {"method": "UFF", "basis": None}, "torchani": {"method": "ANI1", "basis": None}, "psi4": {"method": "HF", "basis": "sto-3g"}, } tasks = [] found_programs = [] for program, model in programs.items(): if testing.has_module(program):"Found program {}, adding to testing queue.".format(program)) else: self.logger.warning("Could not find program {}, skipping tests.".format(program)) continue for x in range(n): task = json.loads(task_base) program_id = program + str(x) task["id"] = program_id task["spec"]["args"][0]["model"] = model task["spec"]["args"][0]["keywords"] = {"e_convergence": (x * 1.0e-6 + 1.0e-6)} task["spec"]["args"][1] = program tasks.append(task) found_programs.append(program_id) self.queue_adapter.submit_tasks(tasks)"Testing tasks submitting, awaiting results.\n") self.queue_adapter.await_results() results = self.queue_adapter.acquire_complete()"Testing results acquired.") missing_programs = results.keys() - set(found_programs) if len(missing_programs): self.logger.error("Not all tasks were retrieved, missing programs {}.".format(missing_programs)) raise ValueError("Testing failed, not all tasks were retrieved.") else:"All tasks retrieved successfully.") failures = 0 fail_report = {} for k, result in results.items(): if result.success:" {} - PASSED".format(k)) else: self.logger.error(" {} - FAILED!".format(k)) failed_program = "Return Mangled!" # This should almost never be seen, but is in place as a fallback for program in programs.keys(): if program in k: failed_program = program break if failed_program not in fail_report: fail_report[failed_program] = ( f"On test {k}:" f"\nException Type: {result.error.error_type}" f"\nException Message: {result.error.error_message}" ) failures += 1 if failures: self.logger.error("{}/{} tasks failed!".format(failures, len(results))) self.logger.error( f"A sample error from each program to help:\n" + "\n".join([e for e in fail_report.values()]) ) return False else:"All tasks completed successfully!") return True