Source code for aiida_castep.parsers.castep

"""
Parsers for CASTEP
"""
from contextlib import contextmanager
from copy import deepcopy
from pathlib import Path
from typing import List, Union

import numpy as np
from aiida.common import exceptions
from aiida.orm import ArrayData, BandsData, Dict, TrajectoryData
from aiida.parsers.parser import Parser

from aiida_castep._version import CALC_PARSER_VERSION
from aiida_castep.common import EXIT_CODES_SPEC as calc_exit_code
from aiida_castep.common import OUTPUT_LINKNAMES as out_ln
from aiida_castep.parsers.castep_bin import CastepbinFile
from aiida_castep.parsers.raw_parser import RawParser, units
from aiida_castep.parsers.utils import (
    add_last_if_exists,
    desort_structure,
    get_desort_args,
    structure_from_input,
)

# pylint: disable=invalid-name,too-many-locals,too-many-statements,too-many-branches
__version__ = CALC_PARSER_VERSION

ERR_FILE_WARNING_MSG = ".err files found in workdir"

# Tasks where the bands (eigenvalues) that one is interested in are not those
# from the SCF calculation. In these cases, we have to get the bands from the .bands
# file instead of the castep_bin/check outputs, since the latter only store those
# of the electronic ground state (SCF) calculation.
NON_SCF_BAND_TASKS = ("spectral", "bandstructure", "optics")
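
# A minimal, illustrative sketch of the selection implied by the comment above.
# It is not used by the parser itself; ``task`` stands for a hypothetical value of
# the PARAM "task" keyword and the helper name is hypothetical.
def _example_bands_come_from_dot_bands(task: str) -> bool:
    """Illustrative only: return True if eigenvalues should be read from the .bands file."""
    return task.lower() in NON_SCF_BAND_TASKS
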


class RetrievedFileManager:
    """
    Wrapper supplying a unified file-handler interface for the retrieved content
    """

    def __init__(self, retrieved_node, retrieved_temporary_folder=None):
        """
        Instantiate the manager object
        """
        self.node = retrieved_node
        if retrieved_temporary_folder is not None:
            self.tmp_folder = Path(retrieved_temporary_folder)
            # Store the relative paths as strings
            self.tmp_content_names = list(
                map(
                    lambda x: str(x.relative_to(self.tmp_folder)),
                    self.tmp_folder.iterdir(),
                )
            )
        else:
            self.tmp_folder = None
            self.tmp_content_names = []
        self.perm_content_names = self.node.list_object_names()
        self.all_content_names = self.perm_content_names + self.tmp_content_names

    @contextmanager
    def open(self, name, mode="r"):
        """Open a file from either the retrieved node or the temporary folder"""
        if name in self.perm_content_names:
            with self.node.open(name, mode=mode) as handle:
                yield handle
        elif name in self.tmp_content_names:
            with open(self.tmp_folder / name, mode=mode) as handle:
                yield handle
        else:
            raise FileNotFoundError(f"Object {name} is not found!")

    def has_file(self, name) -> bool:
        """Return whether this file is available"""
        return name in self.all_content_names

    def list_object_names(self) -> List[str]:
        """Return all object names available"""
        return self.all_content_names

    def get_object_content(self, name, mode="r") -> Union[str, bytes]:
        """Return the full content of an object"""
        with self.open(name, mode=mode) as handle:
            return handle.read()

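
# A minimal usage sketch for ``RetrievedFileManager`` (illustrative only, not part of
# the parser logic). It assumes ``retrieved`` is the retrieved FolderData of a finished
# CASTEP calculation and that the stdout follows the conventional "<seedname>.castep"
# naming; both the helper name and the default seedname here are hypothetical.
def _example_read_stdout(retrieved, seedname="aiida"):
    """Illustrative only: read the .castep stdout through the unified interface."""
    manager = RetrievedFileManager(retrieved, retrieved_temporary_folder=None)
    fname = seedname + ".castep"
    if manager.has_file(fname):
        return manager.get_object_content(fname).split("\n")
    return []
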
class CastepParser(Parser):
    """
    This is the class for parsing results from a CASTEP calculation.

    Supported calculation types:
    * singlepoint
    * geom
    """

    _setting_key = "parser_options"

    @property
    def castep_input_parameters(self):
        """Access the original castep input parameters"""
        return self.node.inputs.parameters.get_dict()

    @property
    def castep_task(self):
        """Task of the calculation"""
        return self.castep_input_parameters["PARAM"].get("task", "singlepoint")

    def parse(self, **kwargs):
        """
        Receives a dictionary of retrieved nodes.
        Top level logic of operation
        """
        try:
            retrieved = self.retrieved
        except exceptions.NotExistent:
            return self.exit_codes.ERROR_NO_RETRIEVED_FOLDER

        output_folder = RetrievedFileManager(
            retrieved, kwargs.get("retrieved_temporary_folder")
        )

        warnings = []
        exit_code_1 = None

        # NOTE: parser options are not used for now
        parser_opts = {}

        # NOT READILY IN USE
        input_dict = {}

        # Check what is inside the folder
        filenames = output_folder.list_object_names()

        # Get calculation options
        options = self.node.get_options()
        seedname = options["seedname"]

        # At least the stdout should exist
        if options["output_filename"] not in filenames:
            self.logger.error("Standard output not found")
            return self.exit_codes.ERROR_NO_OUTPUT_FILE

        # The calculation has failed if there is any .err file
        err_filenames = []
        for fname in filenames:
            if not fname.endswith(".err"):
                continue
            file_contents = list(
                filter(None, output_folder.get_object_content(fname).split("\n"))
            )
            if "continuing with calculation" not in file_contents[-1]:
                err_filenames.append(fname)

        if err_filenames:
            exit_code_1 = "ERROR_CASTEP_ERROR"

        # Add the content of the .err files
        err_contents = set()
        for fname in err_filenames:
            err_contents.add(output_folder.get_object_content(fname))

        # Trajectory files
        has_md_geom = False
        out_md_geom_name_content = None
        for suffix in (".geom", ".md"):
            fname = seedname + suffix
            if fname in filenames:
                out_md_geom_name_content = (
                    fname,
                    output_folder.get_object_content(fname).split("\n"),
                )
                has_md_geom = True
                break

        # Handling bands
        fname = seedname + ".bands"
        has_bands = fname in filenames
        if has_bands:
            out_bands_content = output_folder.get_object_content(fname).split("\n")
        else:
            out_bands_content = None

        out_file = options["output_filename"]
        out_file_content = output_folder.get_object_content(out_file).split("\n")

        ###### CALL THE RAW PARSING FUNCTION TO PARSE DATA #######

        raw_parser = RawParser(
            out_lines=out_file_content,
            input_dict=input_dict,
            md_geom_info=out_md_geom_name_content,
            bands_lines=out_bands_content,
            **parser_opts,
        )
        (
            out_dict,
            trajectory_data,
            structure_data,
            bands_data,
            exit_code_2,
        ) = raw_parser.parse()

        # Combine the exit codes - use the more specific error
        exit_code = None
        for code in calc_exit_code:
            if code in (exit_code_2, exit_code_1):
                exit_code = code
                break

        # Append the final value of trajectory_data into out_dict
        last_value_keys = [
            "free_energy",
            "total_energy",
            "zero_K_energy",
            "spin_density",
            "abs_spin_density",
            "enthalpy",
        ]
        for key in last_value_keys:
            add_last_if_exists(trajectory_data, key, out_dict)

        # Add warnings from this level
        out_dict["warnings"].extend(warnings)

        # Add error messages
        out_dict["error_messages"] = list(err_contents)

        ######## --- PROCESSING BANDS DATA --- ########
        if has_bands or output_folder.has_file(seedname + ".castep_bin"):
            # Only use castep_bin if we are interested in the SCF kpoints
            if output_folder.has_file(seedname + ".castep_bin") and (
                self.castep_task.lower() not in NON_SCF_BAND_TASKS
            ):
                self.logger.info("Using castep_bin file for the bands data.")
                bands_node = bands_from_castepbin(seedname, output_folder)
                if not self._has_empty_bands(bands_node):
                    out_dict["warnings"].append(
                        "At least one kpoint has no empty bands, energy/forces returned are not reliable."
                    )
                    # Only set the exit code if there are no other errors
                    if exit_code == "CALC_FINISHED":
                        exit_code = "ERROR_NO_EMPTY_BANDS"
            else:
                bands_node = bands_to_bandsdata(**bands_data)
            self.out(out_ln["bands"], bands_node)

        ######## --- PROCESSING MULLIKEN DATA --- ########
        if not err_filenames:
            input_structure = self.node.inputs.structure
            idesort = get_desort_args(input_structure)
            if len(out_dict.get("charges", [])) > 1:
                new_charges = np.array(out_dict["charges"])[idesort]
                out_dict["charges"] = new_charges
            if len(out_dict.get("spins", [])) > 1:
                new_spins = np.array(out_dict["spins"])[idesort]
                out_dict["spins"] = new_spins

        ######## --- PROCESSING STRUCTURE DATA --- ########
        no_optimise = False
        try:
            cell = structure_data["cell"]
            positions = structure_data["positions"]
            symbols = structure_data["symbols"]
        except KeyError:
            # Handle the special case where CASTEP finds nothing to optimise,
            # hence we attach the input geometry as the output
            for warning in out_dict["warnings"]:
                if "there is nothing to optimise" in warning:
                    no_optimise = True
            if no_optimise is True:
                self.out(out_ln["structure"], deepcopy(self.node.inputs.structure))
        else:
            structure_node = structure_from_input(
                cell=cell, positions=positions, symbols=symbols
            )
            # Desort to the input ordering and reuse the input label for the output
            input_structure = self.node.inputs.structure
            structure_node = desort_structure(structure_node, input_structure)
            structure_node.label = input_structure.label
            self.out(out_ln["structure"], structure_node)

        ######## --- PROCESSING TRAJECTORY DATA --- ########
        # If there is anything to save - there should be...
        if trajectory_data:
            # Resorting indices - for recovering the original ordering of the
            # species in the input structure
            input_structure = self.node.inputs.structure
            idesort = get_desort_args(input_structure)

            # If we have a .geom file, save it as a TrajectoryData
            if has_md_geom:
                try:
                    positions = np.asarray(trajectory_data["positions"])[:, idesort]
                    cells = trajectory_data["cells"]
                    # Assume symbols do not change - symbols are the same for all frames
                    symbols = np.asarray(trajectory_data["symbols"])[idesort]
                    stepids = np.arange(len(positions))
                except KeyError:
                    out_dict["parser_warning"].append(
                        "Cannot extract data from .geom file."
                    )
                else:
                    traj = TrajectoryData()
                    traj.set_trajectory(
                        stepids=np.asarray(stepids),
                        cells=np.asarray(cells),
                        symbols=np.asarray(symbols),
                        positions=np.asarray(positions),
                    )
                    # Save the rest
                    for name, value in trajectory_data.items():
                        # Skip saving empty arrays
                        if len(value) == 0:
                            continue
                        array = np.asarray(value)
                        # For forces/velocities we also need to resort the array
                        if ("force" in name) or ("velocities" in name):
                            array = array[:, idesort]
                        traj.set_array(name, array)
                    self.out(out_ln["trajectory"], traj)
            # Or maybe there is nothing to optimise? Still save a TrajectoryData
            elif no_optimise is True:
                traj = TrajectoryData()
                input_structure = self.node.inputs.structure
                traj.set_trajectory(
                    stepids=np.asarray([1]),
                    cells=np.asarray([input_structure.cell]),
                    symbols=np.asarray(
                        [site.kind_name for site in input_structure.sites]
                    ),
                    positions=np.asarray(
                        [[site.position for site in input_structure.sites]]
                    ),
                )
                # Save the rest
                for name, value in trajectory_data.items():
                    # Skip saving empty arrays
                    if len(value) == 0:
                        continue
                    array = np.asarray(value)
                    # For forces/velocities we also need to resort the array
                    if ("force" in name) or ("velocities" in name):
                        array = array[:, idesort]
                    traj.set_array(name, array)
                self.out(out_ln["trajectory"], traj)
            # Otherwise, save the data into an ArrayData node
            else:
                out_array = ArrayData()
                for name, value in trajectory_data.items():
                    # Skip saving empty arrays
                    if len(value) == 0:
                        continue
                    array = np.asarray(value)
                    if ("force" in name) or ("velocities" in name):
                        array = array[:, idesort]
                    out_array.set_array(name, array)
                self.out(out_ln["array"], out_array)

        ######## --- PROCESSING OUTPUT DATA --- ########
        output_params = Dict(dict=out_dict)
        self.out(out_ln["results"], output_params)

        # Return the exit code
        return getattr(self.exit_codes, exit_code)

    def _has_empty_bands(self, bands_data: BandsData, thresh=0.005):
        """
        Check the occupations of the BandsData

        There should be some empty bands if the calculation is not a fixed-occupation one.
        Otherwise, the final energy and forces are not reliable.
        """
        # Check if the occupation is allowed to vary
        param = self.node.inputs.parameters.get_dict()["PARAM"]
        # If it is a fixed occupation calculation we do not need to do anything about it....
        fix_occ = (
            param.get("fix_occupancy", False)
            or param.get("metals_method", "dm").lower() == "none"
            or param.get("elec_method", "dm").lower() == "none"
        )
        if fix_occ:
            return True

        _, occ = bands_data.get_bands(also_occupations=True)
        nspin, nkppts, _ = occ.shape
        problems = []
        for ispin in range(nspin):
            for ikpts in range(nkppts):
                if occ[ispin, ikpts, -1] >= thresh:
                    problems.append((ispin, ikpts, occ[ispin, ikpts, -1]))
        if problems:
            for ispin, ikpts, val in problems:
                self.logger.warning(
                    "No empty bands for spin %d, kpoint %d - occ: %.5f",
                    ispin,
                    ikpts,
                    val,
                )
            return False
        return True

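
# An illustrative, numpy-only sketch of the occupancy check performed by
# ``CastepParser._has_empty_bands`` above. It assumes ``occ`` has shape
# (nspin, nkpts, nbands) with bands ordered by energy, as returned by
# ``BandsData.get_bands(also_occupations=True)``; the helper name is hypothetical.
def _example_occupancy_check(occ: np.ndarray, thresh: float = 0.005) -> bool:
    """Illustrative only: True if the highest band is (almost) empty at every spin/kpoint."""
    # occ[..., -1] is the occupation of the highest band for each spin and kpoint
    return bool(np.all(occ[..., -1] < thresh))
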
def bands_to_bandsdata(bands_info, kpoints, bands):
    """
    Convert the result of parser_dot_bands into a BandsData object

    :param bands_info: A dictionary of the information in the bands file,
      containing fields such as efermi, units, cell
    :param kpoints: An array of the kpoints of the bands, rows are
      (kindex, kx, ky, kz, weight)
    :param bands: The actual bands array
    :return: A BandsData object
    :rtype: ``aiida.orm.BandsData``
    """
    bands_node = BandsData()

    # Extract the index of the kpoints
    kpn_array = np.asarray(kpoints)
    k_index = kpn_array[:, 0]

    # We need to restore the order of the kpoints
    k_sort = np.argsort(k_index)
    # Sort the kpn_array
    kpn_array = kpn_array[k_sort]

    _weights = kpn_array[:, -1]
    kpts = kpn_array[:, 1:-1]
    bands_node.set_kpoints(kpts, weights=_weights)

    # Sort the bands to match the order of the kpoints
    bands_array = np.asarray(bands)[k_sort]
    # We need to swap the axes from (kpt, spin, engs) to (spin, kpt, engs)
    bands_array = bands_array.swapaxes(0, 1)
    # Squeeze the first dimension, e.g. when there is a single spin
    if bands_array.shape[0] == 1:
        bands_array = bands_array[0]
    bands_array = bands_array * units["Eh"]
    bands_info = dict(bands_info)  # Create a copy

    # Convert the units for the fermi energies
    if isinstance(bands_info["efermi"], list):
        bands_info["efermi"] = [x * units["Eh"] for x in bands_info["efermi"]]
    else:
        bands_info["efermi"] = bands_info["efermi"] * units["Eh"]

    bands_node.set_bands(bands_array, units="eV")
    # PBC is always true as this is PW DFT....
    bands_node.set_cell(bands_info["cell"], pbc=(True, True, True))

    # Store information from *.bands in the attributes.
    # This is needed as we need to know the number of electrons
    # and the fermi energy
    for key, value in bands_info.items():
        bands_node.set_attribute(key, value)
    return bands_node

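
# An illustrative, numpy-only sketch of the kpoint re-ordering done in
# ``bands_to_bandsdata`` above (no AiiDA profile needed). Each row of ``kpoints``
# is (kindex, kx, ky, kz, weight) and ``bands`` has shape (nkpts, nspin, nbands)
# before the axes are swapped; the helper name is hypothetical.
def _example_sort_kpoints(kpoints, bands):
    """Illustrative only: return (fractional kpoints, weights, bands) sorted by kpoint index."""
    kpn_array = np.asarray(kpoints)
    k_sort = np.argsort(kpn_array[:, 0])
    kpn_array = kpn_array[k_sort]
    weights = kpn_array[:, -1]
    kpts = kpn_array[:, 1:-1]
    bands_array = np.asarray(bands)[k_sort].swapaxes(0, 1)
    return kpts, weights, bands_array
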
def bands_from_castepbin(seedname, fmanager):
    """
    Acquire and prepare bands data from the castep_bin file instead
    """
    with fmanager.open(seedname + ".castep_bin", "rb") as handle:
        binfile = CastepbinFile(fileobj=handle)

    bands_node = BandsData()
    kidx = binfile.kpoints_indices
    sort_idx = np.argsort(kidx)

    # Generate sorted arrays
    kpoints = binfile.kpoints[sort_idx, :]
    weights = binfile.kpoint_weights[sort_idx].astype(float)
    eigenvalues = binfile.eigenvalues[:, sort_idx, :]
    occupancies = binfile.occupancies[:, sort_idx, :]
    efermi = binfile.fermi_energy

    bands_node.set_kpoints(kpoints, weights=weights)
    bands_node.set_bands(eigenvalues, occupations=occupancies, units="eV")
    bands_node.set_cell(binfile.cell, pbc=(True, True, True))
    bands_node.set_attribute("efermi", efermi)

    return bands_node
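
# An illustrative end-to-end sketch tying the helpers together (not part of the
# parser). It assumes a loaded AiiDA profile, a ``retrieved`` FolderData from a
# finished CASTEP run and the default seedname "aiida"; the helper name is
# hypothetical.
def _example_load_bands_from_checkpoint(retrieved, seedname="aiida"):
    """Illustrative only: build a BandsData node directly from the .castep_bin file."""
    fmanager = RetrievedFileManager(retrieved)
    if not fmanager.has_file(seedname + ".castep_bin"):
        raise FileNotFoundError(f"{seedname}.castep_bin was not retrieved")
    return bands_from_castepbin(seedname, fmanager)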