"""
Models for the REST interface
"""
import functools
import re
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union
from pydantic import Field, constr, root_validator, validator
from qcelemental.util import get_base_docs
from .common_models import KeywordSet, Molecule, ObjectId, ProtoModel, KVStore
from .gridoptimization import GridOptimizationInput
from .records import ResultRecord
from .task_models import PriorityEnum, TaskRecord
from .torsiondrive import TorsionDriveInput
# Names exported by a star-import of this module. Most of the Body/Response models
# defined in this module are intentionally or otherwise not listed here.
# NOTE(review): "ResultResponse" is not defined in the portion of the file visible here
# (only ResultGETResponse is) -- confirm it is defined further down.
__all__ = [
    "ComputeResponse",
    "rest_model",
    "QueryStr",
    "QueryObjectId",
    "QueryListStr",
    "ResultResponse",
    "CollectionSubresourceGETResponseMeta",
]
### Utility functions
# Registry of REST models: endpoint regex -> {upper-cased REST type -> (body model, response model)}.
# Filled by register_model() and read by rest_model().
__rest_models = {}
def register_model(name: str, rest: str, body: ProtoModel, response: ProtoModel) -> None:
    """
    Add a (body, response) model pair to the REST model registry.

    Parameters
    ----------
    name : str
        A regular expression describing the rest endpoint.
    rest : str
        The REST endpoint type. It is upper-cased before being stored.
    body : ProtoModel
        The REST query body model.
    response : ProtoModel
        The REST query response model.

    Raises
    ------
    KeyError
        If ``name`` already has a model pair registered for this endpoint type.
    """
    method = rest.upper()

    per_method = __rest_models.get(name)
    if per_method is None:
        # First registration under this endpoint regex: create its per-type table.
        per_method = __rest_models[name] = {}
    elif method in per_method:
        raise KeyError(f"Model name {name} already registered.")

    per_method[method] = (body, response)
@functools.lru_cache(1000, typed=True)
def rest_model(resource: str, rest: str) -> Tuple[ProtoModel, ProtoModel]:
    """
    Look up the REST models registered for a resource and endpoint type.

    Every registered endpoint regex is tried against ``resource`` with ``re.fullmatch``;
    results are memoized by ``functools.lru_cache``.

    Parameters
    ----------
    resource : str
        The REST endpoint resource name.
    rest : str
        The REST endpoint type: GET, POST, PUT, DELETE

    Returns
    -------
    Tuple[ProtoModel, ProtoModel]
        The (body, response) models of the REST request.

    Raises
    ------
    KeyError
        If no registered regex matches ``resource`` for this endpoint type.

    Warns
    -----
    RuntimeWarning
        If more than one registration matches; the first one found is returned.
    """
    rest = rest.upper()

    # A regex may match the resource yet have nothing registered for this endpoint type
    # (different regexes can serve different endpoint types), hence the second condition.
    candidates = [
        by_method[rest]
        for pattern, by_method in __rest_models.items()
        if re.fullmatch(pattern, resource) and rest in by_method
    ]

    if not candidates:
        raise KeyError(f"REST Model for endpoint {resource} could not be found.")

    chosen = candidates[0]
    if len(candidates) > 1:
        warnings.warn(
            f"Multiple REST models were matched for {rest} request at endpoint {resource}. "
            f"The following models will be used: {chosen[0]}, {chosen[1]}.",
            RuntimeWarning,
        )
    return chosen
### Generic Types and Common Models
# String type constrained by the regex "null"; used below so that a query field can carry
# an explicit null marker alongside real ObjectIds.
nullstr = constr(regex="null")

# Query-field aliases. Each is Optional, and (except QueryListStr, which is list-only)
# accepts either a single value or a list of values.
QueryStr = Optional[Union[List[str], str]]
QueryInt = Optional[Union[List[int], int]]
QueryObjectId = Optional[Union[List[ObjectId], ObjectId]]
QueryNullObjectId = Optional[Union[List[ObjectId], ObjectId, List[nullstr], nullstr]]
QueryListStr = Optional[List[str]]
class ResponsePOSTMeta(ResponseMeta):
    """
    Standard Fractal Server response metadata for POST/add type requests.
    """

    # NOTE(review): ResponseMeta is not defined in the visible part of this file; the fields
    # below are added on top of whatever it declares.
    n_inserted: int = Field(
        ...,
        description="The number of new objects amongst the inputs which did not exist already, and are now in the "
        "database.",
    )
    # Either plain Ids or (str, str) pairs.
    duplicates: Union[List[str], List[Tuple[str, str]]] = Field(
        ...,
        description="The Ids of the objects which already exist in the database amongst the set which were passed in.",
    )
    validation_errors: List[str] = Field(
        ..., description="All errors with validating submitted objects will be documented here."
    )
class QueryFilter(ProtoModel):
    """
    Standard Fractal Server metadata for column filtering.

    Holds two optional column lists, ``include`` and ``exclude``. Validation fails when
    both of them are given.
    """

    include: QueryListStr = Field(
        None,
        description="Return only these columns. Expert-level object. Only one of include and exclude may be specified.",
    )
    exclude: QueryListStr = Field(
        None,
        description="Return all but these columns. Expert-level object. Only one of include and exclude may be specified.",
    )

    @root_validator
    def check_include_or_exclude(cls, values):
        """Reject input that sets both ``include`` and ``exclude``."""
        if values.get("include") is None or values.get("exclude") is None:
            return values
        raise ValueError("Only one of include and exclude may be specified.")
# Combines the fields and validators of QueryMeta and QueryFilter; it declares nothing of its own.
# NOTE(review): QueryMeta is not defined in the visible part of this file.
class QueryMetaFilter(QueryMeta, QueryFilter):
    """
    Fractal Server metadata for Database queries allowing for filtering and pagination
    """
class ComputeResponse(ProtoModel):
    """
    The response model from the Fractal Server when new Compute or Services are added.
    """

    ids: List[Optional[ObjectId]] = Field(..., description="The Id's of the records to be computed.")
    submitted: List[ObjectId] = Field(
        ..., description="The object Ids which were submitted as new entries to the database."
    )
    existing: List[ObjectId] = Field(..., description="The list of object Ids which already existed in the database.")

    def __str__(self) -> str:
        """Summarize the response by its number of submitted and existing Ids."""
        n_new, n_old = len(self.submitted), len(self.existing)
        return f"ComputeResponse(nsubmitted={n_new} nexisting={n_old})"

    def __repr__(self) -> str:
        """Return the ``str()`` form wrapped in angle brackets."""
        return "<" + str(self) + ">"

    def merge(self, other: "ComputeResponse") -> "ComputeResponse":
        """Concatenate two ComputeResponse objects into a new one.

        For each of ``ids``, ``submitted`` and ``existing``, the entries of ``self`` come
        first and those of ``other`` follow, so order is maintained.

        Parameters
        ----------
        other : ComputeResponse
            The compute response to merge

        Returns
        -------
        ComputeResponse
            The merged compute response
        """
        combined = {
            field: getattr(self, field) + getattr(other, field) for field in ("ids", "submitted", "existing")
        }
        return ComputeResponse(**combined)
# Base documentation text of the shared meta models, keyed by model class. The models below
# reuse these strings as the description of their ``meta`` fields.
common_docs = {
    model: str(get_base_docs(model))
    for model in (
        EmptyMeta,
        ResponseMeta,
        ResponseGETMeta,
        ResponsePOSTMeta,
        QueryMeta,
        QueryMetaFilter,
        ComputeResponse,
    )
}
### Information
class InformationGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``information`` endpoint; it declares no fields."""
class InformationGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``information`` endpoint."""

    class Config(ProtoModel.Config):
        # No fields are declared on this model; extra keys are allowed instead.
        extra = "allow"


register_model("information", "GET", InformationGETBody, InformationGETResponse)
### KVStore
class KVStoreGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``kvstore`` endpoint."""

    class Data(ProtoModel):
        id: QueryObjectId = Field(None, description="Id of the Key/Value Storage object to get.")

    # NOTE(review): the default is a plain ``{}`` rather than an ``EmptyMeta()`` instance as used by
    # the collection sub-resource bodies in this file -- confirm the difference is intended.
    meta: EmptyMeta = Field({}, description=common_docs[EmptyMeta])
    data: Data = Field(..., description="Data of the KV Get field: consists of Id of the Key/Value object to fetch.")
class KVStoreGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``kvstore`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    # Mapping of string keys to KVStore entries.
    data: Dict[str, KVStore] = Field(..., description="The entries of Key/Value object requested.")


register_model("kvstore", "GET", KVStoreGETBody, KVStoreGETResponse)
### Molecule response
class MoleculeGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``molecule`` endpoint."""

    class Data(ProtoModel):
        # All three search keys are optional and default to None.
        id: QueryObjectId = Field(None, description="Exact Id of the Molecule to fetch from the database.")
        molecule_hash: QueryStr = Field(
            None,
            description="Hash of the Molecule to search for in the database. Can be computed from the Molecule object "
            "directly without direct access to the Database itself.",
        )
        molecular_formula: QueryStr = Field(
            None,
            description="Query is made based on simple molecular formula. This is based on just the formula itself and "
            "contains no connectivity information.",
        )

    meta: QueryMeta = Field(QueryMeta(), description=common_docs[QueryMeta])
    data: Data = Field(
        ...,
        description="Data fields for a Molecule query.",  # Because Data is internal, this may not document sufficiently
    )
class MoleculeGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``molecule`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    data: List[Molecule] = Field(..., description="The List of Molecule objects found by the query.")


register_model("molecule", "GET", MoleculeGETBody, MoleculeGETResponse)
class MoleculePOSTBody(ProtoModel):
    """Request body registered for ``POST`` on the ``molecule`` endpoint."""

    meta: EmptyMeta = Field({}, description=common_docs[EmptyMeta])
    data: List[Molecule] = Field(..., description="A list of :class:`Molecule` objects to add to the Database.")
class MoleculePOSTResponse(ProtoModel):
    """Response model registered for ``POST`` on the ``molecule`` endpoint."""

    meta: ResponsePOSTMeta = Field(..., description=common_docs[ResponsePOSTMeta])
    data: List[ObjectId] = Field(
        ...,
        description="A list of Id's assigned to the Molecule objects passed in which serves as a unique identifier "
        "in the database. If the Molecule was already in the database, then the Id returned is its "
        "existing Id (entries are not duplicated).",
    )


register_model("molecule", "POST", MoleculePOSTBody, MoleculePOSTResponse)
### Keywords
class KeywordGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``keyword`` endpoint."""

    class Data(ProtoModel):
        # Both search keys are optional; unlike most models here they carry no Field description.
        id: QueryObjectId = None
        hash_index: QueryStr = None

    meta: QueryMeta = Field(QueryMeta(), description=common_docs[QueryMeta])
    data: Data = Field(
        ...,
        description="The formal query for a Keyword fetch, contains ``id`` or ``hash_index`` for the object to fetch.",
    )
class KeywordGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``keyword`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    data: List[KeywordSet] = Field(
        ..., description="The :class:`KeywordSet` found from in the database based on the query."
    )


register_model("keyword", "GET", KeywordGETBody, KeywordGETResponse)
class KeywordPOSTBody(ProtoModel):
    """Request body registered for ``POST`` on the ``keyword`` endpoint."""

    meta: EmptyMeta = Field(
        {}, description="There is no metadata with this, so an empty metadata is sent for completion."
    )
    data: List[KeywordSet] = Field(..., description="The list of :class:`KeywordSet` objects to add to the database.")
class KeywordPOSTResponse(ProtoModel):
    """Response model registered for ``POST`` on the ``keyword`` endpoint."""

    # Individual entries may be None.
    data: List[Optional[ObjectId]] = Field(
        ...,
        description="The Ids assigned to the added :class:`KeywordSet` objects. In the event of duplicates, the Id "
        "will be the one already found in the database.",
    )
    meta: ResponsePOSTMeta = Field(..., description=common_docs[ResponsePOSTMeta])


register_model("keyword", "POST", KeywordPOSTBody, KeywordPOSTResponse)
### Collections
class CollectionGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``collection`` endpoint."""

    class Data(ProtoModel):
        # Both keys default to None, so they are annotated Optional to match.
        collection: Optional[str] = Field(
            None, description="The specific collection to look up as its identified in the database."
        )
        name: Optional[str] = Field(None, description="The common name of the collection to look up.")

        @validator("collection")
        def cast_to_lower(cls, v):
            """Lower-case a non-empty ``collection`` value; falsy values pass through unchanged."""
            if v:
                v = v.lower()
            return v

    meta: Optional[QueryFilter] = Field(
        None,
        description="Additional metadata to make with the query. Collections can only have an ``include/exclude`` key in its "
        "meta and therefore does not follow the standard GET metadata model.",
    )
    data: Data = Field(..., description="Information about the Collection to search the database with.")
class CollectionGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``collection`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    data: List[Dict[str, Optional[Any]]] = Field(
        ..., description="The Collection objects returned by the server based on the query."
    )

    @validator("data")
    def ensure_collection_name_in_data_get_res(cls, v):
        """Require every returned dict to contain both a ``name`` and a ``collection`` key."""
        for col in v:
            if "name" not in col or "collection" not in col:
                raise ValueError("Dicts in 'data' must have both 'collection' and 'name'")
        return v


register_model("collection", "GET", CollectionGETBody, CollectionGETResponse)
class CollectionPOSTBody(ProtoModel):
    """Request body registered for ``POST`` on the ``collection`` endpoint."""

    class Meta(ProtoModel):
        overwrite: bool = Field(
            False,
            description="The existing Collection in the database will be updated if this is True, otherwise will "
            "remain unmodified if it already exists.",
        )

    class Data(ProtoModel):
        id: str = Field(
            "local",  # Auto blocks overwriting in a socket
            description="The Id of the object to assign in the database. If 'local', then it will not overwrite "
            "existing keys. There should be very little reason to ever touch this.",
        )
        collection: str = Field(
            ..., description="The specific identifier for this Collection as it will appear in database."
        )
        name: str = Field(..., description="The common name of this Collection.")

        class Config(ProtoModel.Config):
            # Keys beyond the three declared above are accepted.
            extra = "allow"

        @validator("collection")
        def cast_to_lower(cls, v):
            """Lower-case the ``collection`` identifier."""
            return v.lower()

    meta: Meta = Field(
        Meta(),
        description="Metadata to specify how the Database should handle adding this Collection if it already exists. "
        "Metadata model for adding Collections can only accept ``overwrite`` as a key to choose to update "
        "existing Collections or not.",
    )
    data: Data = Field(..., description="The data associated with this Collection to add to the database.")
class CollectionPOSTResponse(ProtoModel):
    """Response model registered for ``POST`` on the ``collection`` endpoint."""

    data: Union[str, None] = Field(
        ...,
        description="The Id of the Collection uniquely pointing to it in the Database. If the Collection was not added "
        "(e.g. ``overwrite=False`` for existing Collection), then a None is returned.",
    )
    meta: ResponsePOSTMeta = Field(..., description=common_docs[ResponsePOSTMeta])


register_model("collection", "POST", CollectionPOSTBody, CollectionPOSTResponse)
class CollectionDELETEBody(ProtoModel):
    """Request body registered for ``DELETE`` on endpoints matching ``collection/[0-9]+``."""

    # Only metadata is carried; there is no ``data`` field.
    meta: EmptyMeta
class CollectionDELETEResponse(ProtoModel):
    """Response model registered for ``DELETE`` on endpoints matching ``collection/[0-9]+``."""

    # Only metadata is returned; there is no ``data`` field.
    meta: ResponseMeta


# The endpoint name is a regular expression: "collection/" followed by one or more digits.
register_model("collection/[0-9]+", "DELETE", CollectionDELETEBody, CollectionDELETEResponse)
### Collection views
# Used as the ``meta`` type of the collection entry/molecule/value/list GET responses in this file.
class CollectionSubresourceGETResponseMeta(ResponseMeta):
    """
    Response metadata for collection views functions.
    """

    msgpacked_cols: List[str] = Field(..., description="Names of columns which were serialized to msgpack-ext.")
class CollectionEntryGETBody(ProtoModel):
    """Request body registered for ``GET`` on endpoints matching ``collection/[0-9]+/entry``."""

    class Data(ProtoModel):
        subset: QueryStr = Field(
            None,
            description="Not implemented. See qcfractal.interface.collections.dataset_view.DatasetView.get_entries",
        )

    meta: EmptyMeta = Field(EmptyMeta(), description=common_docs[EmptyMeta])
    data: Data = Field(..., description="Information about which entries to return.")
class CollectionEntryGETResponse(ProtoModel):
    """Response model registered for ``GET`` on endpoints matching ``collection/[0-9]+/entry``."""

    meta: CollectionSubresourceGETResponseMeta = Field(
        ..., description=str(get_base_docs(CollectionSubresourceGETResponseMeta))
    )
    data: Optional[bytes] = Field(..., description="Feather-serialized bytes representing a pandas DataFrame.")


# The endpoint name is a regular expression; the middle path segment is one or more digits.
register_model("collection/[0-9]+/entry", "GET", CollectionEntryGETBody, CollectionEntryGETResponse)
class CollectionMoleculeGETBody(ProtoModel):
    """Request body registered for ``GET`` on endpoints matching ``collection/[0-9]+/molecule``."""

    class Data(ProtoModel):
        # Defaults to None, so the annotation is Optional to match.
        indexes: Optional[List[int]] = Field(
            None,
            description="List of molecule indexes to return (returned by get_entries). "
            "See qcfractal.interface.collections.dataset_view.DatasetView.get_molecules",
        )

    meta: EmptyMeta = Field(EmptyMeta(), description=common_docs[EmptyMeta])
    data: Data = Field(..., description="Information about which molecules to return.")
class CollectionMoleculeGETResponse(ProtoModel):
    """Response model registered for ``GET`` on endpoints matching ``collection/[0-9]+/molecule``."""

    meta: CollectionSubresourceGETResponseMeta = Field(
        ..., description=str(get_base_docs(CollectionSubresourceGETResponseMeta))
    )
    data: Optional[bytes] = Field(..., description="Feather-serialized bytes representing a pandas DataFrame.")


# The endpoint name is a regular expression; the middle path segment is one or more digits.
register_model("collection/[0-9]+/molecule", "GET", CollectionMoleculeGETBody, CollectionMoleculeGETResponse)
class CollectionValueGETBody(ProtoModel):
    """Request body registered for ``GET`` on endpoints matching ``collection/[0-9]+/value``."""

    class Data(ProtoModel):
        class QueryData(ProtoModel):
            # One value-column query; all three keys are required.
            name: str
            driver: str
            native: bool

        # Defaults to None, so the annotation is Optional to match.
        queries: Optional[List[QueryData]] = Field(
            None,
            description="List of queries to match against values columns. "
            "See qcfractal.interface.collections.dataset_view.DatasetView.get_values",
        )
        # QueryStr is already Optional; the None default is spelled out rather than left implicit.
        subset: QueryStr = None

    meta: EmptyMeta = Field(EmptyMeta(), description=common_docs[EmptyMeta])
    data: Data = Field(..., description="Information about which values to return.")
class CollectionValueGETResponse(ProtoModel):
    """Response model registered for ``GET`` on endpoints matching ``collection/[0-9]+/value``."""

    class Data(ProtoModel):
        values: bytes = Field(..., description="Feather-serialized bytes representing a pandas DataFrame.")
        units: Dict[str, str] = Field(..., description="Units of value columns.")

    meta: CollectionSubresourceGETResponseMeta = Field(
        ..., description=str(get_base_docs(CollectionSubresourceGETResponseMeta))
    )
    data: Optional[Data] = Field(..., description="Values and units.")


# The endpoint name is a regular expression; the middle path segment is one or more digits.
register_model("collection/[0-9]+/value", "GET", CollectionValueGETBody, CollectionValueGETResponse)
class CollectionListGETBody(ProtoModel):
    """Request body registered for ``GET`` on endpoints matching ``collection/[0-9]+/list``."""

    class Data(ProtoModel):
        """Placeholder payload model; it declares no fields."""

    meta: EmptyMeta = Field(EmptyMeta(), description=common_docs[EmptyMeta])
    data: Data = Field(..., description="Empty for now.")
class CollectionListGETResponse(ProtoModel):
    """Response model registered for ``GET`` on endpoints matching ``collection/[0-9]+/list``."""

    meta: CollectionSubresourceGETResponseMeta = Field(
        ..., description=str(get_base_docs(CollectionSubresourceGETResponseMeta))
    )
    data: Optional[bytes] = Field(..., description="Feather-serialized bytes representing a pandas DataFrame.")


# The endpoint name is a regular expression; the middle path segment is one or more digits.
register_model("collection/[0-9]+/list", "GET", CollectionListGETBody, CollectionListGETResponse)
### Result
class ResultGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``result`` endpoint."""

    class Data(ProtoModel):
        # Every search key is optional. All default to None except ``status``, which
        # defaults to "COMPLETE".
        id: QueryObjectId = Field(
            None,
            description="The exact Id to fetch from the database. If this is set as a search condition, there is no "
            "reason to set anything else as this will be unique in the database, if it exists.",
        )
        task_id: QueryObjectId = Field(
            None,
            description="The exact Id of the task which carried out this Result's computation. If this is set as a "
            "search condition, there is no reason to set anything else as this will be unique in the "
            "database, if it exists. See also :class:`TaskRecord`.",
        )

        program: QueryStr = Field(
            None,
            description="Results will be searched to match the quantum chemistry software which carried out the "
            "calculation.",
        )
        molecule: QueryObjectId = Field(
            None, description="Results will be searched to match the Molecule Id which was computed on."
        )
        driver: QueryStr = Field(
            None,
            description="Results will be searched to match what class of computation was done. "
            "See :class:`DriverEnum` for valid choices and more information.",
        )
        method: QueryStr = Field(
            None,
            description="Results will be searched to match the quantum chemistry method executed to compute the value.",
        )
        basis: QueryStr = Field(
            None,
            description="Results will be searched to match specified basis sets which were used to compute the values.",
        )
        keywords: QueryNullObjectId = Field(
            None,
            description="Results will be searched based on which :class:`KeywordSet` was used to run the computation.",
        )

        status: QueryStr = Field(
            "COMPLETE",
            description="Results will be searched based on where they are in the compute pipeline. See the "
            ":class:`RecordStatusEnum` for valid statuses and more information.",
        )

        @validator("keywords", each_item=True, pre=True)
        def validate_keywords(cls, v):
            """Replace a None keywords item with the string ``"null"`` before type validation."""
            if v is None:
                v = "null"
            return v

        @validator("basis", each_item=True, pre=True)
        def validate_basis(cls, v):
            """Replace a None or empty-string basis item with ``"null"`` before type validation."""
            if (v is None) or (v == ""):
                v = "null"
            return v

    meta: QueryMetaFilter = Field(QueryMetaFilter(), description=common_docs[QueryMetaFilter])
    data: Data = Field(
        ..., description="The keys with data to search the database on for individual quantum chemistry computations."
    )
class ResultGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``result`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    # Either a record or dict depending if projection
    data: Union[List[ResultRecord], List[Dict[str, Any]]] = Field(
        ...,
        description="Results found from the query. This is a list of :class:`ResultRecord` in most cases, however, "
        "if a projection was specified in the GET request, then a dict is returned with mappings based "
        "on the projection.",
    )

    @validator("data", pre=True)
    def ensure_list_of_dict(cls, v):
        """Wrap a bare dict in a one-element list before type validation; other values pass through."""
        if isinstance(v, dict):
            return [v]
        return v


register_model("result", "GET", ResultGETBody, ResultGETResponse)
### Wavefunction data
class WavefunctionStoreGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``wavefunctionstore`` endpoint."""

    class Data(ProtoModel):
        # A single Id (not a list); it defaults to None, so the annotation is Optional to match.
        id: Optional[ObjectId] = Field(None, description="Id of the Wavefunction Key/Value Storage object to get.")

    meta: QueryMetaFilter = Field(QueryMetaFilter(), description=common_docs[QueryMetaFilter])
    data: Data = Field(
        ...,
        description="Data of the Wavefunction Get field: consists of a ObjectId of the Wavefunction object to fetch.",
    )
class WavefunctionStoreGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``wavefunctionstore`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    data: Dict[str, Any] = Field(..., description="The entries of the Wavefunction object requested.")


register_model("wavefunctionstore", "GET", WavefunctionStoreGETBody, WavefunctionStoreGETResponse)
### Procedures
class ProcedureGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``procedure`` endpoint."""

    class Data(ProtoModel):
        # Every search key is optional. All default to None except ``status``, which
        # defaults to "COMPLETE".
        id: QueryObjectId = Field(
            None,
            description="The exact Id to fetch from the database. If this is set as a search condition, there is no "
            "reason to set anything else as this will be unique in the database, if it exists.",
        )
        task_id: QueryObjectId = Field(
            None,
            description="The exact Id of a task which is carried out by this Procedure. If this is set as a "
            "search condition, there is no reason to set anything else as this will be unique in the "
            "database, if it exists. See also :class:`TaskRecord`.",
        )

        procedure: QueryStr = Field(None, description="Procedures will be searched based on the name of the procedure.")
        program: QueryStr = Field(
            None,
            description="Procedures will be searched based on the program which is the main manager of the procedure",
        )
        hash_index: QueryStr = Field(
            None,
            description="Procedures will be searched based on a hash of the defined procedure. This is something which "
            "can be generated by the Procedure spec itself and does not require server access to compute. "
            "This should be unique in the database so there should be no reason to set anything else "
            "if this is set as a query.",
        )
        status: QueryStr = Field(
            "COMPLETE",
            description="Procedures will be searched based on where they are in the compute pipeline. See the "
            ":class:`RecordStatusEnum` for valid statuses.",
        )

    meta: QueryMetaFilter = Field(QueryMetaFilter(), description=common_docs[QueryMetaFilter])
    data: Data = Field(..., description="The keys with data to search the database on for Procedures.")
class ProcedureGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``procedure`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    # Returned as plain dicts rather than typed record models.
    data: List[Dict[str, Optional[Any]]] = Field(
        ..., description="The list of Procedure specs found based on the query."
    )


register_model("procedure", "GET", ProcedureGETBody, ProcedureGETResponse)
### Task Queue
class TaskQueueGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``task_queue`` endpoint."""

    class Data(ProtoModel):
        # Every search key is optional and defaults to None (including ``status``).
        id: QueryObjectId = Field(
            None,
            description="The exact Id to fetch from the database. If this is set as a search condition, there is no "
            "reason to set anything else as this will be unique in the database, if it exists.",
        )
        hash_index: QueryStr = Field(
            None,
            description="Tasks will be searched based on a hash of the defined Task. This is something which can "
            "be generated by the Task spec itself and does not require server access to compute. "
            "This should be unique in the database so there should be no reason to set anything else "
            "if this is set as a query.",
        )
        program: QueryStr = Field(
            None, description="Tasks will be searched based on the program responsible for executing this task."
        )
        status: QueryStr = Field(
            None,
            description="Tasks will be search based on where they are in the compute pipeline. See the "
            ":class:`RecordStatusEnum` for valid statuses.",
        )
        # NOTE(review): typed QueryStr here, while TaskQueuePUTBody.Data.base_result is QueryObjectId.
        base_result: QueryStr = Field(
            None,
            description="The exact Id of the Result which this Task is linked to. If this is set as a "
            "search condition, there is no reason to set anything else as this will be unique in the "
            "database, if it exists. See also :class:`ResultRecord`.",
        )
        tag: QueryStr = Field(None, description="Tasks will be searched based on their associated tag.")
        manager: QueryStr = Field(
            None, description="Tasks will be searched based on the manager responsible for executing the task."
        )

    meta: QueryMetaFilter = Field(QueryMetaFilter(), description=common_docs[QueryMetaFilter])
    data: Data = Field(..., description="The keys with data to search the database on for Tasks.")
class TaskQueueGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``task_queue`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    data: Union[List[TaskRecord], List[Dict[str, Any]]] = Field(
        ...,
        description="Tasks found from the query. This is a list of :class:`TaskRecord` in most cases, however, "
        "if a projection was specified in the GET request, then a dict is returned with mappings based "
        "on the projection.",
    )


register_model("task_queue", "GET", TaskQueueGETBody, TaskQueueGETResponse)
class TaskQueuePOSTBody(ProtoModel):
    """Request body registered for ``POST`` on the ``task_queue`` endpoint."""

    class Meta(ProtoModel):
        procedure: str = Field(..., description="Name of the procedure which the Task will execute.")
        program: str = Field(..., description="The program which this Task will execute.")

        # The first literal now ends with a space; the two sentences previously ran
        # together as "entry.If no Tag".
        tag: Optional[str] = Field(
            None,
            description="Tag to assign to this Task so that Queue Managers can pull only Tasks based on this entry. "
            "If no Tag is specified, any Queue Manager can pull this Task.",
        )
        priority: Union[PriorityEnum, None] = Field(None, description=str(PriorityEnum.__doc__))

        class Config(ProtoModel.Config):
            # Keys beyond the four declared above are accepted.
            extra = "allow"

        @validator("priority", pre=True)
        def munge_priority(cls, v):
            """Convert a string priority to the ``PriorityEnum`` member of that name, case-insensitively."""
            # NOTE(review): an unknown name raises KeyError from the enum lookup -- confirm
            # that callers get a proper validation error in that case.
            if isinstance(v, str):
                v = PriorityEnum[v.upper()]
            return v

    meta: Meta = Field(..., description="The additional specification information for the Task to add to the Database.")
    data: List[Union[ObjectId, Molecule]] = Field(
        ...,
        description="The list of either Molecule objects or Molecule Id's (those already in the database) to submit as "
        "part of this Task.",
    )
class TaskQueuePOSTResponse(ProtoModel):
    """Response model registered for ``POST`` on the ``task_queue`` endpoint."""

    meta: ResponsePOSTMeta = Field(..., description=common_docs[ResponsePOSTMeta])
    data: ComputeResponse = Field(..., description="Data returned from the server from adding a Task.")


register_model("task_queue", "POST", TaskQueuePOSTBody, TaskQueuePOSTResponse)
class TaskQueuePUTBody(ProtoModel):
    """Request body registered for ``PUT`` on the ``task_queue`` endpoint."""

    class Data(ProtoModel):
        # ``id`` / ``base_result`` select the target; ``new_tag`` / ``new_priority`` carry new values.
        # All four default to None.
        id: QueryObjectId = Field(
            None,
            description="The exact Id to target in database. If this is set as a search condition, there is no "
            "reason to set anything else as this will be unique in the database, if it exists.",
        )
        base_result: QueryObjectId = Field(  # TODO: Validate this description is correct
            None,
            description="The exact Id of a result which this Task is slated to write to. If this is set as a "
            "search condition, there is no reason to set anything else as this will be unique in the "
            "database, if it exists. See also :class:`ResultRecord`.",
        )
        new_tag: Optional[str] = Field(
            None,
            description="Change the tag of an existing or regenerated task to be this value",
        )
        new_priority: Union[str, int, None] = Field(
            None,
            description="Change the priority of an existing or regenerated task to this value",
        )

    class Meta(ProtoModel):
        operation: str = Field(..., description="The specific action you are taking as part of this update.")

        @validator("operation")
        def cast_to_lower(cls, v):
            """Lower-case the operation name."""
            return v.lower()

    meta: Meta = Field(..., description="The instructions to pass to the target Task from ``data``.")
    data: Data = Field(..., description="The information which contains the Task target in the database.")
class TaskQueuePUTResponse(ProtoModel):
    """Response model registered for ``PUT`` on the ``task_queue`` endpoint."""

    class Data(ProtoModel):
        n_updated: int = Field(..., description="The number of tasks which were changed.")

    meta: ResponseMeta = Field(..., description=common_docs[ResponseMeta])
    data: Data = Field(..., description="Information returned from attempting updates of Tasks.")


register_model("task_queue", "PUT", TaskQueuePUTBody, TaskQueuePUTResponse)
### Service Queue
class ServiceQueueGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``service_queue`` endpoint."""

    class Data(ProtoModel):
        # Every search key is optional and defaults to None.
        id: QueryObjectId = Field(
            None,
            description="The exact Id to fetch from the database. If this is set as a search condition, there is no "
            "reason to set anything else as this will be unique in the database, if it exists.",
        )
        procedure_id: QueryObjectId = Field(  # TODO: Validate this description is correct
            None,
            description="The exact Id of the Procedure this Service is responsible for executing. If this is set as a "
            "search condition, there is no reason to set anything else as this will be unique in the "
            "database, if it exists.",
        )
        hash_index: QueryStr = Field(
            None,
            description="Services are searched based on a hash of the defined Service. This is something which can "
            "be generated by the Service spec itself and does not require server access to compute. "
            "This should be unique in the database so there should be no reason to set anything else "
            "if this is set as a query.",
        )
        status: QueryStr = Field(
            None,
            description="Services are searched based on where they are in the compute pipeline. See the "
            ":class:`RecordStatusEnum` for valid statuses.",
        )

    meta: QueryMeta = Field(QueryMeta(), description=common_docs[QueryMeta])
    data: Data = Field(..., description="The keys with data to search the database on for Services.")
class ServiceQueueGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``service_queue`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    data: List[Dict[str, Optional[Any]]] = Field(
        ..., description="The return of Services found in the database mapping their Ids to the Service spec."
    )


register_model("service_queue", "GET", ServiceQueueGETBody, ServiceQueueGETResponse)
class ServiceQueuePOSTBody(ProtoModel):
    """Request body registered for ``POST`` on the ``service_queue`` endpoint."""

    class Meta(ProtoModel):
        tag: Optional[str] = Field(
            None,
            description="Tag to assign to the Tasks this Service will generate so that Queue Managers can pull only "
            "Tasks based on this entry. If no Tag is specified, any Queue Manager can pull this Tasks "
            "created by this Service.",
        )
        # Accepted as a string or an integer; this model does no conversion of its own.
        priority: Union[str, int, None] = Field(
            None,
            description="Priority given to this Tasks created by this Service. Higher priority will be pulled first.",
        )

    meta: Meta = Field(
        ...,
        description="Metadata information for the Service for the Tag and Priority of Tasks this Service will create.",
    )
    data: List[Union[TorsionDriveInput, GridOptimizationInput]] = Field(
        ..., description="A list the specification for Procedures this Service will manage and generate Tasks for."
    )
class ServiceQueuePOSTResponse(ProtoModel):
    """Response model registered for ``POST`` on the ``service_queue`` endpoint."""

    meta: ResponsePOSTMeta = Field(..., description=common_docs[ResponsePOSTMeta])
    data: ComputeResponse = Field(..., description="Data returned from the server from adding a Service.")


register_model("service_queue", "POST", ServiceQueuePOSTBody, ServiceQueuePOSTResponse)
class ServiceQueuePUTBody(ProtoModel):
    """Request body registered for ``PUT`` on the ``service_queue`` endpoint."""

    class Data(ProtoModel):
        # Both target selectors are optional and default to None.
        id: QueryObjectId = Field(None, description="The Id of the Service.")
        procedure_id: QueryObjectId = Field(None, description="The Id of the Procedure that the Service is linked to.")

    class Meta(ProtoModel):
        operation: str = Field(..., description="The update action to perform.")

        @validator("operation")
        def cast_to_lower(cls, v):
            """Lower-case the operation name."""
            return v.lower()

    meta: Meta = Field(..., description="The instructions to pass to the targeted Service.")
    data: Data = Field(..., description="The information which contains the Service target in the database.")
class ServiceQueuePUTResponse(ProtoModel):
    """Response model registered for ``PUT`` on the ``service_queue`` endpoint."""

    class Data(ProtoModel):
        n_updated: int = Field(..., description="The number of services which were changed.")

    meta: ResponseMeta = Field(..., description=common_docs[ResponseMeta])
    data: Data = Field(..., description="Information returned from attempting updates of Services.")


register_model("service_queue", "PUT", ServiceQueuePUTBody, ServiceQueuePUTResponse)
### Queue Manager
# Add the new QueueManagerMeta to the docs so that the Queue Manager models below can use its
# base documentation as the description of their ``meta`` field.
# NOTE(review): QueueManagerMeta itself is not defined in the visible part of this file.
common_docs[QueueManagerMeta] = str(get_base_docs(QueueManagerMeta))
class QueueManagerGETBody(ProtoModel):
    """Request body registered for ``GET`` on the ``queue_manager`` endpoint."""

    class Data(ProtoModel):
        # NOTE(review): this description speaks of Queue Managers, while the description of the
        # enclosing ``data`` field calls ``limit`` the maximum number of tasks to pull -- confirm
        # which wording is correct.
        limit: int = Field(..., description="Max number of Queue Managers to get from the server.")

    meta: QueueManagerMeta = Field(..., description=common_docs[QueueManagerMeta])
    data: Data = Field(
        ...,
        description="A model of Task request data for the Queue Manager to fetch. Accepts ``limit`` as the maximum "
        "number of tasks to pull.",
    )
class QueueManagerGETResponse(ProtoModel):
    """Response model registered for ``GET`` on the ``queue_manager`` endpoint."""

    meta: ResponseGETMeta = Field(..., description=common_docs[ResponseGETMeta])
    # Tasks are returned as plain dicts.
    data: List[Dict[str, Optional[Any]]] = Field(
        ..., description="A list of tasks retrieved from the server to compute."
    )


register_model("queue_manager", "GET", QueueManagerGETBody, QueueManagerGETResponse)
class QueueManagerPOSTBody(ProtoModel):
    """Request body registered for ``POST`` on the ``queue_manager`` endpoint."""

    meta: QueueManagerMeta = Field(..., description=common_docs[QueueManagerMeta])
    # Keyed by ObjectId; the values are left untyped.
    data: Dict[ObjectId, Any] = Field(..., description="A Dictionary of tasks to return to the server.")
class QueueManagerPOSTResponse(ProtoModel):
    """Response model registered for ``POST`` on the ``queue_manager`` endpoint."""

    meta: ResponsePOSTMeta = Field(..., description=common_docs[ResponsePOSTMeta])
    data: bool = Field(..., description="A True/False return on if the server accepted the returned tasks.")


register_model("queue_manager", "POST", QueueManagerPOSTBody, QueueManagerPOSTResponse)
class QueueManagerPUTBody(ProtoModel):
    """Request body registered for ``PUT`` on the ``queue_manager`` endpoint."""

    class Data(ProtoModel):
        # ``operation`` is required; ``configuration`` is optional and defaults to None.
        operation: str
        configuration: Optional[Dict[str, Any]] = None

    meta: QueueManagerMeta = Field(..., description=common_docs[QueueManagerMeta])
    data: Data = Field(
        ...,
        description="The update action which the Queue Manager requests the Server take with respect to how the "
        "Queue Manager is tracked.",
    )
class QueueManagerPUTResponse(ProtoModel):
    # Response model registered for PUT requests on the "queue_manager" endpoint.

    # Metadata is an open dict that defaults to empty.
    meta: Dict[str, Any] = Field(default={}, description=common_docs[EmptyMeta])

    # The member order inside Union[] is significant and must not be swapped:
    # with Union[bool, Dict[str, int]] a non-empty input dict would be resolved to
    # ``True`` (a dict can be interpreted as a bool), whereas a bool is never cast
    # to Dict[str, int]. Listing Dict[] first makes it the first type attempted.
    data: Union[Dict[str, int], bool] = Field(
        default=...,
        description="The response from the Server attempting to update the Queue Manager's server-side status. "
        "Response type is a function of the operation made from the PUT request.",
    )
# Bind the PUT body/response pair to the "queue_manager" endpoint.
register_model(
    name="queue_manager",
    rest="PUT",
    body=QueueManagerPUTBody,
    response=QueueManagerPUTResponse,
)
## advanced procedures queries
class OptimizationFinalResultBody(ProtoModel):
    # Body model registered for GET requests on "optimization/final_result".

    class Data(ProtoModel):
        # Optimization ids to look up; optional, defaults to None.
        optimization_ids: QueryObjectId = Field(
            default=None,
            description="List of optimization procedure Ids to fetch their final results from the database.",
        )

    # TODO: not yet supported
    meta: QueryMetaFilter = Field(default=QueryMetaFilter(), description=common_docs[QueryMetaFilter])
    # Required search keys (see Data above).
    data: Data = Field(
        default=...,
        description="The keys with data to search the database on for Procedures.",
    )
class OptimizationAllResultBody(ProtoModel):
    # Body model registered for GET requests on "optimization/all_results".

    class Data(ProtoModel):
        # Optimization ids to look up; optional, defaults to None.
        optimization_ids: QueryObjectId = Field(
            default=None,
            description="List of optimization procedure Ids to fetch their ALL their results from the database.",
        )

    # TODO: not yet supported
    meta: QueryMetaFilter = Field(default=QueryMetaFilter(), description=common_docs[QueryMetaFilter])
    # Required search keys (see Data above).
    data: Data = Field(
        default=...,
        description="The keys with data to search the database on for Procedures.",
    )
class OptimizationInitialMoleculeBody(ProtoModel):
    # Request body for querying the initial molecules of optimization procedures.

    class Data(ProtoModel):
        # Optimization ids to look up; optional, defaults to None.
        optimization_ids: QueryObjectId = Field(
            default=None,
            description="List of optimization procedure Ids to fetch their initial molecules from the database.",
        )

    # TODO: not yet supported
    meta: QueryMetaFilter = Field(default=QueryMetaFilter(), description=common_docs[QueryMetaFilter])
    # Required search keys (see Data above).
    data: Data = Field(
        default=...,
        description="The keys with data to search the database on for Procedures.",
    )
class OptimizationFinalMoleculeBody(ProtoModel):
    # Request body for querying the final molecules of optimization procedures.

    class Data(ProtoModel):
        # Optimization ids to look up; optional, defaults to None.
        optimization_ids: QueryObjectId = Field(
            default=None,
            description="List of optimization procedure Ids to fetch their final molecules from the database.",
        )

    # TODO: not yet supported
    meta: QueryMetaFilter = Field(default=QueryMetaFilter(), description=common_docs[QueryMetaFilter])
    # Required search keys (see Data above).
    data: Data = Field(
        default=...,
        description="The keys with data to search the database on for Procedures.",
    )
class ResultResponse(ProtoModel):
    # Response model registered for GET requests on "optimization/final_result".

    # GET-style response metadata; description shared via common_docs.
    meta: ResponseGETMeta = Field(default=..., description=common_docs[ResponseGETMeta])
    # Values are either records or plain dicts, depending on whether a projection is used.
    data: Union[Dict[str, ResultRecord], Dict[str, Any]] = Field(
        default=...,
        description="A List of Results found from the query per optimization id.",
    )
class ListResultResponse(ProtoModel):
    # Response model registered for GET requests on "optimization/all_results".

    # GET-style response metadata; description shared via common_docs.
    meta: ResponseGETMeta = Field(default=..., description=common_docs[ResponseGETMeta])
    # Values are either lists of records or plain dicts, depending on whether a projection is used.
    data: Union[Dict[str, List[ResultRecord]], Dict[str, Any]] = Field(
        default=...,
        description="A List of Results found from the query per optimization id.",
    )
class ListMoleculeResponse(ProtoModel):
    # Response model registered for the GET endpoints "optimization/initial_molecule"
    # and "optimization/final_molecule".

    # GET-style response metadata; description shared via common_docs.
    meta: ResponseGETMeta = Field(default=..., description=common_docs[ResponseGETMeta])
    # Values are either Molecule objects or plain dicts, depending on whether a projection is used.
    data: Union[Dict[str, Molecule], Dict[str, Any]] = Field(
        default=...,
        description="A List of Molecules found from the query per optimization id.",
    )
# Register the GET endpoints for the advanced optimization queries.
# Each endpoint is paired with its own dedicated body model. The molecule
# endpoints previously reused OptimizationAllResultBody (a copy-paste slip),
# which left OptimizationInitialMoleculeBody / OptimizationFinalMoleculeBody
# unused. All of these body models declare the same fields, so request
# validation is unchanged; only the field descriptions now match the endpoint.
register_model(r"optimization/final_result", "GET", OptimizationFinalResultBody, ResultResponse)
register_model(r"optimization/all_results", "GET", OptimizationAllResultBody, ListResultResponse)
register_model(r"optimization/initial_molecule", "GET", OptimizationInitialMoleculeBody, ListMoleculeResponse)
register_model(r"optimization/final_molecule", "GET", OptimizationFinalMoleculeBody, ListMoleculeResponse)
class ManagerInfoGETBody(ProtoModel):
    # Body model registered for GET requests on the "manager" endpoint.

    class Data(ProtoModel):
        # Both filters are optional and default to None.
        name: QueryStr = Field(default=None, description="Name(s) of managers to query for.")
        status: QueryStr = Field(
            default=None,
            description="Managers will be searched based on status. See :class:`ManagerStatusEnum` for valid statuses.",
        )

    # Query metadata; a default QueryMeta() is used when none is supplied.
    meta: QueryMeta = Field(default=QueryMeta(), description=common_docs[QueryMeta])
    # Required search keys (see Data above).
    data: Data = Field(
        default=...,
        description="The keys with data to search the database on for Managers.",
    )
class ManagerInfoGETResponse(ProtoModel):
    # Response model registered for GET requests on the "manager" endpoint.

    # GET-style response metadata; description shared via common_docs.
    meta: ResponseGETMeta = Field(default=..., description=common_docs[ResponseGETMeta])
    # One loosely-typed dict per manager returned.
    data: List[Dict[str, Any]] = Field(
        default=...,
        description="Information about the requested managers",
    )
# Bind the GET body/response pair to the "manager" endpoint.
register_model(
    name=r"manager",
    rest="GET",
    body=ManagerInfoGETBody,
    response=ManagerInfoGETResponse,
)