Source code for aiida_castep.parsers.raw_parser

"""
Module for parsing .castep file
This module should not rely on any of AiiDA modules
"""

import logging
import re
from collections import defaultdict

import numpy as np

from aiida_castep._version import CALC_PARSER_VERSION
from aiida_castep.common import EXIT_CODES_SPEC
from aiida_castep.parsers.utils import CASTEPOutputParsingError

from .constants import units

# pylint: disable=invalid-name,logging-format-interpolation,too-many-instance-attributes,too-many-branches,too-many-statements,too-many-locals
LOGGER = logging.getLogger("aiida")

__version__ = CALC_PARSER_VERSION

unit_suffix = "_units"

SCF_FAILURE_ERROR = "ERROR_SCF_NOT_CONVERGED"
GEOM_FAILURE_MESSAGE = "Maximum geometry optimization cycle has been reached"
END_NOT_FOUND_ERROR = "ERROR_NO_END_OF_CALCULATION"
INSUFFICENT_TIME_ERROR = "ERROR_TIMELIMIT_REACHED"
STOP_REQUESTED_ERROR = "ERROR_STOP_REQUESTED"


class RawParser:
    """A raw parser object to parse the output of CASTEP"""

    def __init__(self, out_lines, input_dict, md_geom_info, bands_lines, **parser_opts):
        """Instantiate the parser by passing the lists of lines"""
        self.dot_castep_lines = out_lines
        self.input_dict = input_dict
        self.md_geom_info = md_geom_info
        self.bands_lines = bands_lines
        self.parser_opts = parser_opts
        self.bands_res = {}
        self.traj_data = {}
        self.warnings = []
        self.geom_data = {}
        self.dot_castep_critical_message = []
        self.dot_castep_traj = {}
        self.dot_castep_data = {}

    def parse(self):
        """
        :return: A list of:

            * out_dict: a dictionary with parsed data.
            * trajectory_data: dictionary of trajectory data.
            * structure data: dictionary of cell, positions and symbols.
            * exit_code: exit code indicating potential problems of the run

        Two keys to check in out_dict: *parser_warnings* and *warnings*.
        """
        parser_version = __version__
        parser_info = {}
        parser_info["parser_warnings"] = []
        parser_info["parser_info"] = "AiiDA CASTEP basic Parser v{}".format(
            parser_version
        )
        parser_info["warnings"] = []

        exit_code = "UNKNOWN_ERROR"
        finished_run = False
        # Use "Total time" as a mark for a completed run.
        # Check only the last 20 lines, otherwise unfinished restarts
        # may be seen as finished.
        for line in self.dot_castep_lines[-20:]:
            if "Total time" in line:
                finished_run = True
                break

        self.parse_dot_castep()

        # Parse the geom/md output if present
        if self.md_geom_info is not None:
            glines = self.md_geom_info[1]
            geom_data = self.parse_geom(glines)
            # For the geom file the second energy is the enthalpy, while
            # for MD it is the approximate hamiltonian (etotal + ek)
            if "geom" in self.md_geom_info[0]:
                geom_data["geom_enthalpy"] = geom_data["hamilt_energy"]

        # Parse the bands file
        if self.bands_lines is not None:
            bands_res = self.parse_dot_bands()
        else:
            bands_res = None

        # Warn if the run is not finished and return the most 'specific' error.
        # For example, a calculation may not terminate correctly due to an
        # unconverged SCF. In this case, we set the exit code to SCF unconverged.
        if finished_run:
            exit_code = "CALC_FINISHED"
        else:
            exit_code = "ERROR_NO_END_OF_CALCULATION"
        for code in EXIT_CODES_SPEC:
            if code in self.warnings:
                exit_code = code
                break

        # Pick out trajectory data and structure data
        if self.dot_castep_traj:
            traj_data = dict(self.dot_castep_traj)
            self.traj_data = traj_data
            if self.md_geom_info is not None:
                # Use the geom file's trajectory instead,
                # since some of the fields are overwritten
                traj_data.update(geom_data)
            try:
                last_cell = traj_data["cells"][-1]
                last_positions = traj_data["positions"][-1]
                symbols = traj_data["symbols"]
            except (KeyError, IndexError):
                # Cannot find the last geometry data
                structure_data = {}
            else:
                structure_data = dict(
                    cell=last_cell, positions=last_positions, symbols=symbols
                )
        else:
            traj_data = {}
            structure_data = {}

        # A dictionary for the results to be returned
        results_dict = dict(self.dot_castep_data)
        results_dict.update(parser_info)

        # Combine the warnings
        all_warnings = (
            results_dict["warnings"] + parser_info["warnings"] + self.warnings
        )

        # Make the warnings set-like, i.e. we don't want to repeat messages;
        # this also saves a bit of storage space
        all_warnings = list(set(all_warnings))
        results_dict["warnings"] = all_warnings

        return [results_dict, traj_data, structure_data, bands_res, exit_code]

    def parse_dot_bands(self):
        """Parse the dot_bands"""
        bands_info, kpoints, bands = parse_dot_bands(self.bands_lines)
        self.bands_res = {"bands_info": bands_info, "kpoints": kpoints, "bands": bands}
        return self.bands_res

    def parse_geom(self, glines=None):
        """Parse the geom lines"""
        if glines is None:
            glines = self.md_geom_info[1]
        self.geom_data = parse_geom_text_output(glines, {})
        return self.geom_data

    def parse_dot_castep(self):
        """Parse the dot-castep file"""
        parsed_data, trajectory_data, critical_message = parse_castep_text_output(
            self.dot_castep_lines, {}
        )
        self.dot_castep_data = parsed_data
        self.dot_castep_traj = trajectory_data
        self.dot_castep_critical_message = critical_message
        self.warnings = parsed_data["warnings"]
        return parsed_data, trajectory_data, critical_message

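# The sketch below is a minimal usage example of the RawParser class, assuming
# the .castep output has already been read into a list of lines. The file name
# is hypothetical and the geom/bands inputs are omitted by passing None; it is
# not part of the parser itself.
def _example_raw_parser_usage(castep_file="aiida.castep"):
    """Illustrative only; not used by the parser itself."""
    with open(castep_file, encoding="utf-8") as handle:
        out_lines = handle.readlines()
    parser = RawParser(out_lines, input_dict={}, md_geom_info=None, bands_lines=None)
    results_dict, traj_data, structure_data, bands_res, exit_code = parser.parse()
    return results_dict, exit_code
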
# Regex for getting the unit
unit_re = re.compile(r"^ output\s+(\w+) unit *: *([^\s]+)")
time_re = re.compile(r"^(\w+) time += +([0-9.]+) s$")
parallel_re = re.compile(r"^Overall parallel efficiency rating: [\w ]+ \(([0-9]+)%\)")
version_re = re.compile(r"CASTEP version ([0-9.]+)")

def parse_castep_text_output(out_lines, input_dict):
    """
    Parse the output of the .castep file

    :param out_lines: a list of lines from the readlines function
    :param input_dict: Control some variables. Currently supports
        'n_warning_lines' - the number of lines to include for a general warning.

    :return: A list of parsed_data, trajectory_data and critical_messages:

        * parsed_data: dictionary with key values, referring to the last
          occurring quantities
        * trajectory_data: key, values of the intermediate scf steps,
          such as during geometry optimization
        * critical_messages: a list with critical messages. If any is found in
          parsed_data["warnings"] the calculation should be considered as failed.
    """
    pseudo_files = {}
    parsed_data = {}
    parsed_data["warnings"] = []

    if not input_dict:
        input_dict = {}
    # Some parameters can be controlled
    n_warning_lines = input_dict.get("n_warning_lines", 10)

    # Initialise storage space for the trajectory.
    # Not all of it is needed, but we parse as much as we can here.
    trajectory_data = defaultdict(list)

    # For the warnings we use a dictionary in the format
    # {<keywords in line>: <message to pass>}; the message to pass
    # is saved in the dictionary and used to set the exit code.
    critical_warnings = {
        "SCF cycles performed but system has not reached the groundstate": SCF_FAILURE_ERROR,
        "STOP keyword detected in parameter file. Stop execution.": STOP_REQUESTED_ERROR,
        "Insufficient time for another iteration": INSUFFICENT_TIME_ERROR,
    }

    # Warnings that won't result in a calculation in FAILED state
    minor_warnings = {
        "Warning": None,
        "WARNING": None,
        "Geometry optimization failed to converge": GEOM_FAILURE_MESSAGE,
    }

    # A dictionary whose keys we should check at each line
    all_warnings = dict(critical_warnings)
    all_warnings.update(minor_warnings)

    # Split header and body
    body_start = None
    version = None
    for i, line in enumerate(out_lines):
        # Find the CASTEP version
        if version is None:
            vmatch = version_re.search(line)
            if vmatch:
                version = vmatch.group(1)
                parsed_data["castep_version"] = version

        # Find the units we used
        unit_match = unit_re.match(line)
        if unit_match:
            uname = unit_match.group(1)
            uvalue = unit_match.group(2)
            parsed_data["unit_" + uname] = uvalue

        if "Files used for pseudopotentials" in line:
            for line_ in out_lines[i + 1 :]:
                if "---" in line_:
                    break
                try:
                    specie, pp_file = line_.strip().split()
                except ValueError:
                    break
                else:
                    pseudo_files.update({specie: pp_file})

        if "Total number of ions" in line:
            parsed_data["num_ions"] = int(line.strip().split("=")[1].strip())
            continue

        if "Point group of crystal" in line:
            parsed_data["point_group"] = line.strip().split("=")[1].strip()
            continue

        if "Space group of crystal" in line:
            parsed_data["space_group"] = line.strip().split("=")[1].strip()
            continue

        if "Cell constraints" in line:
            parsed_data["cell_constraints"] = line.strip().split(":")[1].strip()
            continue

        if "Number of kpoints used" in line:
            parsed_data["n_kpoints"] = line.strip().split("=")[1].strip()
            continue

        if "MEMORY AND SCRATCH DISK ESTIMATES" in line:
            body_start = i
            break

    parsed_data.update(pseudo_pots=pseudo_files)

    # Parse repeating information
    skip = 0
    body_lines = out_lines[body_start:]
    iter_parser = get_iter_parser()
    for count, line in enumerate(body_lines):
        # Allow skipping a certain number of lines
        if skip > 0:
            skip -= 1
            continue

        res_tmp = iter_parser.parse(line)
        if res_tmp:
            name, value = res_tmp[:2]
            trajectory_data[name].append(value)
            continue

        if "Calculation parallelised over" in line:
            num_cores = int(line.strip().split()[-2])
            parsed_data["parallel_procs"] = num_cores
            continue

        if "Stress Tensor" in line:
            i, stress, pressure = parse_stress_box(body_lines[count : count + 20])
            assert len(stress) == 3
            if "Symmetrised" in line:
                prefix = "symm_"
            else:
                prefix = ""
            trajectory_data[prefix + "pressure"].append(pressure)
            trajectory_data[prefix + "stress"].append(stress)
            skip = i

        if "Forces *******" in line:
            num_lines = parsed_data["num_ions"] + 10
            box = body_lines[count : (count + num_lines)]
            i, forces = parse_force_box(box)
            # Resolve the force names.
            # For backward compatibility symmetrised forces are stored as forces.
            if "Constrained" in line:
                force_name = "cons_forces"
            else:
                tmp = line.replace("*", " ").strip().lower().replace(" ", "_")
                if tmp == "symmetrised_forces":
                    force_name = "forces"
                else:
                    force_name = tmp
            if not forces:
                LOGGER.error(f"Cannot parse force lines {box}")
            trajectory_data[force_name].append(forces)
            skip = i
            continue

        if "Atomic Populations (Mulliken)" in line:
            end_index = [
                index for index, line in enumerate(body_lines) if "Bond" in line
            ][0]
            box = body_lines[count:end_index]
            i, charges, spins = parse_popn_box(box)
            parsed_data["charges"] = charges
            parsed_data["spins"] = spins
            skip = i
            continue

        for warn_key in all_warnings:
            if warn_key in line:
                message = all_warnings[warn_key]
                if message is None:
                    message = body_lines[count : count + n_warning_lines]
                    message = "\n".join(message)
                parsed_data["warnings"].append(message)

    # Parse the last few lines of the output
    for line in body_lines[-50:]:
        time_line = time_re.match(line)
        # Save information about time usage
        if time_line:
            time_name = time_line.group(1).lower() + "_time"
            parsed_data[time_name] = float(time_line.group(2))
            continue

        para_line = parallel_re.match(line)
        if para_line:
            parsed_data["parallel_efficiency"] = int(para_line.group(1))
            continue

    #### END OF LINE BY LINE PARSING ITERATION ####

    # Remove unrelated units
    units_to_delete = []
    for key in parsed_data:
        if "unit_" in key:
            unit_for = key.split("_", 1)[1]
            delete = True
            # Check that the quantity this unit refers to exists
            for i in trajectory_data:
                if i == key:
                    continue
                if unit_for in i:
                    delete = False
            for i in parsed_data:
                if i == key:
                    continue
                if unit_for in i:
                    delete = False
            if delete is True:
                units_to_delete.append(key)

    for key in units_to_delete:
        parsed_data.pop(key)

    # Set the geometry convergence state
    if GEOM_FAILURE_MESSAGE in parsed_data["warnings"]:
        parsed_data["geom_unconverged"] = True
    else:
        parsed_data["geom_unconverged"] = None

    return parsed_data, trajectory_data, list(critical_warnings.values())

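# A minimal, self-contained sketch of calling parse_castep_text_output on a
# hand-written fragment. The lines below only mimic the layout of a real
# .castep file and the numbers are made up for illustration.
def _example_parse_castep_fragment():
    """Illustrative only; parses a tiny synthetic .castep-like fragment."""
    fragment = [
        " Welcome to CASTEP version 20.11 for Linux",
        " Total number of ions in cell =    2",
        " MEMORY AND SCRATCH DISK ESTIMATES PER PROCESS",
        "Final energy, E             =  -100.123456     eV",
        "Total time          =      10.00 s",
    ]
    parsed, traj, critical = parse_castep_text_output(fragment, {})
    # parsed contains e.g. 'castep_version', 'num_ions' and 'total_time';
    # traj contains the repeated quantities such as 'total_energy'.
    return parsed, dict(traj), critical
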
class LineParser:
    """
    Parser for a line
    """

    def __init__(self, conditions):
        """Initialize the parser by passing the conditions"""
        self._cond = conditions

    def parse(self, line):
        """
        Return parsing results

        :returns: Result of the first matched Matcher object, or None
            if no match is found
        """
        out = None
        for c in self._cond:
            out, _ = c.match_pattern(line)
            if out:
                break
        return out

class Matcher:
    """
    Class of the condition to match the line
    """

    def __init__(self, regex, name, convfunc=None):
        """
        Initialize a Matcher object.

        :param string regex: Pattern to be matched
        :param string name: Name of the results
        """
        self.regex = re.compile(regex)
        self.convfunc = convfunc
        self.name = name

    def match_pattern(self, line):
        """
        Match the pattern

        :returns: (out, match) where out is a tuple ``(self.name, <matched value>)``
            and match is a re.MatchObject or None
        """
        match = self.regex.match(line)
        if match:
            value = match.group(1)
            if self.convfunc:
                value = self.convfunc(value)
            out = (self.name, value)
        else:
            out = None
        return out, match

class UnitMatcher(Matcher):
    """
    The pattern of a UnitMatcher should have two groups, with the second group
    being the unit. The first group will be converted to float.
    """

    def match_pattern(self, line):
        """
        Match the pattern
        """
        out, match = super().match_pattern(line)
        if out:
            unit = match.group(2)
            conv = self.convfunc if self.convfunc else float
            out = (out[0], conv(out[1]), unit)
        return out, match

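# A small sketch of using UnitMatcher directly. The pattern, name and line
# below are made up for illustration and are not part of the parser.
def _example_unit_matcher():
    """Illustrative only; returns (('demo_energy', -1.5, 'eV'), <match object>)."""
    matcher = UnitMatcher(r"^Demo energy *= *([0-9.+-eE]+) +(\w+)", "demo_energy")
    return matcher.match_pattern("Demo energy =  -1.5 eV")
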
def get_iter_parser():
    """
    Generate a LineParser object to parse repeating outputs
    """
    tail1 = r" *= *([0-9.+-eE]+) +(\w+)"
    mfree = UnitMatcher(r"^Final free energy \(E-TS\)" + tail1, "free_energy")
    mtotal = UnitMatcher(r"^Final energy, E" + tail1, "total_energy")
    mtotal2 = UnitMatcher(r"^Final energy" + tail1, "total_energy")
    mzeroK = UnitMatcher(r"^NB est. 0K energy \(E-0.5TS\)" + tail1, "zero_K_energy")
    spin = UnitMatcher(r"^Integrated Spin Density" + tail1, "spin_density")
    absspin = UnitMatcher(r"^Integrated \|Spin Density\|" + tail1, "abs_spin_density")
    enthalpy = UnitMatcher(
        r"^ *\w+: finished iteration +\d+ +with enthalpy" + tail1, "enthalpy"
    )
    parser = LineParser([mfree, mtotal, mtotal2, mzeroK, spin, absspin, enthalpy])
    return parser

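# Illustrative example of the LineParser/UnitMatcher machinery above; the
# energy line is a hypothetical, CASTEP-like string and its value is made up.
def _example_iter_parser():
    """Illustrative only; returns e.g. ('total_energy', -100.123456, 'eV')."""
    parser = get_iter_parser()
    return parser.parse("Final energy, E             =  -100.123456     eV")
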
def parse_geom_text_output(out_lines, input_dict) -> dict:
    """
    Parse the output of the .geom file

    :param out_lines: a list of lines from the readlines function.
    :param dict input_dict: not in use at the moment.

    :return: key, value of the trajectories of cell, atoms, force etc
    """
    _ = input_dict
    txt = out_lines
    Hartree = units["Eh"]  # eV
    Bohr = units["a0"]  # A
    kB = units["kB"]  # eV/K
    hBar = units["hbar"]  # in eV
    eV = units["e"]  # in J

    cell_list = []
    species_list = []
    geom_list = []
    forces_list = []
    energy_list = []
    hamilt_list = []
    kinetic_list = []
    pressure_list = []
    temperature_list = []
    velocity_list = []
    time_list = []

    # For a specific image
    current_pos = []
    current_species = []
    current_forces = []
    current_velocity = []
    current_cell = []
    in_header = False
    for line in txt:
        if "begin header" in line.lower():
            in_header = True
            continue
        if "end header" in line.lower():
            in_header = False
            continue
        if in_header:
            continue  # Skip header lines

        sline = line.split()
        if len(sline) == 1:
            try:
                time_list.append(float(sline[0]))
            except ValueError:
                continue
        elif "<-- E" in line:
            energy_list.append(float(sline[0]))  # Total energy
            hamilt_list.append(float(sline[1]))  # Hamiltonian (MD)
            # The kinetic energy field is only present in MD runs
            if len(sline) == 5:
                kinetic_list.append(float(sline[2]))  # Kinetic (MD)
            continue
        elif "<-- h" in line:
            current_cell.append(list(map(float, sline[:3])))
            continue
        elif "<-- R" in line:
            current_pos.append(list(map(float, sline[2:5])))
            current_species.append(sline[0])
        elif "<-- F" in line:
            current_forces.append(list(map(float, sline[2:5])))
        elif "<-- V" in line:
            current_velocity.append(list(map(float, sline[2:5])))
        elif "<-- T" in line:
            temperature_list.append(float(sline[0]))
        elif "<-- P" in line:
            pressure_list.append(float(sline[0]))
        elif not line.strip() and current_cell:
            cell_list.append(current_cell)
            species_list.append(current_species)
            geom_list.append(current_pos)
            forces_list.append(current_forces)
            current_cell = []
            current_species = []
            current_pos = []
            current_forces = []
            if current_velocity:
                velocity_list.append(current_velocity)
                current_velocity = []

    # Append the last image if applicable (when the end of the file is reached)
    if current_cell:
        cell_list.append(current_cell)
        species_list.append(current_species)
        geom_list.append(current_pos)
        forces_list.append(current_forces)
        if current_velocity:
            velocity_list.append(current_velocity)
            current_velocity = []

    if len(species_list) == 0:
        raise CASTEPOutputParsingError("No data found in geom file")

    out = dict(
        cells=np.array(cell_list) * Bohr,
        positions=np.array(geom_list) * Bohr,
        forces=np.array(forces_list) * Hartree / Bohr,
        geom_total_energy=np.array(energy_list) * Hartree,
        symbols=species_list[0],
    )

    # Optional lists
    unit_V = Hartree * Bohr / hBar
    unit_K = Hartree / kB  # temperature in K
    unit_P = Hartree / (Bohr * 1e-10) ** 3 * eV
    unit_s = hBar / Hartree
    opt = {
        "velocities": (velocity_list, unit_V),
        "temperatures": (temperature_list, unit_K),
        "pressures": (pressure_list, unit_P),
        "hamilt_energy": (hamilt_list, Hartree),
        "times": (time_list, unit_s),
        "kinetic_energy": (kinetic_list, Hartree),
    }
    for key, value in opt.items():
        if value[0]:
            out.update({key: np.array(value[0]) * value[1]})
    return out

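# A minimal sketch of calling parse_geom_text_output on a hand-written,
# .geom-like fragment (atomic units). The species, cell and values are made
# up; only the tags ("<-- E", "<-- h", "<-- R", "<-- F") follow the parser.
def _example_parse_geom_fragment():
    """Illustrative only; parses a tiny synthetic two-atom geom image."""
    fragment = [
        " BEGIN header",
        " END header",
        "  -1.0E+001  -1.0E+001              <-- E",
        "  10.0   0.0   0.0                  <-- h",
        "   0.0  10.0   0.0                  <-- h",
        "   0.0   0.0  10.0                  <-- h",
        "  Si   1   0.0   0.0   0.0          <-- R",
        "  Si   2   2.5   2.5   2.5          <-- R",
        "  Si   1   0.0   0.0   0.0          <-- F",
        "  Si   2   0.0   0.0   0.0          <-- F",
        "",
    ]
    return parse_geom_text_output(fragment, {})
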
force_match = re.compile(
    r"^ +\* +(\w+) +([0-9]+) +([0-9.\-+]+) +([0-9.\-+]+) +([0-9.\-+]+) +\*"
)
stress_match = re.compile(r"^ +\* +(\w+) +([0-9.\-+]+) +([0-9.\-+]+) +([0-9.\-+]+) +\*")

def parse_force_box(lines):
    """
    Parse a box of the forces

    :param lines: a list of upcoming lines

    :return: A list of the number of lines read and the forces
    """
    forces = []
    i = 0
    for i, line in enumerate(lines):
        if "Forces" in line:
            continue
        if "***********************" in line:
            break
        # Remove the (cons'd) tags
        line = line.replace("(cons'd)", " ")
        match = force_match.match(line)
        if match:
            forces.append([float(match.group(i)) for i in range(3, 6)])
    return i, forces

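# A small sketch of parse_force_box on a hand-written, CASTEP-like force box.
# The species and values are made up for illustration.
def _example_parse_force_box():
    """Illustrative only; returns the parsed force rows."""
    box = [
        " ******************** Forces ********************",
        " *                                               *",
        " * Si        1     0.00100  -0.00200   0.00300   *",
        " * Si        2    -0.00100   0.00200  -0.00300   *",
        " *                                               *",
        " *************************************************",
    ]
    nlines, forces = parse_force_box(box)
    return forces  # [[0.001, -0.002, 0.003], [-0.001, 0.002, -0.003]]
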
def parse_stress_box(lines):
    """
    Parse a box of the stress

    :param lines: a list of upcoming lines

    :return: a list of [number of lines read, stress_tensor, pressure]
    """
    stress = []
    pressure = None
    i = 0
    for i, line in enumerate(lines):
        if "Stress" in line:
            continue
        if "Cartisian components" in line:
            unit = line.strip().split()[-2]
            unit = unit[1:-2]
            continue
        if "**********************" in line:
            break
        match = stress_match.match(line)
        if match:
            stress.append([float(match.group(i)) for i in range(2, 5)])
        elif "Pressure" in line:
            lsplit = line.strip().split()
            pressure = float(lsplit[-2])
    return i, stress, pressure

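# A small sketch of parse_stress_box on a hand-written, CASTEP-like stress
# block. The tensor components and pressure are made up for illustration.
def _example_parse_stress_box():
    """Illustrative only; returns the parsed stress rows and pressure."""
    box = [
        " ***************** Stress Tensor *****************",
        " *                                               *",
        " *  x      1.000000      0.000000      0.000000  *",
        " *  y      0.000000      2.000000      0.000000  *",
        " *  z      0.000000      0.000000      3.000000  *",
        " *  Pressure:   -2.0000                          *",
        " *************************************************",
    ]
    nlines, stress, pressure = parse_stress_box(box)
    return stress, pressure  # three tensor rows and pressure == -2.0
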
def parse_popn_box(lines):
    """
    Parse the Mulliken population box

    :param lines: a list of upcoming lines

    :return: a list of [number of lines read, charges, spins]
    """
    charges = []
    spins = []
    i = 0
    spin_polarised = False
    for i, line in enumerate(lines):
        if line.strip() == "":
            break
        elif "Spin" in line:
            spin_polarised = True
            continue
        line = line.strip().split()
        if line[0].isalpha() and i > 2:
            if spin_polarised:
                charges.append(float(line[-2]))
                spins.append(float(line[-1]))
            else:
                charges.append(float(line[-1]))
    return i, charges, spins

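# A small sketch of parse_popn_box on a hand-written, non-spin-polarised
# Mulliken population box. The populations and charges are made up.
def _example_parse_popn_box():
    """Illustrative only; returns the parsed charges and (empty) spins."""
    box = [
        "     Atomic Populations (Mulliken)",
        "     -----------------------------",
        "Species          Ion     s      p      d      f     Total  Charge (e)",
        "======================================================================",
        "  Si              1     1.35   2.65   0.00   0.00   4.00     0.05",
        "  Si              2     1.35   2.65   0.00   0.00   4.00    -0.05",
        "======================================================================",
        "",
    ]
    nlines, charges, spins = parse_popn_box(box)
    return charges, spins  # charges == [0.05, -0.05], spins == []
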
def parse_dot_bands(bands_lines):
    """
    Parse a CASTEP bands file

    Extract the kpoints and the bands for each kpoint.
    This is a generic parsing function that returns python builtin types.

    The problem is that the order in which the kpoints are written is
    parallelisation dependent and may not be the same as that specified in
    *kpoints_list*. However, CASTEP does write the index of the kpoints.
    We reorder the kpoints and the bands to match the original order in the
    input file. Note that many other quantities indexed by kpoints, such as
    the optical matrix elements, follow the order of appearance in the
    `bands` file.

    :param bands_lines: A list of lines to be parsed

    :return: A list of bands_info, kpoints and bands:

        * bands_info: A dictionary of information about the bands.
        * kpoints: A list of kpoints, in the format
          [kpoint index, coordinates x 3, kpoint weight]
        * bands: A list of bands. Each entry has a list of the actual
          eigenvalues for each spin component, i.e. nkpoints, nspins, neigns.

    Note that atomic units are used in the bands file.
    """
    i_finish = None
    cell = []
    bands_info = {}
    i = 0
    for i, line in enumerate(bands_lines):
        if not line.strip():
            continue
        if "Number of k-points" in line:
            nkps = line.strip().split()[-1]
            bands_info["nkpts"] = int(float(nkps))
            continue
        if "Number of spin components" in line:
            nspin = line.strip().split()[-1]
            bands_info["nspins"] = int(float(nspin))
            continue
        if "Number of electrons" in line:
            nelec = line.strip().split()[-1]
            bands_info["nelecs"] = float(nelec)
            continue
        if "Number of eigenvalues" in line:
            neigns = line.strip().split()[-1]
            bands_info["neigns"] = int(float(neigns))
            continue
        # NOTE: possibly a different format with spin polarisation?
        if "Fermi energ" in line:
            tokens = line.strip().split()
            # Check if there are two Fermi energies
            if re.match(r"^[-.0-9eE]+$", tokens[-2]):
                efermi = [float(tokens[-2]), float(tokens[-1])]
            else:
                efermi = float(tokens[-1])
            bands_info["efermi"] = efermi
            continue
        if "Unit cell" in line:
            i_finish = i + 3
            continue

        # Add the cell vectors
        if i_finish:
            cell.append([float(n) for n in line.strip().split()])
        if i == i_finish:
            break
    bands_info["cell"] = cell

    # Now parse the body
    kpoints = []
    bands = []
    this_band = []
    this_spin = []
    for line in bands_lines[i + 1 :]:
        if "K-point" in line:
            # We are not at the first kpoint
            if kpoints:
                # Save the result from the previous block
                this_band.append(this_spin)
                bands.append(this_band)
                this_spin = []
                this_band = []
            kline = line.strip().split()
            kpoints.append([float(n) for n in kline[1:]])
            continue
        if "Spin component" in line:
            # Check if we are at the second spin
            if this_spin:
                this_band.append(this_spin)
                this_spin = []
            continue
        ls = line.strip()
        if not ls:
            continue
        this_spin.append(float(ls))

    # Save the last set of results
    this_band.append(this_spin)
    bands.append(this_band)

    # Do some sanity checks
    assert int(nkps) == len(kpoints), "Missing kpoints"
    assert len(bands) == len(kpoints), "Missing bands for certain kpoints"
    for n, b in enumerate(bands):
        assert len(b) == int(nspin), f"Missing spins for kpoint {n + 1}"
        for i, s in enumerate(b):
            assert len(s) == int(
                neigns
            ), f"Missing eigenvalues for kpoint {n + 1} spin {i + 1}"

    return bands_info, kpoints, bands

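# A minimal sketch of parse_dot_bands on a hand-written, bands-like fragment
# with one kpoint, one spin channel and four eigenvalues. The cell, Fermi
# energy and eigenvalues are made up; only the header keywords follow the
# strings the parser looks for.
def _example_parse_dot_bands():
    """Illustrative only; returns (bands_info, kpoints, bands)."""
    fragment = [
        "Number of k-points     1",
        "Number of spin components 1",
        "Number of electrons  8.00",
        "Number of eigenvalues  4",
        "Fermi energy (in atomic units)     0.15",
        "Unit cell vectors",
        "   10.0   0.0   0.0",
        "    0.0  10.0   0.0",
        "    0.0   0.0  10.0",
        "K-point    1   0.0000  0.0000  0.0000   1.0000",
        "Spin component 1",
        "   -0.50",
        "   -0.20",
        "    0.10",
        "    0.30",
    ]
    return parse_dot_bands(fragment)
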