Source code for aiida_castep.utils.mock

#!/usr/bin/env python
"""
Mock running CASTEP
"""

import hashlib
import json
import shutil
import tempfile
from pathlib import Path

import castepinput
import click

# The "tests" folder located under the third parent directory of this file
_TEST_BASE_FOLDER = Path(__file__).parent.parent.parent / "tests"
# Default folder used by MockOutput for the registry file and the stored results
_TEST_DATA_FOLDER = _TEST_BASE_FOLDER / "data"


def get_hash(dict_obj):
    """
    Return hash of a dict of strings

    Only ``str`` and ``list`` values contribute to the hash; entries holding
    any other type are skipped.

    :param dict_obj: Mapping from string keys to ``str`` or list-of-``str`` values
    :returns: A tuple ``(hexdigest, records)`` with the MD5 hex digest and the
        list of lower-cased, encoded records that were hashed
    """
    rec = []
    for k, v in dict_obj.items():
        if isinstance(v, str):
            rec.append(k + ":" + v)
        elif isinstance(v, list):
            # NOTE: no separator is inserted here; kept as is so that hashes
            # computed earlier stay valid
            rec.append(k + "".join(v))

    # Sort the records so the original order of the dict does not matter
    base = [record.encode().lower() for record in sorted(rec)]

    # Compute the hash by feeding the records one after another
    md5 = hashlib.md5()
    for record in base:
        md5.update(record)
    return md5.hexdigest(), base
def test_hash():
    """Check that the hash does not depend on the ordering of the keys"""
    # Smoke run with plain string values only
    get_hash({"a": "b", "b": "a"})

    # The same content given in two different key orders
    forward = {"a": "b", "b": ["a", "c"]}
    digest_fw, records_fw = get_hash(forward)
    backward = {"b": ["a", "c"], "a": "b"}
    digest_bw, records_bw = get_hash(backward)

    assert records_fw == records_bw
    assert digest_fw == digest_bw
class MockOutput:
    """
    Return stored calculation results for known inputs.

    Results are kept in sub-folders of ``base_dir`` and looked up through
    ``registry.json``, which maps the hash of the inputs to the folder name.
    """

    def __init__(self, base_dir=None):
        """
        Initialize the object

        :param base_dir: Folder holding the registry file and the stored
            results. Defaults to the module-level test data folder.
        """
        if base_dir is None:
            self.base_dir = _TEST_DATA_FOLDER
        else:
            self.base_dir = Path(base_dir).resolve()

    def calculate_hash(self, path):
        """
        Compute the hash of the inputs of a calculation

        :param path: Seed path without suffix; ``<path>.param`` and
            ``<path>.cell`` are read
        :returns: Hex digest of the combined inputs
        """
        path = str(path)
        param = castepinput.ParamInput.from_file(path + ".param", plain=True)
        cell = castepinput.CellInput.from_file(path + ".cell", plain=True)
        all_inp = dict(param)
        all_inp.update(cell)
        # Comment entries are dropped before hashing
        all_inp.pop("comment", None)
        all_inp.pop("COMMENT", None)
        return get_hash(all_inp)[0]

    @property
    def _reg_file(self):
        """
        Path to the registry file
        """
        return self.base_dir / "registry.json"

    @property
    def registry(self):
        """
        Registry, a dictionary loaded from the json

        An empty dictionary is returned if the registry file does not exist.
        """
        if not self._reg_file.is_file():
            return {}
        with open(str(self._reg_file)) as fh:
            reg = json.load(fh)
        return reg

    def register_node(self, calcjob, tag=None):
        """
        Register the result of a CalcJob node

        :param calcjob: The CalcJob node to be used
        :param tag: Tag for the results, used as the folder name
        """
        retrieved = calcjob.outputs.retrieved
        # Copy the 'objects' to a temporary directory. The context manager
        # removes the directory even if copying or registering fails.
        with tempfile.TemporaryDirectory() as tmpwork:
            # Include both the retrieved and the inputs
            for node in [retrieved, calcjob]:
                for nm in node.list_object_names():
                    if nm.startswith((".", "_")):
                        continue
                    with node.open(nm) as fin:
                        fpath = Path(tmpwork) / nm
                        fpath.write_text(fin.read())

            seedpath = Path(tmpwork) / calcjob.get_option("seedname")
            # Register the results
            self.register(seedpath, tag)

    def register(self, seedpath, tag=None):
        """
        Register completed calculation.

        A calculation located inside ``base_dir`` is registered in place.
        Otherwise the folder containing the seed is copied into ``base_dir``.

        :param seedpath: Seed path (without suffix) of the calculation
        :param tag: Name of the folder created in ``base_dir`` when the
            calculation has to be copied. Defaults to the first 8 characters
            of the hash.
        """
        seedpath = Path(seedpath).resolve()
        hash_ = self.calculate_hash(seedpath)

        reg = {}
        if self._reg_file.is_file():
            try:
                with open(str(self._reg_file)) as fh:
                    reg = json.load(fh)
            except (RuntimeError, json.JSONDecodeError):
                # Malformed JSON raises JSONDecodeError (a ValueError), not
                # RuntimeError; start from an empty registry in that case
                reg = {}

        # Relative seedpath to the folder
        try:
            rel_path = seedpath.relative_to(self.base_dir).parent
        except ValueError:
            # It is not placed in the sub folder
            # Copy manually
            if tag is None:
                # Use the first 8 digits as the name
                tag = hash_[:8]
            shutil.copytree(str(seedpath.parent), str(self.base_dir / tag))
            rel_path = tag

        reg[hash_] = str(rel_path)

        # Save the json settings
        with open(str(self._reg_file), "w") as fh:
            json.dump(reg, fh)

    def copy_results(self, rel_path):
        """
        Copy existing calculation to the folder

        :param rel_path: Folder of the stored results, relative to ``base_dir``;
            its entries are copied into the current working directory
        """
        print("Selected path:", rel_path)
        res_files = (self.base_dir / rel_path).glob("*")
        cwd = Path.cwd()
        for r in res_files:
            shutil.copy(str(r), str(cwd))

    def run(self, seedname):
        """
        Run the 'Calculation', if the seed is known we just copy the results

        Can be overridden by the 'MOCK_CALC' environmental variable, which
        defines the folder of results to be copied.

        :param seedname: Seed name (path without suffix) of the input files
        :raises RuntimeError: If no results are registered for the inputs
        """
        import os

        overide = os.environ.get("MOCK_CALC")
        if overide:
            self.copy_results(overide)
            print("Overiden by MOCK_CALC")
            # Report the folder the files were actually copied from
            print(f"Returning results from {self.base_dir / overide}")
            return

        hash_ = self.calculate_hash(seedname)
        known_result = self.registry.get(hash_, None)
        if not known_result:
            raise RuntimeError("Results not registered")
        self.copy_results(known_result)
        print(f"Returning results from {self.base_dir / known_result}")
@click.command("mock")
@click.option(
    "--reg",
    is_flag=True,
    default=False,
    help="Register the calculation",
)
@click.option(
    "--tag",
    default=None,
    help="Tag for the folder when registering results",
)
@click.argument("seed")
def main(seed, reg, tag):
    # Command line entry point: register the calculation when --reg is given,
    # otherwise return the stored results for the seed.
    # (A comment is used instead of a docstring, which click would show as help.)
    mock = MockOutput()
    if not reg:
        mock.run(seed)
        return
    mock.register(seed, tag=tag)


if __name__ == "__main__":
    main()