from typing import Optional, List
import multiprocessing
import itertools
import pathlib
import logging
import tqdm
from pydantic import Field # , validator, root_validator
import numpy as np
import scipy.linalg
from . import base, molecule, charge, qm, grid, resp
from .charge import MoleculeChargeConstraints
from .resp import RespCharges
from .orientation import Orientation
from .constraint import ESPSurfaceConstraintMatrix
from .utils import require_package
logger = logging.getLogger(__name__)
[docs]class Job(base.Model):
"""Class to manage RESP jobs. It is expected that
all RESP calculations will be run through this class.
"""
molecules: List[molecule.Molecule] = Field(
default_factory=list,
description="Molecules to use for the RESP job"
)
qm_optimization_options: qm.QMGeometryOptimizationOptions = Field(
default=qm.QMGeometryOptimizationOptions(),
description="QM options for geometry optimization"
)
qm_esp_options: qm.QMEnergyOptions = Field(
default=qm.QMEnergyOptions(),
description="QM options for ESP computation"
)
grid_options: grid.GridOptions = Field(
default=grid.GridOptions(),
description="Options for generating grid for ESP computation"
)
resp_options: resp.RespOptions = Field(
default=resp.RespOptions(),
description="Options for fitting ESP for charges"
)
charge_constraints: charge.ChargeConstraintOptions = Field(
default=charge.ChargeConstraintOptions(),
description="Charge constraints"
)
working_directory: pathlib.Path = Field(
default=pathlib.Path("psiresp_working_directory"),
description="Working directory for saving intermediate files"
)
defer_errors: bool = Field(
default=False,
description=("Whether to raise an error immediately, "
"or gather all errors during ESP computation "
"and raise at the end")
)
temperature: float = Field(
default=298.15,
description="Temperature (in Kelvin) to use when Boltzmann-weighting conformers."
)
stage_1_charges: Optional[RespCharges] = Field(
default=None,
description="Stage 1 charges. These need to be computed by calling `run()` or `compute_charges()` directly."
)
stage_2_charges: Optional[RespCharges] = Field(
default=None,
description="Stage 2 charges. These need to be computed by calling `run()` or `compute_charges()` directly."
)
n_processes: Optional[int] = Field(
default=None,
description=("Number of processes to use in multiprocessing "
"during ESP computation. `n_processes=None` uses "
"the number of CPUs.")
)
[docs] @classmethod
def from_smiles(cls, smiles: str, order_by_map_number: bool = False, **kwargs):
mol = molecule.Molecule.from_smiles(smiles, order_by_map_number=order_by_map_number)
return cls(molecules=[mol], **kwargs)
@property
def charges(self):
if self.stage_2_charges is None:
try:
return self.stage_1_charges.charges
except AttributeError:
return self.stage_1_charges
return self.stage_2_charges.charges
@property
def conformers(self):
return list(self.iter_conformers())
@property
def n_conformers(self):
return sum(mol.n_conformers for mol in self.molecules)
@property
def n_orientations(self):
return sum(mol.n_orientations for mol in self.molecules)
@property
def orientations(self):
return list(self.iter_orientations())
[docs] def iter_orientations(self):
for mol in self.molecules:
for conf in mol.conformers:
yield from conf.orientations
[docs] def generate_orientations(self):
"""Generate orientations for every conformer of every molecule"""
for mol in tqdm.tqdm(self.molecules, desc="generating-orientations"):
clear = not mol.keep_original_orientation
mol.generate_orientations(clear_existing_orientations=clear)
[docs] def optimize_geometries(self, client=None, **kwargs):
"""Compute optimized geometries"""
conformers = [conformer
for mol in self.molecules
for conformer in mol.conformers
if mol.optimize_geometry and not conformer.is_optimized]
if not conformers:
return
qcmols = [conf.qcmol for conf in conformers]
results = self.qm_optimization_options.run(client=client,
qcmols=qcmols,
working_directory=self.working_directory,
**kwargs)
for conf, geometry in zip(conformers, results):
conf.set_optimized_geometry(geometry)
[docs] def compute_orientation_energies(self, client=None, **kwargs):
"""Compute wavefunction for each orientation"""
orientations = [orientation
for mol in self.molecules
for conformer in mol.conformers
for orientation in conformer.orientations
if orientation.qc_wavefunction is None]
qcmols = [o.qcmol for o in orientations]
results = self.qm_esp_options.run(client=client,
qcmols=qcmols,
working_directory=self.working_directory,
**kwargs)
for orient, wfn in zip(orientations, results):
orient.qc_wavefunction = wfn
[docs] def compute_esps(self):
"""Compute ESP on a grid for each orientation in a multiprocessing pool"""
orientations = [orientation
for mol in self.molecules
for conformer in mol.conformers
for orientation in conformer.orientations
if orientation.esp is None]
tqorientations = tqdm.tqdm(orientations,
desc="compute-esp")
# create functions for multiprocessing mapping
computer = self._try_compute_esp if self.defer_errors else self._compute_esp
with multiprocessing.Pool(processes=self.n_processes) as pool:
results = pool.map(computer, tqorientations)
# raise errors if any occurred
errors = [r for r in results if not isinstance(r, Orientation)]
if errors:
raise ValueError(*errors)
for orientation, o2 in zip(orientations, results):
# TODO: fix this, it's clumsy
orientation.esp = o2.esp
orientation.grid = o2.grid
def _compute_esp(self, orientation):
"""Compute the grid and ESP for an orientation with the job's grid options"""
require_package("psi4")
if orientation.grid is None:
orientation.compute_grid(grid_options=self.grid_options)
orientation.compute_esp()
assert orientation.esp is not None
return orientation
def _try_compute_esp(self, orientation):
"""Wrap ESP computation in a try/except to defer errors"""
require_package("psi4")
try:
return self._compute_esp(orientation)
except BaseException as e:
return str(e)
[docs] def run(self, client=None, update_molecules: bool = True) -> np.ndarray:
"""Run the whole job"""
# die early on failure to import
# we can't decorate the function because pickle is sad
require_package("psi4")
self.generate_conformers()
self.optimize_geometries(client=client)
self.generate_orientations()
return self.compute_esps_and_charges(client=client, update_molecules=update_molecules)
[docs] def compute_esps_and_charges(self, client=None, update_molecules: bool = True) -> np.ndarray:
require_package("psi4")
self.compute_orientation_energies(client=client)
self.compute_esps()
self.compute_charges(update_molecules=update_molecules)
return self.charges
[docs] def construct_surface_constraint_matrix(self) -> ESPSurfaceConstraintMatrix:
"""
Construct the constraint matrix for each atom,
as generated by the ESP at each grid point
"""
if not self.charge_constraints.split_conformers:
matrices = [
ESPSurfaceConstraintMatrix.from_orientations(
orientations=[o for conf in mol.conformers for o in conf.orientations],
temperature=self.temperature,
)
for mol in self.molecules
]
charges = [mol.charge for mol in self.molecules]
else:
matrices = [
ESPSurfaceConstraintMatrix.from_orientations(
orientations=[o for o in conf.orientations],
temperature=self.temperature,
)
for mol in self.molecules
for conf in mol.conformers
]
charges = [
chg
for mol in self.molecules
for chg in [mol.charge] * mol.n_conformers
]
a_mol = scipy.linalg.block_diag(*[mat.coefficient_matrix for mat in matrices])
a_row = scipy.linalg.block_diag(*[np.ones(mat.constant_vector.shape[0]) for mat in matrices])
a_zeros = np.zeros((a_row.shape[0], a_row.shape[0]))
a_block = np.bmat([[a_mol, a_row.T], [a_row, a_zeros]])
b_vecs = [mat.constant_vector for mat in matrices] + [charges]
if len(b_vecs):
b_block = np.concatenate(b_vecs)
else:
b_block = np.zeros((0, 0))
return ESPSurfaceConstraintMatrix.from_coefficient_matrix(a_block, b_block)
[docs] def generate_molecule_charge_constraints(self) -> MoleculeChargeConstraints:
"""
Gather the charge constraints pertaining to the molecules
in this RESP job
"""
return MoleculeChargeConstraints.from_charge_constraints(self.charge_constraints,
molecules=self.molecules)
[docs] def compute_charges(self, update_molecules=True) -> np.ndarray:
"""
Compute the charges for each molecule. Each Orientation must have had
the ESP computed, and there must be at least one orientation present.
"""
surface_constraints = self.construct_surface_constraint_matrix()
stage_1_constraints = self.generate_molecule_charge_constraints()
if self.resp_options.stage_2:
stage_2_constraints = stage_1_constraints.copy(deep=True)
stage_2_constraints.constrain_methyl_hydrogens_between_conformers = True
stage_1_constraints.prepare_stage_1_constraints()
else:
stage_1_constraints.constrain_methyl_hydrogens_between_conformers = True
self.stage_1_charges = RespCharges(charge_constraints=stage_1_constraints,
surface_constraints=surface_constraints,
restraint_height=self.resp_options.restraint_height_stage_1,
**self.resp_options._base_kwargs)
self.stage_1_charges.solve()
if self.resp_options.stage_2:
stage_2_constraints.prepare_stage_2_constraints()
stage_2_constraints.add_constraints_from_charges(self.stage_1_charges._charges)
self.stage_2_charges = RespCharges(charge_constraints=stage_2_constraints,
surface_constraints=surface_constraints,
restraint_height=self.resp_options.restraint_height_stage_2,
**self.resp_options._base_kwargs)
self.stage_2_charges.solve()
if update_molecules:
self.update_molecule_charges()
return self.charges
[docs] def update_molecule_charges(self):
"""
Update the molecules in the job with the calculated charges
"""
all_charges = {}
default = [None] * len(self.molecules)
for stage, restraint_ in itertools.product([1, 2], ["un", ""]):
restraint = f"{restraint_}restrained"
all_charges[(stage, restraint)] = default
charges = getattr(self, f"stage_{stage}_charges")
if charges is not None:
rcharges = getattr(charges, f"{restraint}_charges")
if rcharges is not None:
all_charges[(stage, restraint)] = rcharges
for (stage, restraint), charges in all_charges.items():
for mol, ch in zip(self.molecules, charges):
setattr(mol, f"stage_{stage}_{restraint}_charges", ch)