# Source code for aiida_castep.calculations.castep

"""
Calculations of CASTEP
"""

import re
import time
from fnmatch import fnmatch
from subprocess import call, check_output
from textwrap import TextWrapper

import aiida.orm as orm
from aiida.common import CalcInfo, CodeInfo, InputValidationError
from aiida.common.folders import Folder
from aiida.engine import CalcJob, ProcessBuilder, run_get_node
from aiida.manage.manager import get_manager
from aiida.orm.nodes.data.base import to_aiida_type

from aiida_castep._version import CALC_PARSER_VERSION

from ..common import EXIT_CODES_SPEC, INPUT_LINKNAMES, OUTPUT_LINKNAMES
from .inpgen import CastepInputGenerator
from .tools import (
    castep_input_summary,
    check_restart,
    input_param_validator,
    update_parameters,
    use_pseudos_from_family,
)
from .utils import get_castep_ion_line

# Define the version of the calculation (taken from CALC_PARSER_VERSION)
__version__ = CALC_PARSER_VERSION

# Short module-level aliases for the shared link-name and exit-code tables
# imported from ``..common``; the classes below index into these.
inp_ln = INPUT_LINKNAMES
out_ln = OUTPUT_LINKNAMES
ecodes = EXIT_CODES_SPEC

# Names exported by ``from <module> import *``

__all__ = ["CastepCalculation", "submit_test"]


class CastepCalculation(CalcJob, CastepInputGenerator):
    """
    Class representing a generic CASTEP calculation -
    This class should work for all types of calculations.
    """

    # Default values registered as ``metadata.options.<key>`` ports in ``define``
    _DEFAULTS = {
        "seedname": "aiida",
        "symlink_usage": True,
        "parent_folder_name": "parent",
        "parser_name": "castep.castep",
        "use_kpoints": True,
        "withmpi": True,
    }
    _DEFAULTS["input_filename"] = _DEFAULTS["seedname"] + ".cell"
    _DEFAULTS["output_filename"] = _DEFAULTS["seedname"] + ".castep"

    # Glob patterns that are always appended to the retrieve list
    _default_retrieve_list = [
        "*.err",
        "*.den_fmt",
        "*.elf_fmt",
        "*-out.cell",
        "*.pdos_bin",
    ]

    # Task name (lower case) -> file suffixes retrieved in addition for that task
    retrieve_dict = {
        "phonon": [".phonon"],
        "phonon+efield": [".phonon", ".efield"],
        "magres": [".magres"],
        "transitionstatesearch": [".ts"],
        "molecular dynamics": [".md"],
        "moleculardynamics": [".md"],
        "geometryoptimisation": [".geom"],
        "geometryoptimization": [".geom"],
        "spectral": [".ome_bin", ".dome_bin"],
    }

    # NOT CURRENTLY USED
    _acceptable_tasks = [
        "singlepoint",
        "geometryoptimization",
        "geometryoptimisation",
    ]

    _copied_attributes = [
        "jobresource_param",
        "custom_scheduler_commands",
        "max_wallclock_seconds",
    ]

    # When True, provenance header lines are written into the input files
    _write_headers = True

    # Input link names whose nodes are listed in the .cell / .param headers
    _cell_links = [
        inp_ln["parameters"],
        inp_ln["structure"],
        inp_ln["settings"],
        inp_ln["kpoints"],
    ]
    _param_links = [inp_ln["parameters"]]

    # Extra kpoints - CASTEP has many calculation mode that take extra kpoints.
    # Each key ``XX`` gives rise to an optional input port ``XX_<kpoints link>``.
    _extra_kpoints = {
        "spectral": {
            "task": ("spectral",),  # task where the kpoints will be used
            "need_weights": True,  # Whether the explicit kpoints need weights or not
        },
        "bs": {
            "task": ("bandstructure",),
            # Fixed: this key used to be misspelt as "need_weigthts"
            "need_weights": False,
        },
        "phonon": {
            "task": ("phonon", "phonon+efield"),
            "need_weights": False,
        },
        "phonon_fine": {
            "task": ("phonon", "phonon+efield"),
            "need_weights": False,
        },
        "supercell": {
            "task": ("phonon",),
            "need_weights": True,
        },
        "magres": {
            "task": ("magres",),
            "need_weights": True,
        },
        "optics": {
            "task": ("optics",),
            "need_weights": True,
        },
        "elnes": {
            "task": ("elnes",),
            "need_weights": True,
        },
    }

    @classmethod
    def define(cls, spec):
        """
        Define the input ports, output ports and exit codes of the process.

        :param spec: the process specification to populate
        """
        super().define(spec)

        # Initialise internal params, saved as metadata.options
        for key, value in cls._DEFAULTS.items():
            spec.input("metadata.options." + key, default=value)

        spec.input(
            "metadata.options.retrieve_list",
            valid_type=list,
            default=cls._default_retrieve_list,
        )

        # Begin defining the input nodes
        spec.input(
            inp_ln["structure"],
            valid_type=orm.StructureData,
            help="The input structure",
        )
        spec.input(
            inp_ln["settings"],
            valid_type=orm.Dict,
            serializer=to_aiida_type,
            required=False,
            help="A node for additional settings",
        )
        spec.input(
            inp_ln["parameters"],
            valid_type=orm.Dict,
            serializer=to_aiida_type,
            validator=input_param_validator,
            help="A node that defines the input parameters",
        )
        spec.input(
            inp_ln["parent_calc_folder"],
            valid_type=orm.RemoteData,
            help="Use a remote folder as the parent folder. Useful for restarts.",
            required=False,
        )
        spec.input_namespace(
            "pseudos",
            # Fixed: the fragments were previously joined without separating
            # spaces, which produced words such as "ofthe" and "aa".
            help=(
                "Use nodes for the pseudopotentials of the elements in the "
                "structure. You should pass a dictionary specifying the "
                "pseudopotential node for each kind such as {O: <PseudoNode>}"
            ),
            dynamic=True,
        )
        spec.input(
            inp_ln["kpoints"],
            valid_type=orm.KpointsData,
            required=False,
            help="Use a node defining the kpoints for the calculation",
        )

        # Define additional kpoints for different tasks
        for key, value in cls._extra_kpoints.items():
            tasks = ", ".join(value["task"])
            spec.input(
                key + "_" + inp_ln["kpoints"],
                valid_type=orm.KpointsData,
                required=False,
                help=f"Extra kpoints input for task: {tasks}",
            )

        # Define the exit codes
        for smsg, (code, msg, inv) in ecodes.items():
            spec.exit_code(code, smsg, message=msg, invalidates_cache=inv)

        # Define the output nodes
        spec.output(
            out_ln["results"],
            required=True,
            valid_type=orm.Dict,
            help="Parsed results in a dictionary format.",
        )
        spec.outputs.dynamic = True

        # Define the default output node, enable CalcJobNode to use .res
        spec.default_output_node = out_ln["results"]

    def prepare_for_submission(self, folder):
        """
        Routine to be called when create the input files and other stuff

        Writes ``<seedname>.cell`` and ``<seedname>.param`` into ``folder`` and
        builds the ``CalcInfo`` describing what to copy, run and retrieve.

        :param folder: a aiida.common.folders.Folder subclass where
            the plugin should put all its files.
        :returns: the populated ``CalcInfo`` instance
        :raises InputValidationError: if the parameters contain a
            reuse/continuation key but no parent folder is given, or if the
            settings contain keys that are not consumed here.
        """
        self.prepare_inputs()

        local_copy_list = []
        remote_copy_list = []
        remote_symlink_list = []

        # A restart keyword in the parameters requires a parent folder
        require_parent = any(
            str(key).lower() in ("reuse", "continuation") for key in self.param_dict
        )
        parent_calc_folder = self.inputs.get("parent_calc_folder")
        if parent_calc_folder is None and require_parent:
            raise InputValidationError(
                "No parent calculation folder passed"
                " for restart calculation using reuse/continuation"
            )

        ##############################
        # END OF INITIAL INPUT CHECK #
        ##############################

        # Generate input file
        self.prepare_inputs(reset=True)

        if self._write_headers is True:
            cell_nodes = [
                [name, inp]
                for name, inp in self.inputs.items()
                if name in self._cell_links and inp
            ]
            # process pseudos; the namespace is optional so it may be absent
            for name, pseudo in self.inputs.get("pseudos", {}).items():
                cell_nodes.append([f"pseudo__{name}", pseudo])
            self.cell_file.header = self._generate_header_lines(cell_nodes)

            param_nodes = [
                [name, inp]
                for name, inp in self.inputs.items()
                if name in self._param_links and inp
            ]
            self.param_file.header = self._generate_header_lines(param_nodes)

        local_copy_list.extend(self.local_copy_list_to_append)

        seedname = self.inputs.metadata.options.seedname
        cell_fn = seedname + ".cell"
        param_fn = seedname + ".param"

        with folder.open(cell_fn, mode="w") as incell:
            incell.write(self.cell_file.get_string())

        with folder.open(param_fn, mode="w") as inparam:
            inparam.write(self.param_file.get_string())

        # IMPLEMENT OPERATIONS FOR RESTART
        if parent_calc_folder:
            # The parent folder is either symlinked or copied on the remote
            if self.inputs.metadata.options.symlink_usage:
                remote_list = remote_symlink_list
            else:
                remote_list = remote_copy_list
            remote_list.append(
                (
                    parent_calc_folder.computer.uuid,
                    parent_calc_folder.get_remote_path(),
                    self.inputs.metadata.options.parent_folder_name,
                )
            )

        calcinfo = CalcInfo()
        calcinfo.uuid = self.uuid

        # COPY/SYMLINK LISTS
        calcinfo.local_copy_list = local_copy_list
        calcinfo.remote_copy_list = remote_copy_list
        calcinfo.remote_symlink_list = remote_symlink_list

        # SET UP extra CMDLINE arguments
        cmdline_params = self.settings_dict.pop("CMDLINE", [])

        # Extra parameters are added after the seed for CASTEP
        calcinfo.cmdline_params = [seedname] + list(cmdline_params)

        # CASTEP don't have any STDOUT etc when running calculations
        # Error is shown in the *.err file

        # Construct codeinfo instance
        codeinfo = CodeInfo()
        codeinfo.cmdline_params = [seedname] + list(cmdline_params)
        codeinfo.code_uuid = self.inputs.code.uuid
        calcinfo.codes_info = [codeinfo]

        # Retrieve by default the .castep file and the bands file, followed by
        # whatever the settings request.  The list is assembled once so that
        # the entries from the settings are no longer added twice.
        retrieve_list = [seedname + ".castep", seedname + ".bands"]
        retrieve_list.extend(self.settings_dict.pop("ADDITIONAL_RETRIEVE_LIST", []))

        calcinfo.retrieve_temporary_list = []
        calcinfo.retrieve_temporary_list.extend(
            self.settings_dict.pop("ADDITIONAL_RETRIEVE_TEMPORARY_LIST", [])
        )

        # Task specific files, e.g. the .geom file for a geometry optimisation
        calculation_mode = self.param_file.get("task", "singlepoint")
        task_extra = self.retrieve_dict.get(calculation_mode.lower(), [])
        retrieve_list.extend(seedname + suffix for suffix in task_extra)

        # Retrieve output cell file if requested
        if self.param_file.get("write_cell_structure"):
            retrieve_list.append(seedname + "-out.cell")

        retrieve_list += self._default_retrieve_list
        calcinfo.retrieve_list = retrieve_list

        # Remove parser options in the setting dictionary
        # At the moment parser options are not used here
        if self.settings_dict:
            raise InputValidationError(
                "The following keys have been found in "
                "the settings input node, but were not understood: {}".format(
                    ",".join(list(self.settings_dict.keys()))
                )
            )

        return calcinfo

    # Attach the input summary method
    @staticmethod
    def get_castep_input_summary(builder):
        """Summarize the input for a builder"""
        return castep_input_summary(builder)

    @classmethod
    def submit_test(cls, *args, **kwargs):
        """
        Test submission with a builder of inputs

        If the first positional argument is a ``ProcessBuilder`` it is used
        directly, otherwise the keyword arguments are taken as the inputs of
        this class.  Returns the result of the module-level ``submit_test``.
        """
        if args and isinstance(args[0], ProcessBuilder):
            return submit_test(args[0])
        return submit_test(cls, **kwargs)

    @classmethod
    def check_restart(cls, builder, verbose=False):
        """
        Check the existence of restart file if needed

        :returns: whatever the module-level ``check_restart`` helper returns
            (previously the result was discarded)
        """
        return check_restart(builder, verbose)

    @classmethod
    def dryrun_test(cls, inputs, castep_exe="castep.serial", verbose=True):
        """
        Do a dryrun test in a folder with prepared builder or inputs

        :param inputs: a ``ProcessBuilder`` or a dictionary of inputs
        :param castep_exe: name of the CASTEP executable to invoke
        :param verbose: print progress information when True
        :returns: ``None`` if the executable cannot be launched, otherwise a
            tuple ``(folder, dryrun_results)`` where ``dryrun_results`` may
            contain ``num_kpoints``, ``memory_MB`` and ``disk_MB``
        :raises InputValidationError: if a ``*.err`` file exists after the run
        """
        if isinstance(inputs, ProcessBuilder):
            res = cls.submit_test(inputs)
        else:
            res = cls.submit_test(**inputs)
        dry_run_node, folder_path = res[0], res[1]
        folder = Folder(folder_path)
        seedname = dry_run_node.get_option("seedname")

        def _print(inp):
            if verbose:
                print(inp)

        # Do a dryrun
        try:
            output = check_output([castep_exe, "-v"], universal_newlines=True)
        except OSError:
            _print(f"CASTEP executable '{castep_exe}' is not found")
            return None

        # Now start dryrun
        _print(
            "Running with {}".format(
                check_output(["which", castep_exe], universal_newlines=True)
            )
        )
        _print(output)

        _print("Starting dryrun...")
        call([castep_exe, "--dryrun", seedname], cwd=folder.abspath)

        # Check if any *err files
        for fname in folder.get_content_list():
            if fnmatch(fname, "*.err"):
                with folder.open(fname) as fhandle:
                    # Fixed: the message used to end in "\fname" (a form feed
                    # followed by "name") instead of a newline.
                    _print(f"Error found in {fname}:\n")
                    _print(fhandle.read())
                raise InputValidationError("Error found during dryrun")

        # Gather information from the dryrun file
        kpoints_re = re.compile(r"\s*k-Points For SCF Sampling:\s+(\d+)\s*")
        storage_re = re.compile(
            r"\| Approx\. total storage required"
            r" per process\s+([0-9.]+)\sMB\s+([0-9.]+)"
        )
        dryrun_results = {}
        with folder.open(seedname + ".castep") as fhandle:
            for line in fhandle:
                mth = kpoints_re.match(line)
                if mth:
                    dryrun_results["num_kpoints"] = int(mth.group(1))
                    _print(f"Number of k-points: {mth.group(1)}")
                    continue
                mth = storage_re.match(line)
                if mth:
                    dryrun_results["memory_MB"] = float(mth.group(1))
                    dryrun_results["disk_MB"] = float(mth.group(2))
                    _print(f"RAM: {mth.group(1)} MB, DISK: {mth.group(2)} MB")

        return folder, dryrun_results

    def _prepare_cell_file(self):
        """Add extra kpoints information to the calculation"""
        # First, call the base method
        super()._prepare_cell_file()

        param = self.inputs.get(inp_ln["parameters"]).get_dict()
        task = param["PARAM"].get("task", "singlepoint")
        # The task tuples in ``_extra_kpoints`` are lower case, so compare in
        # lower case to avoid a spurious warning for e.g. "Spectral".
        task_lower = task.lower()

        # Check if we have more kpoints
        for kpn_name, kpn_settings in self._extra_kpoints.items():
            extra_kpns = self.inputs.get(kpn_name + "_" + inp_ln["kpoints"])
            # No need to proceed if it is not defined
            if extra_kpns is None:
                continue
            self._include_extra_kpoints(extra_kpns, kpn_name, kpn_settings)

            # Warn if this kpoint will not be used by the task
            if task_lower not in kpn_settings["task"]:
                self.report(
                    "Warning: kpoints for {} will not be used for task {}".format(
                        kpn_name, task
                    )
                )

    @staticmethod
    def update_parameters(inputs, *args, **kwargs):
        """Update the parameters for a given input dictionary/builder"""
        # Inside the method body the name resolves to the module-level helper,
        # not to this static method.
        return update_parameters(inputs, *args, **kwargs)

    # The original, misspelt, name is kept so that existing callers still work
    update_paraemters = update_parameters

    @staticmethod
    def use_pseudos_from_family(inputs, family_name):
        """Delegate to the module-level ``use_pseudos_from_family`` helper"""
        use_pseudos_from_family(inputs, family_name)

    def _generate_header_lines(self, other_nodes=None):
        """
        Generate header lines to go into param and cell files

        :param other_nodes: A list of pairs of (linkname, node)
        :returns: a list of header lines, or ``None`` if no profile is loaded
        """
        profile = get_manager().get_profile()
        if not profile:
            return None

        wrapper = TextWrapper(initial_indent="# ", subsequent_indent="# ")
        time_str = time.strftime("%H:%M:%S %d/%m/%Y %Z")

        lines = [
            f"##### Generated by aiida_castep {time_str} #####",
            "# author: Bonan Zhu (zhubonan@outlook.com)",
            # Fixed: a missing comma here used to glue this separator onto
            # the following line, giving "# # AiiDA User: ...".
            "# ",
            "# AiiDA User: {}".format(orm.User.objects.get_default().get_full_name()),
            f"# AiiDA profile: {profile.name}",
            "# Information of the calculation node",
            # "# type: {}".format(self.get_name()),
            # "# pk: {}".format(self.pk),
            # "# uuid: {}".format(self.uuid),
            "# label: {}".format(self.inputs.metadata.get("label")),
            "# description:",
        ]

        description = self.inputs.metadata.get("description")
        if description:
            lines.extend(wrapper.wrap(description))
        lines.append("")

        # additional information of the input nodes
        if other_nodes:
            lines.append("## Information of input nodes used:")
            for name, node in other_nodes:
                node_lines = [
                    "# ",
                    f"# type: {node}",
                    f"# pk: {node.pk}",
                    f"# linkname: {name}",
                    f"# uuid: {node.uuid}",
                    f"# label: {node.label}",
                    "# description:",
                ]
                _desc = node.description
                if _desc:
                    node_lines.extend(wrapper.wrap(_desc))
                node_lines.append("")
                lines.extend(node_lines)

        lines.append("# END OF HEADER")
        return lines
class TaskSpecificCalculation(CastepCalculation):
    """
    Class for Calculations that only allow certain tasks
    """

    # Subclasses list the task names (compared case-insensitively) they accept
    _acceptable_tasks = []

    def prepare_for_submission(self, folder):
        """
        Check that the requested task is acceptable, then prepare as usual.

        :param folder: passed on unchanged to the parent implementation
        :returns: the result of the parent ``prepare_for_submission``
        :raises InputValidationError: if the task set in ``PARAM`` is not one
            of ``_acceptable_tasks``
        """
        in_dict = self.inputs[inp_ln["parameters"]].get_dict()

        # A missing task is treated as "singlepoint", the same fallback that
        # CastepCalculation._prepare_cell_file uses, so that an unset task is
        # reported as a validation error rather than a bare KeyError.
        task = in_dict["PARAM"].get("task", "singlepoint").lower()

        # Check if task is correctly set
        all_tasks = [t.lower() for t in self._acceptable_tasks]
        if task not in all_tasks:
            raise InputValidationError(
                "Wrong TASK value {}" " set in PARAM".format(task)
            )
        return super().prepare_for_submission(folder)


class CastepTSCalculation(TaskSpecificCalculation):
    """
    CASTEP calculation for transition state search. Use an extra input product structure.
    """

    _acceptable_tasks = ["transitionstatesearch"]

    @classmethod
    def define(cls, spec):
        """Extend the parent specification with the product structure input."""
        super().define(spec)
        spec.input(
            inp_ln["prod_structure"],
            valid_type=orm.StructureData,
            required=True,
            help="Product structure for transition state search.",
        )

    def _prepare_cell_file(self):
        """
        Extend the prepare_cell_file method to include product structure
        """
        super()._prepare_cell_file()
        p_structure = self.inputs[inp_ln["prod_structure"]]

        # One ion line per site of the product structure, labelled by the
        # symbol of the site's kind
        pdt_position_list = [
            get_castep_ion_line(
                p_structure.get_kind(site.kind_name).symbol, site.position
            )
            for site in p_structure.sites
        ]
        self.cell_file["POSITIONS_ABS_PRODUCT"] = pdt_position_list
def submit_test(arg, **kwargs):
    """
    This essentially tests the submission by doing a dry run.

    :param arg: either a ``ProcessBuilder`` holding the inputs, or the process
        class to run with ``kwargs`` as its inputs
    :param kwargs: inputs for the process when ``arg`` is not a builder; a
        ``metadata`` dictionary is created if none is given
    :returns: a tuple of the node produced by the dry run and the value of
        ``dry_run_info["folder"]`` on that node
    """
    is_builder = isinstance(arg, ProcessBuilder)
    inputs = arg if is_builder else kwargs
    if not is_builder:
        # Allow calling without any metadata rather than failing on KeyError
        inputs.setdefault("metadata", {})
    metadata = inputs["metadata"]

    metadata["store_provenance"] = False
    metadata["dry_run"] = True
    try:
        if is_builder:
            result = run_get_node(inputs)
        else:
            result = run_get_node(arg, **inputs)
        output_node = result.node
    finally:
        # Always switch the flags back, even if the dry run raised, so that
        # the caller's builder/metadata is not left in dry-run mode.
        metadata["store_provenance"] = True
        metadata["dry_run"] = False

    return output_node, output_node.dry_run_info["folder"]