Source code for aiida_castep.workflows.relax

"""
Module for Relaxation WorkChain

CHANGELOG:

- 0.1.0: Added ``bypass`` option to skip convergence checking.
  The user is still responsible for setting the correct inputs.
"""

from __future__ import absolute_import

from aiida.engine import WorkChain, while_, ToContext, append_
from aiida.common import AttributeDict
import aiida.orm as orm
from aiida.orm.nodes.data.base import to_aiida_type

from aiida.common.exceptions import NotExistent, MultipleObjectsError
from aiida_castep.common import INPUT_LINKNAMES as IN_LINKS
from aiida_castep.common import OUTPUT_LINKNAMES as OUT_LINKS
from aiida_castep.calculations.helper import CastepHelper
from aiida_castep.calculations.tools import flat_input_param_validator
from .base import CastepBaseWorkChain

# pylint: disable=protected-access,no-member,import-outside-toplevel

__version__ = '0.1.0'


class CastepRelaxWorkChain(WorkChain):
    """
    WorkChain to relax structures.

    Restart the relaxation calculation until the structure is fully relaxed.
    Each CASTEP relaxation may finish without error while the structure is
    not fully relaxed, if the number of iterations is exceeded
    (*geom_max_iter*). This workchain tries to restart such calculations
    (wrapped in CastepBaseWorkChain) until the structure is fully relaxed.

    ``relax_options`` is a Dict of options; the available fields are:

    - restart_mode: mode of restart, choose from ``reuse`` (default),
      ``structure``, ``continuation``.
    - bypass: Bypass relaxation control - e.g. no checking of the
      convergence. Can be used for doing singlepoint calculation.
    - max_meta_iterations: maximum number of CastepBaseWorkChain launches,
      defaults to ``_max_meta_iterations``.
    """

    # Default upper bound on the number of CastepBaseWorkChain launches
    _max_meta_iterations = 10

    @classmethod
    def define(cls, spec):
        """Define the inputs, outputs, outline and exit codes of this workchain."""
        # Apply superclass specifications
        super(CastepRelaxWorkChain, cls).define(spec)

        # Inputs of the wrapped base workchain; its ``calc`` namespace is
        # exposed separately so that ``structure`` can be a top-level input.
        spec.expose_inputs(CastepBaseWorkChain,
                           namespace='base',
                           exclude=('calc', ))
        spec.expose_inputs(CastepBaseWorkChain._calculation_class,
                           namespace='calc',
                           exclude=['structure'])

        spec.input(
            'clean_workdir',
            valid_type=orm.Bool,
            serializer=to_aiida_type,
            required=False,
            help=
            ('Wether to clean the workdir of the calculations at the end of the workchain. '
             'The default is not performing any cleaning.'))

        spec.input('calc.parameters',
                   valid_type=orm.Dict,
                   serializer=to_aiida_type,
                   help='Input parameters, flat format is allowed.',
                   validator=flat_input_param_validator)

        spec.input('structure',
                   valid_type=orm.StructureData,
                   help='Structure to be used for relaxation.',
                   required=True)

        spec.input('relax_options',
                   valid_type=orm.Dict,
                   serializer=to_aiida_type,
                   required=False,
                   help='Options for relaxation.')

        spec.expose_outputs(CastepBaseWorkChain, exclude=['output_structure'])
        spec.output('output_structure',
                    valid_type=orm.StructureData,
                    required=False,
                    help='The relaxed structure.')

        spec.outline(
            cls.setup,
            while_(cls.should_run_relax)(cls.run_relax, cls.inspect_relax),
            cls.result,
            cls.finalize,
        )

        spec.exit_code(101, 'ERROR_SUB_PROCESS_FAILED_RELAX',
                       'Subprocess lauched has failed in the relax stage')
        spec.exit_code(
            102, 'ERROR_CONVERGE_NOT_REACHED',
            'Geometry optimisation is not converged but the maximum iteration is exceeded.'
        )

    def setup(self):
        """
        Initialize the context variables used by the other outline steps.

        Reads ``relax_options`` (if given) and stores ``max_meta_iterations``,
        ``restart_mode`` and ``bypass`` in the context; any remaining options
        are kept in ``self.ctx.relax_options``.

        :raises ValueError: if ``restart_mode`` is not one of ``reuse``,
            ``continuation`` or ``structure``.
        """
        self.ctx.iteration = 0
        self.ctx.is_converged = False
        self.ctx.current_cell_volume = None
        self.ctx.current_structure = self.inputs.structure

        # Dictionaries used to override the default inputs on restarts
        self.ctx.calc_update = {}  # Update to the calc namespace
        self.ctx.base_update = {}  # Update to the base namespace
        self.ctx.inputs = AttributeDict(self.inputs)

        relax_options = self.inputs.get('relax_options', None)
        if relax_options is None:
            relax_options = {}
        else:
            relax_options = relax_options.get_dict()

        self.ctx.max_meta_iterations = relax_options.pop(
            'max_meta_iterations', self._max_meta_iterations)

        restart_mode = relax_options.pop('restart_mode', 'reuse')
        # Explicit check rather than ``assert``: assertions are removed when
        # Python runs with -O, which would let an invalid mode through.
        if restart_mode not in ("reuse", "continuation", "structure"):
            raise ValueError(
                "Invalid restart mode: {}".format(restart_mode))

        self.ctx.bypass_relax = relax_options.pop('bypass', False)
        self.ctx.restart_mode = restart_mode
        self.ctx.relax_options = relax_options

    def should_run_relax(self):
        """Return True while not converged and the iteration limit is not reached."""
        return not self.ctx.is_converged and self.ctx.iteration < self.ctx.max_meta_iterations

    def run_relax(self):
        """
        Submit one CastepBaseWorkChain for the current structure.

        The exposed inputs are combined with the updates accumulated in
        ``self.ctx.calc_update`` / ``self.ctx.base_update`` and the launched
        process is appended to ``self.ctx.workchains``.
        """
        self.ctx.iteration += 1
        link_label = 'iteration_{}'.format(self.ctx.iteration)

        # Assemble the inputs
        inputs = AttributeDict(
            self.exposed_inputs(CastepBaseWorkChain,
                                namespace='base',
                                agglomerate=False))
        inputs.calc = AttributeDict(
            self.exposed_inputs(CastepBaseWorkChain._calculation_class,
                                namespace='calc'))
        inputs.calc.structure = self.ctx.current_structure

        # Apply the overrides collected from previous iterations
        inputs.calc.update(self.ctx.calc_update)
        inputs.update(self.ctx.base_update)

        parent_label = self.inputs.metadata.get('label', '')
        if 'metadata' in inputs:
            inputs.metadata = AttributeDict(inputs['metadata'])
            inputs.metadata['call_link_label'] = link_label
            # Only inherit the label when the user did not set one
            if 'label' not in inputs.metadata:
                inputs.metadata['label'] = parent_label
        else:
            # In case metadata is not defined at all
            inputs['metadata'] = {
                'call_link_label': link_label,
                'label': parent_label
            }

        running = self.submit(CastepBaseWorkChain, **inputs)

        self.report('launching CastepBaseWorkChain<{}> Iteration #{}'.format(
            running.pk, self.ctx.iteration))

        return ToContext(workchains=append_(running))

    def inspect_relax(self):
        """
        Inspect the relaxation results, check if convergence is reached.

        :returns: ``None`` to continue the outline, or
            ``ERROR_SUB_PROCESS_FAILED_RELAX`` if the last sub-workchain
            failed or has no ``output_structure``.
        """
        if self.ctx.get('bypass_relax', False):
            self.report("Bypass mode, convergence checking skipped")
            self.ctx.is_converged = True
            return None

        workchain = self.ctx.workchains[-1]

        if not workchain.is_finished_ok:
            self.report(
                'Relaxation CastepBaseWorkChain failed with exit status {}'.
                format(workchain.exit_status))
            return self.exit_codes.ERROR_SUB_PROCESS_FAILED_RELAX

        try:
            structure = workchain.outputs.output_structure
        except AttributeError:
            self.report(
                'Relaxation CastepBaseWorkChain finished but no output structure found'
            )
            return self.exit_codes.ERROR_SUB_PROCESS_FAILED_RELAX

        # Check if the geometric convergence has reached
        output_parameters = workchain.outputs.output_parameters.get_dict()
        if output_parameters.get('geom_unconverged') is not True:
            self.ctx.is_converged = True
            self.report('Geometry optimisation is converged')
            return None

        # Not converged: prepare the restart.
        if self.ctx.restart_mode in ['reuse', 'continuation']:
            # The input link to the Base WorkChain determines the mode of
            # continuation. Parameters are updated automatically.
            self.ctx.base_update['{}_folder'.format(
                self.ctx.restart_mode)] = workchain.outputs.remote_folder

        # Unless we use the continuation mode, the structure should be set
        # to the output structure
        if self.ctx.restart_mode != 'continuation':
            self.ctx.current_structure = structure

        # NOTE(review): ``_push_parameters`` may return an exit code, which
        # is deliberately not acted upon here (the push is best-effort) -
        # confirm whether a failure should abort the workchain.
        self._push_parameters(workchain)
        self.report(
            'Relaxation CastepBaseWorkChain finished but not converged')
        return None

    def result(self):
        """
        Attach the outputs of the last workchain to the outputs.

        :returns: ``None`` if converged, otherwise
            ``ERROR_CONVERGE_NOT_REACHED``.
        """
        if self.ctx.is_converged:
            self.report('workchain completed after {} iterations'.format(
                self.ctx.iteration))
            exit_code = None
        else:
            self.report(
                'maximum number of meta iterations exceeded but relaxation is not converged'
            )
            exit_code = self.exit_codes.ERROR_CONVERGE_NOT_REACHED

        workchain = self.ctx.workchains[-1]
        # ``output_structure`` is excluded from the exposed outputs, so it
        # is attached explicitly when present.
        if 'output_structure' in workchain.outputs:
            self.out('output_structure', workchain.outputs.output_structure)

        self.out_many(self.exposed_outputs(workchain, CastepBaseWorkChain))
        return exit_code

    def finalize(self):
        """
        Finalize the workchain.

        Clean the remote working directories of the called calcjobs when the
        ``clean_workdir`` input is given and true. Cleaning is best-effort:
        failures are reported or skipped and never fail the workchain.
        """
        # Check if we are cleaning the working directory
        clean_workdir = self.inputs.get('clean_workdir', None)
        if clean_workdir is not None:
            clean_workdir = clean_workdir.value
        if not clean_workdir:
            return

        # Proceed with cleaning the calculations
        try:
            # this workchain -> launched sub-workchains -> their CalcJobs
            qbd = orm.QueryBuilder()
            qbd.append(orm.WorkChainNode, filters={'id': self.node.pk})
            qbd.append(orm.WorkChainNode,
                       filters={
                           'id': {
                               'in':
                               [node.pk for node in self.ctx.workchains]
                           }
                       })
            qbd.append(orm.CalcJobNode)

            # Run the query once instead of count() followed by all()
            calcjobs = [row[0] for row in qbd.all()]
            if not calcjobs:
                self.report('Cannot found called CalcJobNodes to clean.')
                return

            # Clean the remote directories one by one
            cleaned_calcs = []
            for calculation in calcjobs:
                try:
                    calculation.outputs.remote_folder._clean()  # pylint: disable=protected-access
                    cleaned_calcs.append(calculation.pk)
                except Exception:  # pylint: disable=broad-except
                    # Best-effort: skip calculations that cannot be cleaned.
                    # ``Exception`` (not ``BaseException``) so that
                    # interrupts and cancellation still propagate.
                    continue

            if cleaned_calcs:
                self.report(
                    'cleaned remote folders of calculations: {}'.format(
                        ' '.join(map(str, cleaned_calcs))))  # pylint: disable=not-callable

        except Exception as exception:  # pylint: disable=broad-except
            self.report(
                'Exception occurred during the cleaning of the remote contents: {}'
                .format(exception.args))

    def _push_parameters(self, workchain):
        """
        Push the parameters of the completed calculation to the next inputs.

        Looks up the input parameters of the calculation (with exit status 0)
        that created the results Dict of ``workchain``. If they differ from
        the parameters given to this workchain (ignoring the ``reuse`` and
        ``continuation`` keywords), they are stored in
        ``self.ctx.calc_update`` for the next iteration.

        :param workchain: the finished CastepBaseWorkChain node.
        :returns: ``None`` on success, or ``ERROR_SUB_PROCESS_FAILED_RELAX``
            if the input parameter node cannot be uniquely found.
        """
        query = orm.QueryBuilder()
        query.append(orm.WorkChainNode,
                     filters={'id': workchain.pk},
                     tag='work')
        query.append(orm.Dict,
                     with_incoming='work',
                     tag='output_dict',
                     edge_filters={'label': OUT_LINKS['results']})
        query.append(orm.CalcJobNode,
                     with_outgoing='output_dict',
                     filters={'attributes.exit_status': 0},
                     tag='final_calc')
        query.append(orm.Dict,
                     with_outgoing='final_calc',
                     edge_filters={'label': IN_LINKS['parameters']},
                     project=['attributes'])
        try:
            last_param = query.one()[0]
        except (MultipleObjectsError, NotExistent):
            self.report(
                'Cannot found the input node for the last Calculation called in BaseWorkChain'
            )
            return self.exit_codes.ERROR_SUB_PROCESS_FAILED_RELAX

        # Compare with the input parameters of this one
        helper = CastepHelper()
        orig_in_param = self.inputs.calc[IN_LINKS['parameters']].get_dict()
        orig_in_param, _ = helper._from_flat_dict(orig_in_param)

        # Pop out any continuation related keywords; tolerate a missing
        # 'PARAM' section instead of raising KeyError.
        for param in (last_param, orig_in_param):
            param_section = param.get('PARAM', {})
            for key in ('reuse', 'continuation'):
                param_section.pop(key, None)

        if orig_in_param != last_param:
            self.report(
                'Pushed the input parameters of the last completed calculation to the next iteration'
            )
            self.ctx.calc_update[IN_LINKS['parameters']] = orm.Dict(
                dict=last_param)
        return None
class CastepAlterRelaxWorkChain(CastepRelaxWorkChain):
    """
    A relaxation workflow that alternates between fixed cell and unfixed cell.

    This is to mitigate the problem in CASTEP where, if the cell is partially
    constrained, the convergence would be very slow. To overcome this
    problem, the structure should be relaxed with cell constraints, then
    restarted with fixed cell, and repeat.

    Following fields can be used in ``relax_options``:

    :var_cell_iter_max: Maximum iterations in variable cell relaxation,
        default to 10
    :fix_cell_iter_max: Maximum iterations in fixed cell relaxation,
        default to 20
    """

    _default_fix_cell_iter_max = 20
    _default_var_cell_iter_max = 10
    _max_meta_iterations = 11

    @classmethod
    def define(cls, spec):
        """Define the workchain, adding the missing-cell-constraints exit code."""
        super(CastepAlterRelaxWorkChain, cls).define(spec)
        spec.exit_code(201, 'ERROR_NO_CELL_CONS_SET',
                       'NO cell_constraints find in the input')

    def setup(self):
        """
        Initialize the context, then record the initial cell constraints.

        ``cell_constraints`` is looked up first at the top level of the
        input parameters (flat format) and then under the ``CELL`` key.

        :returns: ``None`` on success, or ``ERROR_NO_CELL_CONS_SET`` if no
            cell constraints are found in the input parameters.
        """
        super(CastepAlterRelaxWorkChain, self).setup()

        input_parameters = self.inputs.calc.parameters.get_dict()

        # Find the initial cell constraint
        cell_constraints = input_parameters.get('cell_constraints')
        if not cell_constraints:
            # Flat-format inputs have no 'CELL' section; use .get so that a
            # missing section gives the exit code below, not a KeyError.
            cell_constraints = input_parameters.get('CELL', {}).get(
                'cell_constraints')
        if not cell_constraints:
            return self.exit_codes.ERROR_NO_CELL_CONS_SET
        self.ctx.init_cell_cons = cell_constraints

        # Set the iteration limits
        self.ctx.var_cell_iter_max = self.ctx.relax_options.pop(
            'var_cell_iter_max', self._default_var_cell_iter_max)
        self.ctx.fix_cell_iter_max = self.ctx.relax_options.pop(
            'fix_cell_iter_max', self._default_fix_cell_iter_max)

        # This workchain always restarts in ``reuse`` mode, overriding the
        # value set by the parent ``setup``.
        self.ctx.restart_mode = 'reuse'
        self.ctx.is_fixed_cell = False
        return None

    def inspect_relax(self):
        """
        Inspect the relaxation results, check if convergence is reached.

        Convergence is only accepted after a variable-cell run. A fixed-cell
        run is always followed by a restart with the initial cell
        constraints, and an unconverged variable-cell run is followed by a
        fixed-cell run.

        :returns: ``None`` to continue the outline, or
            ``ERROR_SUB_PROCESS_FAILED_RELAX`` if the last sub-workchain
            failed or has no ``output_structure``.
        """
        if self.ctx.get('bypass_relax', False):
            self.report("Bypass mode, convergence checking skipped")
            self.ctx.is_converged = True
            return None

        workchain = self.ctx.workchains[-1]

        if not workchain.is_finished_ok:
            self.report(
                'Relaxation CastepBaseWorkChain failed with exit status {}'.
                format(workchain.exit_status))
            return self.exit_codes.ERROR_SUB_PROCESS_FAILED_RELAX

        try:
            structure = workchain.outputs.output_structure
        except AttributeError:
            self.report(
                'Relaxation CastepBaseWorkChain finished but not output structure'
            )
            return self.exit_codes.ERROR_SUB_PROCESS_FAILED_RELAX

        # Check if the geometric convergence has reached
        output_parameters = workchain.outputs.output_parameters.get_dict()
        unconv = output_parameters.get('geom_unconverged')

        # Variable cell + converged geometry: done
        if unconv is not True and not self.ctx.is_fixed_cell:
            self.ctx.is_converged = True
            self.report('Geometry optimisation is converged')
            return None

        # Otherwise another run is needed: assign restart mode and folder
        if self.ctx.restart_mode in ['reuse', 'continuation']:
            self.ctx.base_update['{}_folder'.format(
                self.ctx.restart_mode)] = workchain.outputs.remote_folder

        # Unless we use the continuation mode, the structure should be set
        # to the output structure
        if self.ctx.restart_mode != 'continuation':
            self.ctx.current_structure = structure

        if self.ctx.is_fixed_cell:
            # Last run was fixed-cell: restore the initial constraints
            self.set_cons_and_imax(self.ctx.init_cell_cons,
                                   self.ctx.var_cell_iter_max)
            self.ctx.is_fixed_cell = False

            # Give verbose information
            if unconv is True:
                self.report(
                    'Fixed cell relax not conerged, restore cell constraints anyway'
                )
            else:
                self.report(
                    'Fixed cell relax is converged, restart with inital cell constraints'
                )
        else:
            # Set the constraint to fully fixed cell
            self.set_cons_and_imax(['0 0 0', '0 0 0'],
                                   self.ctx.fix_cell_iter_max)
            self.ctx.is_fixed_cell = True
            self.report(
                'Variable cell relax not converged. Turning cell constraints off'
            )
        return None

    def set_cons_and_imax(self, cell_cons, iter_max):
        """
        Set the cell constraints and ``geom_max_iter`` for the next run.

        The parameters are taken from ``self.ctx.calc_update`` if already
        present there, otherwise from the workchain inputs, and the modified
        dictionary is stored back into ``self.ctx.calc_update``.

        :param cell_cons: value to store as ``cell_constraints``.
        :param iter_max: value to store as ``geom_max_iter``.
        """
        # Load the current configuration from the context
        last_param = self.ctx.calc_update.get('parameters', None)
        # If not, create from the inputs
        if not last_param:
            last_param = self.inputs.calc.parameters.get_dict()

        # Keep the original format of the inputs: nested if the section
        # exists, flat otherwise.
        if 'CELL' in last_param:
            last_param['CELL']['cell_constraints'] = cell_cons
        else:
            last_param['cell_constraints'] = cell_cons

        if 'PARAM' in last_param:
            last_param['PARAM']['geom_max_iter'] = iter_max
        else:
            last_param['geom_max_iter'] = iter_max

        self.ctx.calc_update['parameters'] = last_param