Source code for aiida_castep.data.otfg

"""
Storing OTFG configuration as Data nodes
"""

from aiida.orm import Data, Group, QueryBuilder, User

from aiida.common.utils import classproperty
from aiida.common import ValidationError
from .utils import split_otfg_entry
from .usp import UspData

OLD_OTFGGROUP_TYPE = "data.castep.otfg.family"
OTFGGROUP_TYPE = "castep.otfg"


[docs]class OTFGGroup(Group): """Class representing an OTFGGroup"""
[docs]def migrate_otfg_family(): """Migrate the old OTFG families to new families""" old_types = [OLD_OTFGGROUP_TYPE, "data.castep.usp.family"] q = QueryBuilder() q.append(Group, filters={'type_string': {'in': old_types}}) migrated = [] created = [] for (old_group, ) in q.iterall(): new_group, created = OTFGGroup.objects.get_or_create( label=old_group.label, description=old_group.description) new_group.add_nodes(list(old_group.nodes)) new_group.store() migrated.append(new_group.label) if created: print("Created new style Group for <{}>".format(old_group.label)) else: print(("Adding nodes to existing group <{}>".format( old_group.label))) return
[docs]def upload_otfg_family(entries, group_label, group_description, stop_if_existing=True): """ Set a family for the OTFG pseudo potential strings """ from aiida.common import UniquenessError, NotExistent from aiida.orm.querybuilder import QueryBuilder #from aiida.common import aiidalogger # Try to retrieve a group if it exists try: group = OTFGGroup.get(label=group_label) group_created = False except NotExistent: group = OTFGGroup(label=group_label, ) group_created = True group.description = group_description otfg_and_created = [] nentries = len(entries) for entry in entries: # Add it if it is just one existing data if isinstance(entry, OTFGData): element, setting = entry.element, entry.string elif isinstance(entry, str): element, setting = split_otfg_entry(entry) elif isinstance(entry, UspData): element, setting = entry.element, entry.md5sum qb = QueryBuilder() qb.append(OTFGData, filters={ 'attributes.otfg_entry': { "==": setting }, 'attributes.element': { "==": element } }) existing_otfg = qb.first() # Try find Usp data if existing_otfg is None: qb = QueryBuilder() qb.append(UspData, filters={ 'attributes.md5sum': { "==": setting }, 'attributes.element': { "==": element } }) existing_otfg = qb.first() # Act based on wether the data exists if existing_otfg is None: if isinstance(entry, OTFGData): otfg_and_created.append((entry, True)) elif isinstance(entry, str): otfg, created = OTFGData.get_or_create(entry, use_first=True, store_otfg=False) otfg_and_created.append((otfg, created)) elif isinstance(entry, UspData): otfg_and_created.append((entry, True)) else: if stop_if_existing: raise ValidationError( "A OTFG group cannot be added when stop_if_existing is True" ) existing_otfg = existing_otfg[0] otfg_and_created.append((existing_otfg, False)) # Check for unique for the complete group elements = [(i[0].element, i[0].string) for i in otfg_and_created] # Add other entries for the list to check if not group_created: for aiida_n in group.nodes: if not isinstance(aiida_n, (OTFGData, UspData)): print(("Warning: unsupported node: {}".format(aiida_n))) continue elements.append((aiida_n.element, aiida_n.string)) # Discard duplicated pairs elements = set(elements) elements_names = [e[0] for e in elements] # Check the uniqueness of the complete group if not len(elements_names) == len(set(elements_names)): duplicates = set( [x for x in elements_names if elements_names.count(x) > 1]) dup_string = ", ".join(duplicates) raise UniquenessError("More than one Nodes found for the elements: " + dup_string + ".") # If we survive here uniqueness is fine # Save the group - note we have not added the nodes yet if group_created: group.store() # Save the OTFG in the database if necessary and add them to the group for otfg, created in otfg_and_created: if created: otfg.store() else: pass nodes_add = [otfg for otfg, created in otfg_and_created] nodes_new = [otfg for otfg, created in otfg_and_created if created is True] group.add_nodes(nodes_add) return nentries, len(nodes_new)
[docs]class OTFGData(Data): """ Class representing an OTFG configuration """
[docs] def __init__(self, **kwargs): """ Store a string for on-the-fly generation of pseudopotentials :param otfg_entry str: a string specifying the generation. The element this potential is for can also be included. For example: 'O 2|1.1|15|18|20|20:21(qc=7)' """ otfg_entry = kwargs.pop('otfg_entry', None) super(OTFGData, self).__init__(**kwargs) if otfg_entry: element, entry = split_otfg_entry(otfg_entry) self.set_string(entry) if element: self.set_element(element)
[docs] @classmethod def get_or_create(cls, otfg_entry, use_first=False, store_otfg=True): """ Create or retrieve OTFG from database :param otfg_entry: CASTEP styled OTFG entry. Can either be the name of library (e.g C9) or the full specification with element like: "O 2|1.1|15|18|20|20:21(qc=7)" The created OTFGData node will by default labelled by the fully entry. """ in_db = cls.from_entry(otfg_entry) # No existing entry if len(in_db) == 0: instance = cls(otfg_entry=otfg_entry) if store_otfg: instance.store() # Automatically set the label instance.label = otfg_entry return (instance, True) # There is a existing identical enetry in the db else: if len(in_db) > 1: if use_first: return (in_db[0], False) else: pks = ", ".join([str(i.pk) for i in in_db]) raise ValueError( "More than one duplicated OTFG data has been found. pks={}" .format(pks)) else: return (in_db[0], False)
[docs] def set_string(self, otfg_entry): """Set the full string of OTFGData instance""" if self.element is None: self.set_element("LIBRARY") self.set_attribute("otfg_entry", str(otfg_entry))
[docs] def set_element(self, element): """Set the element of OTFGData instance""" self.set_attribute("element", str(element))
[docs] def store(self, *args, **kwargs): self._validate() return super(OTFGData, self).store(*args, **kwargs)
@property def string(self): return self.get_attribute('otfg_entry', None) @property def element(self): """Element of the OTFG. May not be available""" return self.get_attribute('element', None) @property def entry(self): """Plain format of the OTFG""" string = self.string element = self.element if element is None or element == "LIBRARY": return string else: return element + " " + string
[docs] @classmethod def from_entry(cls, entry): """ Return a list of OTFG that matches with the string """ from aiida.orm.querybuilder import QueryBuilder from .utils import split_otfg_entry element, string = split_otfg_entry(entry) qb = QueryBuilder() qb.append(cls, filters={ 'attributes.otfg_entry': { '==': string }, 'attributes.element': { '==': element } }) return [i[0] for i in qb.all()]
[docs] def _validate(self): """Validate the format of OTFG configuration""" super(OTFGData, self)._validate() if self.element is None: raise ValidationError("The value of element is not set. " "Set it to 'LIBRARY' manually to indicate. " "This is a library")
@classproperty def otfg_family_type_string(cls): return OTFGGROUP_TYPE
[docs] @classmethod def get_otfg_group(cls, group_label): """ Return the OTFGData group with the given name. """ return OTFGGroup.objects.get(label=group_label)
[docs] @classmethod def get_otfg_groups(cls, filter_elements=None, user=None): """ Return all names of groups of type otfg, possibly with some filters. :param filter_elements: A string or a list of strings. If present, returns only the groups that contains one Usp for every element present in the list. Default=None, meaning that all families are returned. :param user: if None (default), return the groups for all users. If defined, it should be either a DbUser instance, or a string for the user name (that is, the user email). """ query = QueryBuilder() filters = {'type_string': {'==': cls.otfg_family_type_string}} query.append(OTFGGroup, filters=filters, tag='group', project='*') if user: query.append(User, filters={'email': { '==': user }}, with_group='group') if isinstance(filter_elements, str): filter_elements = [filter_elements] if filter_elements is not None: actual_filter_elements = {_.capitalize() for _ in filter_elements} # LIBRARY is a wild card actual_filter_elements.add("LBIRARY") query.append( cls, filters={'attributes.element': { 'in': filter_elements }}, with_group='group') query.order_by({'group': {'id': 'asc'}}) return [_[0] for _ in query.all()]