Source code for aiida_castep.utils

"""
Utility module with useful functions
"""
import io
from pathlib import Path

import numpy as np
from aiida.repository.common import FileType

# pylint: disable=import-outside-toplevel, too-many-locals


[docs]def band_array_ensure_ndim(array): """ Ensure there are three dimensions for a band-like array. Sometimes, we general band array with (nk, nb) without the first dimension. This function ensures that there are three dimensions (ns, nk, nb). """ if array.ndim == 2: return array.reshape((1,) + array.shape) return array
[docs]def atoms_to_castep(atoms, index): """Convert ase atoms' index to castep like return (Specie, Ion) Deprecated, use ase_to_castep_index""" atom = atoms[index] symbol = atom.symbol # Start counter count = 0 for atom in atoms: if atom.symbol == symbol: count += 1 if atom.index == index: break return symbol, count
[docs]def ase_to_castep_index(atoms, indices): """Convert a list of indices to castep style return list of (element, i in the same element)""" if isinstance(indices, int): indices = [indices] symbols = np.array(atoms.get_chemical_symbols()) iatoms = np.arange(len(atoms)) res = [] # Iterate through given indices for i in indices: mask = symbols == symbols[i] # Select the same species # CASTEP start counting from 1 counter = np.where(iatoms[mask] == i)[0][0] + 1 res.append([symbols[i], counter]) return res
[docs]def generate_ionic_fix_cons(atoms, indices, mask=None, count_start=1): """ create ionic constraint section via indices and ase Atoms :param atoms: atoms object to be fixed :param indices: indices of the atoms to be fixed :param mask: a list of 3 integers, must be 0 (no fix) or 1 (fix this Cartesian) :returns: (lines, index of the next constraint) """ # Convert to castep style indices (element, index) castep_indices = ase_to_castep_index(atoms, indices) count = count_start lines = [] if mask is None: mask = (1, 1, 1) for symbol, i in castep_indices: if mask[0]: lines.append(f"{count:<4d} {symbol:<2} {i:<4d} 1 0 0") if mask[1]: lines.append(f"{count + 1:<4d} {symbol:<2} {i:<4d} 0 1 0") if mask[2]: lines.append(f"{count + 2:<4d} {symbol:<2} {i:<4d} 0 0 1") count += sum(mask) return lines, count
[docs]def generate_rel_fix(atoms, indices, ref_index=0, count_start=1): """ Generate relative constraints .. note:: In CASTEP the mass of atoms are coupled in the fix, hence to truelly fix the relative positions you will have to declare all atoms having the same weight using the SPECIES_MASS block. :param atoms: atoms object to be fixed :param indices: indices of the atoms to be fixed :param ref_index: index of the reference atom in amoung the atoms being fixed. Default is the first atom appear in the indices. :returns: (lines, index of the next constraint) """ castep_indices = ase_to_castep_index(atoms, indices) lines = [] count = count_start symbol_ref, i_ref = castep_indices[ref_index] for symbol, i in castep_indices[1:]: lines.append(f"{count:<4d} {symbol:<2} {i:<4d} 1 0 0") lines.append(f"{count:<4d} {symbol_ref:<2} {i_ref:<4d} -1 0 0") lines.append(f"{count + 1:<4d} {symbol:<2} {i:<4d} 0 1 0") lines.append(f"{count + 1:<4d} {symbol_ref:<2} {i_ref:<4d} 0 -1 0") lines.append(f"{count + 2:<4d} {symbol:<2} {i:<4d} 0 0 1") lines.append(f"{count + 2:<4d} {symbol_ref:<2} {i_ref:<4d} 0 0 -1") count += 3 return lines, count
[docs]def castep_to_atoms(atoms, specie, ion): """Convert castep like index to ase Atoms index""" return [atom for atom in atoms if atom.symbol == specie][ion - 1].index
[docs]def sort_atoms_castep(atoms, copy=True, order=None): """ Sort ``ase.Atoms`` instance to castep style. A sorted ``Atoms`` will have the same index before and after calculation. This is useful when chaining optimisation requires specifying per atoms tags such as *SPIN* and *ionic_constraints*. :param order: orders of coordinates. (0, 1, 2) means the sorted atoms will be ascending by x, then y, then z if there are equal x or ys. :returns: A ``ase.Atoms`` object that is sorted. """ _ = copy # Sort castep style if order is not None: for i in reversed(order): isort = np.argsort(atoms.positions[:, i], kind="mergesort") atoms.positions = atoms.positions[isort] atoms.numbers = atoms.numbers[isort] isort = np.argsort(atoms.numbers, kind="mergesort") atoms = atoms[isort] return atoms
[docs]def desort_atoms_castep(atoms, original_atoms): """ Recover the original ordering of the atoms. CASTEP sort the atoms in the order of atomic number and then the order of appearance internally. :param copy: If True then return an copy of the atoms :returns: A ``ase.Atoms`` object that has be sorted """ isort = np.argsort(original_atoms.numbers, kind="mergesort") rsort = np.zeros(isort.shape, dtype=int) rsort.fill(-1) for i, j in enumerate(isort): rsort[j] = i return atoms[rsort]
[docs]def is_castep_sorted(atoms): """ Check if an Atoms object is CASTEP style sorted """ numbers = np.asarray(atoms.numbers) return np.all(numbers == np.sort(numbers))
[docs]def reuse_kpoints_grid(grid, lowest_pk=False): """ Retrieve previously stored kpoints mesh data node. If there is no such ``KpointsData``, a new node will be created. Will return the one with highest pk :param grid: Grid to be retrieved :param bool lowest_pk: If set to True will return the node with lowest pk :returns: A KpointsData node representing the grid requested """ from aiida.orm import KpointsData, QueryBuilder qbd = QueryBuilder() qbd.append( KpointsData, tag="kpoints", filters={ "attributes.mesh.0": grid[0], "attributes.mesh.1": grid[1], "attributes.mesh.2": grid[2], }, ) if lowest_pk: order = "asc" else: order = "desc" qbd.order_by({"kpoints": [{"id": {"order": order}}]}) if qbd.count() >= 1: return qbd.first()[0] kpoints = KpointsData() kpoints.set_kpoints_mesh(grid) return kpoints
[docs]def traj_to_atoms(traj, combine_ancesters=False, eng_key="enthalpy"): """ Generate a list of ASE Atoms given an AiiDA TrajectoryData object :param bool combine_ancesters: If true will try to combine trajectory from ancestor calculations :returns: A list of atoms for the trajectory. """ from aiida.orm import CalcJobNode, Node, QueryBuilder from ase import Atoms from ase.calculators.singlepoint import SinglePointCalculator from aiida_castep.common import OUTPUT_LINKNAMES # If a CalcJobNode is passed, select its output trajectory if isinstance(traj, CalcJobNode): traj = traj.outputs.__getattr__(OUTPUT_LINKNAMES["trajectory"]) # Combine trajectory from ancesters if combine_ancesters is True: qbd = QueryBuilder() qbd.append(Node, filters={"uuid": traj.uuid}) qbd.append(CalcJobNode, tag="ans", ancestor_of=Node) qbd.order_by({"ans": "id"}) calcs = [_[0] for _ in qbd.iterall()] atoms_list = [] for counter in calcs: atoms_list.extend( traj_to_atoms( counter.outputs.__getattr__(OUTPUT_LINKNAMES["trajectory"]), combine_ancesters=False, eng_key=eng_key, ) ) return atoms_list forces = traj.get_array("forces") symbols = traj.get_array("symbols") positions = traj.get_array("positions") try: eng = traj.get_array(eng_key) except KeyError: eng = None cells = traj.get_array("cells") atoms_traj = [] for counter, pos, eng_, force in zip(cells, positions, eng, forces): atoms = Atoms(symbols=symbols, cell=counter, pbc=True, positions=pos) calc = SinglePointCalculator(atoms, energy=eng_, forces=force) atoms.set_calculator(calc) atoms_traj.append(atoms) return atoms_traj
[docs]def get_remote_folder_info(calc, transport): """Get the information of the remote folder of a calculation""" path = calc.outputs.remote_folder.get_remote_path() transport.chdir(path) lsattrs = transport.listdir_withattributes() return lsattrs
[docs]def get_remote_folder_size(calc, transport): """ Return size of a remote folder of a calculation in MB """ try: lsattrs = get_remote_folder_info(calc, transport) except OSError: return 0 total_size = 0 for attr in lsattrs: total_size += attr["attributes"].st_size # in byres return total_size / (1024) ** 2
[docs]def take_popn(seed): """ Take section of population analysis from a seed.castep file Return a list of StringIO of the population analysis section """ popns = [] rec = False with open(seed + ".castep") as fhd: for line in fhd: if "Atomic Populations (Mulliken)" in line: record = io.StringIO() rec = True # record information if rec is True: if line.strip() == "": rec = False record.seek(0) popns.append(record) else: record.write(line) return popns
[docs]def read_popn(fname): """Read population file into pandas dataframe""" import pandas as pd table = pd.read_table(fname, sep=r"\s\s+", header=2, comment="=", engine="python") return table
[docs]def export_calculation(node, output_dir, prefix=None): """ Export one calculation a a directory """ output_dir = Path(output_dir) output_dir.mkdir(exist_ok=True) def bwrite(node, outpath): """Write the objects stored under a node to a certain path""" try: seedname = node.get_option("seedname") except AttributeError: seedname = None for objname in node.list_object_names(): if node.get_object(objname).file_type != FileType.FILE: continue with node.base.repository.open(objname, mode="rb") as fsource: if prefix and Path(objname).stem == seedname: outname = prefix + Path(objname).suffix else: outname = objname fpath = str(outpath / outname) with open(fpath, "wb") as fout: readlength = 1024 * 512 # 1MB while True: buf = fsource.read(readlength) if buf: fout.write(buf) else: break # inputs bwrite(node, output_dir) # outputs retrieved = node.outputs.retrieved bwrite(retrieved, output_dir)
[docs]def compute_kpoints_spacing(cell, grid, unit="2pi"): """ Compute the spacing of the kpoints in the reciprocal space. Spacing = 1 / cell_length / mesh for each dimension. Assume orthogonal cell shape. """ cell = np.asarray(cell, dtype=float) grid = np.asarray(grid, dtype=float) spacings = 1.0 / cell / grid if unit == "1/A": spacings *= 2 * np.pi elif unit != "2pi": raise ValueError(f"Unit {unit} is not unkown") return spacings