# Source code for qcfractal.interface.collections.torsiondrive_dataset

"""
QCPortal Database ODM
"""
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union

import pandas as pd

from ..models import ObjectId, OptimizationSpecification, ProtoModel, QCSpecification, TorsionDriveInput
from ..models.torsiondrive import TDKeywords
from ..visualization import custom_plot
from .collection import BaseProcedureDataset
from .collection_utils import register_collection

if TYPE_CHECKING:  # pragma: no cover
    from ..models import Molecule


class TDEntry(ProtoModel):
    """Data model for a single entry (record) of a TorsionDriveDataset.

    Instances are built by ``TorsionDriveDataset.add_entry`` and stored in
    ``TorsionDriveDataset.DataModel.records``.
    """

    # Name of the entry; used as the index of the dataset.
    name: str
    # Ids returned by the server for the starting molecules of the TorsionDrive.
    initial_molecules: Set[ObjectId]
    # Scan definition: dihedrals, grid spacing, optional ranges and energy limits.
    td_keywords: TDKeywords
    # Additional free-form attributes and descriptions for the entry.
    attributes: Dict[str, Any]
    # NOTE(review): not populated in this module; presumably maps a specification
    # name to the id of the submitted service (handled by the base class) -- confirm.
    object_map: Dict[str, ObjectId] = {}


class TDEntrySpecification(ProtoModel):
    """Named compute specification for the entries of a TorsionDriveDataset.

    Instances are built by ``TorsionDriveDataset.add_specification``.
    """

    # The name of the specification.
    name: str
    # A short text description of the specification (may be None).
    description: Optional[str]
    # Full optimization specification used for the TorsionDrive.
    optimization_spec: OptimizationSpecification
    # Full quantum chemistry specification used for the TorsionDrive.
    qc_spec: QCSpecification


class TorsionDriveDataset(BaseProcedureDataset):
    """A collection of TorsionDrive entries computed under named specifications.

    Entries (:class:`TDEntry`) describe what is scanned: starting molecules,
    dihedrals and grid. Specifications (:class:`TDEntrySpecification`) describe
    how it is computed: the optimization and QC settings.
    """

    class DataModel(BaseProcedureDataset.DataModel):
        # Entries of the dataset, keyed by string.
        records: Dict[str, TDEntry] = {}
        # NOTE(review): maintained outside this module; presumably the names of
        # the specifications that have already been computed -- confirm.
        history: Set[str] = set()
        # Compute specifications, keyed by string.
        specs: Dict[str, TDEntrySpecification] = {}

        class Config(BaseProcedureDataset.DataModel.Config):
            pass

    def _internal_compute_add(self, spec: Any, entry: Any, tag: str, priority: str) -> ObjectId:
        """Submit the TorsionDrive service for one entry under one specification.

        Parameters
        ----------
        spec : Any
            Object providing ``optimization_spec`` and ``qc_spec``.
        entry : Any
            Object providing ``initial_molecules`` and ``td_keywords``.
        tag : str
            The tag forwarded to ``client.add_service``.
        priority : str
            The priority forwarded to ``client.add_service``.

        Returns
        -------
        ObjectId
            The id of the submitted service.
        """
        service = TorsionDriveInput(
            initial_molecule=entry.initial_molecules,
            keywords=entry.td_keywords,
            optimization_spec=spec.optimization_spec,
            qc_spec=spec.qc_spec,
        )

        # Exactly one service is submitted, so only the first returned id matters.
        return self.client.add_service([service], tag=tag, priority=priority).ids[0]

    def add_specification(
        self,
        name: str,
        optimization_spec: OptimizationSpecification,
        qc_spec: QCSpecification,
        description: Optional[str] = None,
        overwrite: bool = False,
    ) -> None:
        """Adds a new compute specification to the dataset.

        Parameters
        ----------
        name : str
            The name of the specification
        optimization_spec : OptimizationSpecification
            A full optimization specification for TorsionDrive
        qc_spec : QCSpecification
            A full quantum chemistry specification for TorsionDrive
        description : str, optional
            A short text description of the specification
        overwrite : bool, optional
            Overwrite existing specification names
        """
        spec = TDEntrySpecification(
            name=name, optimization_spec=optimization_spec, qc_spec=qc_spec, description=description
        )

        # The result of the base-class helper is passed through unchanged.
        return self._add_specification(name, spec, overwrite=overwrite)

    def add_entry(
        self,
        name: str,
        initial_molecules: List["Molecule"],
        dihedrals: List[Tuple[int, int, int, int]],
        grid_spacing: List[int],
        dihedral_ranges: Optional[List[Tuple[int, int]]] = None,
        energy_decrease_thresh: Optional[float] = None,
        energy_upper_limit: Optional[float] = None,
        attributes: Optional[Dict[str, Any]] = None,
        save: bool = True,
    ) -> None:
        """Adds a new TorsionDrive entry to the dataset.

        Parameters
        ----------
        name : str
            The name of the entry, will be used for the index
        initial_molecules : List[Molecule]
            The list of starting Molecules for the TorsionDrive
        dihedrals : List[Tuple[int, int, int, int]]
            A list of dihedrals to scan over
        grid_spacing : List[int]
            The grid spacing for each dihedrals
        dihedral_ranges: Optional[List[Tuple[int, int]]]
            The range limit of each dihedrals to scan, within [-180, 360]
        energy_decrease_thresh: Optional[float]
            The threshold of energy decrease to trigger activating grid points
        energy_upper_limit: Optional[float]
            The upper limit of energy relative to current global minimum to trigger activating grid points
        attributes : Dict[str, Any], optional
            Additional attributes and descriptions for the entry
        save : bool, optional
            If true, saves the collection after adding the entry. If this is False be careful
            to call save after all entries are added, otherwise data pointers may be lost.
        """
        self._check_entry_exists(name)  # Fast skip

        # A fresh dict per call; never share a mutable default between entries.
        if attributes is None:
            attributes = {}

        # Build new objects
        molecule_ids = self.client.add_molecules(initial_molecules)
        td_keywords = TDKeywords(
            dihedrals=dihedrals,
            grid_spacing=grid_spacing,
            dihedral_ranges=dihedral_ranges,
            energy_decrease_thresh=energy_decrease_thresh,
            energy_upper_limit=energy_upper_limit,
        )

        entry = TDEntry(name=name, initial_molecules=molecule_ids, td_keywords=td_keywords, attributes=attributes)
        self._add_entry(name, entry, save)

    def counts(
        self,
        entries: Optional[Union[str, List[str]]] = None,
        specs: Optional[Union[str, List[str]]] = None,
        count_gradients: bool = False,
    ) -> pd.DataFrame:
        """Counts the number of optimization or gradient evaluations associated with the TorsionDrives.

        Parameters
        ----------
        entries : Optional[Union[str, List[str]]], optional
            The entries to query for. If None or empty, no entry filter is applied.
        specs : Optional[Union[str, List[str]]], optional
            The specifications to query for. If None, every column of ``self.df`` is used.
        count_gradients : bool, optional
            If True, counts the total number of gradient calls. Warning! This can be slow for large datasets.

        Returns
        -------
        DataFrame
            The queried counts. Rows that have no count for any specification are dropped.
        """
        if isinstance(specs, str):
            specs = [specs]

        if isinstance(entries, str):
            entries = [entries]

        # Query all of the specs and make sure they are valid
        if specs is None:
            specs = list(self.df.columns)
        else:
            # Remap the user-facing names to the names returned by `query`
            specs = [self.query(spec) for spec in specs]

        # Count functions; records that are not COMPLETE yield None and are
        # removed by the dropna below when no specification has a value.
        def count_gradient_evals(td):
            if td.status != "COMPLETE":
                return None

            total_grads = 0
            for optimizations in td.get_history().values():
                for opt in optimizations:
                    total_grads += len(opt.trajectory)
            return total_grads

        def count_optimizations(td):
            if td.status != "COMPLETE":
                return None
            return sum(len(v) for v in td.optimization_history.values())

        counter = count_gradient_evals if count_gradients else count_optimizations

        # Loop over the data and apply the count function
        ret = []
        for col in specs:
            data = self.df[col]
            if entries:
                data = data[entries]
            ret.append(data.apply(counter))

        ret = pd.DataFrame(ret).transpose()
        ret.dropna(inplace=True, how="all")
        return ret

    def visualize(
        self,
        entries: Union[str, List[str]],
        specs: Union[str, List[str]],
        relative: bool = True,
        units: str = "kcal / mol",
        digits: int = 3,
        use_measured_angle: bool = False,
        return_figure: Optional[bool] = None,
    ) -> "plotly.Figure":
        """Plots the 1-D torsion scans of the requested entries and specifications.

        Parameters
        ----------
        entries : Union[str, List[str]]
            A single or list of indices to plot.
        specs : Union[str, List[str]]
            A single or list of specifications to plot.
        relative : bool, optional
            Shows relative energy, lowest energy per scan is zero.
        units : str, optional
            The units of the plot.
        digits : int, optional
            Rounds the energies to n decimal places for display.
        use_measured_angle : bool, optional
            If True, the measured final angle instead of the constrained optimization angle.
            Can provide more accurate results if the optimization was ill-behaved,
            but pulls additional data from the server and may take longer.
        return_figure : Optional[bool], optional
            If True, return the raw plotly figure. If False, returns a hosted iPlot.
            If None, return a iPlot display in Jupyter notebook and a raw plotly figure in all other circumstances.

        Returns
        -------
        plotly.Figure
            The requested figure.

        Raises
        ------
        ValueError
            If ``entries`` or ``specs`` is empty, so that there is nothing to plot.
        """
        # A single spec given as a string is shown in the title, not in every trace name.
        show_spec = True
        if isinstance(specs, str):
            specs = [specs]
            show_spec = False

        if isinstance(entries, str):
            entries = [entries]

        # Query all of the specs and make sure they are valid
        formatted_spec_names = [self.query(spec) for spec in specs]

        traces = []
        ranges = []
        # Loop over specifications
        for spec in formatted_spec_names:
            # Loop over indices (groups colors by entry)
            for index in entries:
                # Plot the figure using the torsiondrives plotting function
                fig = self.df.loc[index, spec].visualize(
                    relative=relative,
                    units=units,
                    digits=digits,
                    use_measured_angle=use_measured_angle,
                    return_figure=True,
                )

                ranges.append(fig.layout.xaxis.range)
                trace = fig.data[0]  # Pull out the underlying scatterplot

                if show_spec:
                    trace.name = f"{index}-{spec}"
                else:
                    trace.name = f"{index}"

                traces.append(trace)

        # Without this check the axis-range computation below fails with an
        # opaque "min() arg is an empty sequence" ValueError.
        if not traces:
            raise ValueError("Nothing to plot: `entries` and `specs` must both be non-empty.")

        title = "TorsionDriveDataset 1-D Plot"
        if not show_spec:
            title += f" [spec={formatted_spec_names[0]}]"

        if relative:
            ylabel = f"Relative Energy [{units}]"
        else:
            ylabel = f"Absolute Energy [{units}]"

        custom_layout = {
            "title": title,
            "yaxis": {"title": ylabel, "zeroline": True},
            "xaxis": {
                "title": "Dihedral Angle [degrees]",
                "zeroline": False,
                # Use the widest x-range over all individual figures
                "range": [min(x[0] for x in ranges), max(x[1] for x in ranges)],
            },
        }

        return custom_plot(traces, custom_layout, return_figure=return_figure)
# Hand the class to collection_utils.register_collection at import time
# (presumably so it can be looked up by collection type -- confirm).
register_collection(TorsionDriveDataset)