Source code for aiida_castep.data.otfg

"""
Storing OTFG configuration as Data nodes
"""

from aiida.common import ValidationError
from aiida.common.utils import classproperty
from aiida.orm import Data, Group, QueryBuilder, User

from .usp import UspData
from .utils import split_otfg_entry

OLD_OTFGGROUP_TYPE = "data.castep.otfg.family"
OTFGGROUP_TYPE = "castep.otfg"


[docs]class OTFGGroup(Group): """Class representing an OTFGGroup"""
[docs]def migrate_otfg_family(): """Migrate the old OTFG families to new families""" old_types = [OLD_OTFGGROUP_TYPE, "data.castep.usp.family"] q = QueryBuilder() q.append(Group, filters={"type_string": {"in": old_types}}) migrated = [] created = [] for (old_group,) in q.iterall(): new_group, created = OTFGGroup.objects.get_or_create( label=old_group.label, description=old_group.description ) new_group.add_nodes(list(old_group.nodes)) new_group.store() migrated.append(new_group.label) if created: print(f"Created new style Group for <{old_group.label}>") else: print(f"Adding nodes to existing group <{old_group.label}>") return
[docs]def upload_otfg_family(entries, group_label, group_description, stop_if_existing=True): """ Set a family for the OTFG pseudo potential strings """ from aiida.common import NotExistent, UniquenessError from aiida.orm.querybuilder import QueryBuilder # from aiida.common import aiidalogger # Try to retrieve a group if it exists try: group = OTFGGroup.get(label=group_label) group_created = False except NotExistent: group = OTFGGroup( label=group_label, ) group_created = True group.description = group_description otfg_and_created = [] nentries = len(entries) for entry in entries: # Add it if it is just one existing data if isinstance(entry, OTFGData): element, setting = entry.element, entry.string elif isinstance(entry, str): element, setting = split_otfg_entry(entry) elif isinstance(entry, UspData): element, setting = entry.element, entry.md5sum qb = QueryBuilder() qb.append( OTFGData, filters={ "attributes.otfg_entry": {"==": setting}, "attributes.element": {"==": element}, }, ) existing_otfg = qb.first() # Try find Usp data if existing_otfg is None: qb = QueryBuilder() qb.append( UspData, filters={ "attributes.md5sum": {"==": setting}, "attributes.element": {"==": element}, }, ) existing_otfg = qb.first() # Act based on wether the data exists if existing_otfg is None: if isinstance(entry, OTFGData): otfg_and_created.append((entry, True)) elif isinstance(entry, str): otfg, created = OTFGData.get_or_create( entry, use_first=True, store_otfg=False ) otfg_and_created.append((otfg, created)) elif isinstance(entry, UspData): otfg_and_created.append((entry, True)) else: if stop_if_existing: raise ValidationError( "A OTFG group cannot be added when stop_if_existing is True" ) existing_otfg = existing_otfg[0] otfg_and_created.append((existing_otfg, False)) # Check for unique for the complete group elements = [(i[0].element, i[0].string) for i in otfg_and_created] # Add other entries for the list to check if not group_created: for aiida_n in group.nodes: if not isinstance(aiida_n, (OTFGData, UspData)): print(f"Warning: unsupported node: {aiida_n}") continue elements.append((aiida_n.element, aiida_n.string)) # Discard duplicated pairs elements = set(elements) elements_names = [e[0] for e in elements] # Check the uniqueness of the complete group if not len(elements_names) == len(set(elements_names)): duplicates = {x for x in elements_names if elements_names.count(x) > 1} dup_string = ", ".join(duplicates) raise UniquenessError( "More than one Nodes found for the elements: " + dup_string + "." ) # If we survive here uniqueness is fine # Save the group - note we have not added the nodes yet if group_created: group.store() # Save the OTFG in the database if necessary and add them to the group for otfg, created in otfg_and_created: if created: otfg.store() else: pass nodes_add = [otfg for otfg, created in otfg_and_created] nodes_new = [otfg for otfg, created in otfg_and_created if created is True] group.add_nodes(nodes_add) return nentries, len(nodes_new)
[docs]class OTFGData(Data): """ Class representing an OTFG configuration """
[docs] def __init__(self, **kwargs): """ Store a string for on-the-fly generation of pseudopotentials :param otfg_entry str: a string specifying the generation. The element this potential is for can also be included. For example: 'O 2|1.1|15|18|20|20:21(qc=7)' """ otfg_entry = kwargs.pop("otfg_entry", None) super().__init__(**kwargs) if otfg_entry: element, entry = split_otfg_entry(otfg_entry) self.set_string(entry) if element: self.set_element(element)
[docs] @classmethod def get_or_create(cls, otfg_entry, use_first=False, store_otfg=True): """ Create or retrieve OTFG from database :param otfg_entry: CASTEP styled OTFG entry. Can either be the name of library (e.g C9) or the full specification with element like: "O 2|1.1|15|18|20|20:21(qc=7)" The created OTFGData node will by default labelled by the fully entry. """ in_db = cls.from_entry(otfg_entry) # No existing entry if len(in_db) == 0: instance = cls(otfg_entry=otfg_entry) if store_otfg: instance.store() # Automatically set the label instance.label = otfg_entry return (instance, True) # There is a existing identical enetry in the db else: if len(in_db) > 1: if use_first: return (in_db[0], False) else: pks = ", ".join([str(i.pk) for i in in_db]) raise ValueError( "More than one duplicated OTFG data has been found. pks={}".format( pks ) ) else: return (in_db[0], False)
[docs] def set_string(self, otfg_entry): """Set the full string of OTFGData instance""" if self.element is None: self.set_element("LIBRARY") self.set_attribute("otfg_entry", str(otfg_entry))
[docs] def set_element(self, element): """Set the element of OTFGData instance""" self.set_attribute("element", str(element))
[docs] def store(self, *args, **kwargs): self._validate() return super().store(*args, **kwargs)
@property def string(self): return self.get_attribute("otfg_entry", None) @property def element(self): """Element of the OTFG. May not be available""" return self.get_attribute("element", None) @property def entry(self): """Plain format of the OTFG""" string = self.string element = self.element if element is None or element == "LIBRARY": return string else: return element + " " + string
[docs] @classmethod def from_entry(cls, entry): """ Return a list of OTFG that matches with the string """ from aiida.orm.querybuilder import QueryBuilder from .utils import split_otfg_entry element, string = split_otfg_entry(entry) qb = QueryBuilder() qb.append( cls, filters={ "attributes.otfg_entry": {"==": string}, "attributes.element": {"==": element}, }, ) return [i[0] for i in qb.all()]
[docs] def _validate(self): """Validate the format of OTFG configuration""" super()._validate() if self.element is None: raise ValidationError( "The value of element is not set. " "Set it to 'LIBRARY' manually to indicate. " "This is a library" )
@classproperty def otfg_family_type_string(cls): return OTFGGROUP_TYPE
[docs] @classmethod def get_otfg_group(cls, group_label): """ Return the OTFGData group with the given name. """ return OTFGGroup.objects.get(label=group_label)
[docs] @classmethod def get_otfg_groups(cls, filter_elements=None, user=None): """ Return all names of groups of type otfg, possibly with some filters. :param filter_elements: A string or a list of strings. If present, returns only the groups that contains one Usp for every element present in the list. Default=None, meaning that all families are returned. :param user: if None (default), return the groups for all users. If defined, it should be either a DbUser instance, or a string for the user name (that is, the user email). """ query = QueryBuilder() filters = {"type_string": {"==": cls.otfg_family_type_string}} query.append(OTFGGroup, filters=filters, tag="group", project="*") if user: query.append(User, filters={"email": {"==": user}}, with_group="group") if isinstance(filter_elements, str): filter_elements = [filter_elements] if filter_elements is not None: actual_filter_elements = {_.capitalize() for _ in filter_elements} # LIBRARY is a wild card actual_filter_elements.add("LBIRARY") query.append( cls, filters={"attributes.element": {"in": filter_elements}}, with_group="group", ) query.order_by({"group": {"id": "asc"}}) return [_[0] for _ in query.all()]