Source code for

"""Module for storing usp files into the database."""

import warnings
from pathlib import Path
from aiida.orm import SinglefileData
from aiida.common.utils import classproperty
from aiida.common.files import md5_file
from .utils import get_usp_element

# Group type string; returned by ``UspData.uspfamily_type_string``.
USPGROUP_TYPE = "castep.otfg"

# Extract element from filename

def upload_usp_family(folder, group_label, group_description, stop_if_existing=True):
    """
    Upload a set of usp/recpot files into a given group.

    Every regular file in ``folder`` whose name ends in ``.usp``, ``.recpot``
    or ``.uspcc`` (case-insensitive) is looked up in the database by its MD5
    sum. Files that are not there yet become new ``UspData`` nodes. Once the
    per-element uniqueness check has passed, new nodes (and the group, if it
    had to be created) are stored and all nodes are added to the group.

    :param folder: a path containing all usp/recpot files to be added.
    :param group_label: the label of the group to use. The group is created
        if no group with this label exists.
    :param group_description: a string to be set as the group description.
        Overwrites previous descriptions, if the group was existing.
    :param stop_if_existing: if True, raise a ``ValueError`` when a file with
        the same MD5 sum is already in the database. If False, simply add the
        existing ``UspData`` node to the group.
    :return: a tuple ``(nfiles, nnew)`` with the number of files found in the
        folder and the number of newly created nodes.
    :raises ValueError: if ``stop_if_existing`` is True and a file is already
        in the database.
    :raises UniquenessError: if the group would end up with more than one
        distinct usp/recpot file for the same element.
    """
    import os
    from collections import Counter

    from aiida.common import UniquenessError, NotExistent
    from aiida.orm.querybuilder import QueryBuilder
    from .otfg import OTFGGroup

    # str.endswith accepts a tuple, so a single call covers all suffixes
    suffixes = ('.usp', '.recpot', '.uspcc')
    files = []
    for name in os.listdir(folder):
        path = os.path.join(folder, name)
        if os.path.isfile(path) and name.lower().endswith(suffixes):
            files.append(os.path.realpath(path))
    nfiles = len(files)

    try:
        group = OTFGGroup.get(label=group_label)
    except NotExistent:
        group = OTFGGroup(label=group_label)
        group_created = True
    else:
        group_created = False

    # Update the description even if the group already existed
    group.description = group_description

    pseudo_and_created = []  # A list of records (UspData, created)
    for path in files:
        md5sum = md5_file(path)
        qb = QueryBuilder()
        qb.append(UspData, filters={'attributes.md5': {'==': md5sum}})
        existing_usp = qb.first()

        if existing_usp is None:
            # Not in the database yet: build the node but postpone storing it
            # until the uniqueness check below has passed
            pseudo, created = UspData.get_or_create(path,
                                                    use_first=True,
                                                    store_usp=False)
            pseudo_and_created.append((pseudo, created))
        else:
            # The same file is there already
            if stop_if_existing:
                raise ValueError("A usp/recpot with identical MD5 to"
                                 " {} cannot be added with stop_if_existing"
                                 "".format(path))
            # qb.first() returns the projected row; the node is its only item
            pseudo_and_created.append((existing_usp[0], False))

    # Distinct (element, md5) pairs that the group will hold after insertion.
    # Using a set discards pairs that refer to the very same file.
    elements = {(pseudo.element, pseudo.md5sum)
                for pseudo, _ in pseudo_and_created}
    if not group_created:
        for node in group.nodes:
            if isinstance(node, UspData):
                elements.add((node.element, node.md5sum))

    # An element that appears with more than one md5 is a conflict
    counts = Counter(element for element, _ in elements)
    duplicates = sorted(
        str(element) for element, count in counts.items() if count > 1)
    if duplicates:
        raise UniquenessError(
            "More than one usp/recpot found for the elements: " +
            ", ".join(duplicates) + ".")

    # The group must exist in the database before nodes can be added to it
    if group_created:
        group.store()

    # Save the usp in the database if necessary and add them to the group
    nodes_new = []
    nodes_add = []
    for pseudo, created in pseudo_and_created:
        if created:
            pseudo.store()
            nodes_new.append(pseudo)
        nodes_add.append(pseudo)
    group.add_nodes(nodes_add)

    return nfiles, len(nodes_new)
class UspData(SinglefileData):
    """
    Class for a single usp file.

    These usp files are stored as individual file nodes in the database.
    The MD5 sum of the file and the element it describes are kept in the
    ``md5`` and ``element`` attributes of the node.
    """

    def __init__(self, **kwargs):
        """
        Initialize a UspData node.

        :param file str: A full path to the file of the potential
        :param filename str: The filename needs to be used if ``file`` is a
          file-like object.
        :param element: The element that this pseudo potential should be
          used for
        :raises KeyError: if ``file`` is not given
        """
        element = kwargs.pop("element", None)
        # Remember the source path; ``_validate`` re-reads the file from it
        self._abs_path = kwargs["file"]
        super().__init__(**kwargs)

        # Overrides the element inferred from the file
        if element is not None:
            self.set_element(element)

    @classmethod
    def get_or_create(cls, filename, element=None, use_first=False, store_usp=True):
        """
        Return the node for a usp file, creating it if it is not in the db.

        The file is identified by its MD5 sum.

        :param filename: absolute path to the usp/recpot file
        :param element: if given, set as the element of a newly created node
        :param use_first: if several nodes share the MD5 sum, return the
          first one instead of raising
        :param store_usp: if True, store a newly created node
        :return: a tuple ``(usp, created)``
        :raises ValueError: if ``filename`` is not an absolute path, or if
          more than one matching node exists and ``use_first`` is False
        """
        import os

        filename = str(filename)
        if filename != os.path.abspath(filename):
            raise ValueError("filename must be an absolute path")

        # Check if we have got the file already
        pseudos = cls.from_md5(md5_file(filename))

        if not pseudos:
            # No existing pseudopotential file is in the database
            instance = cls(file=filename)
            if element is not None:
                instance.set_element(element)
            if store_usp:
                instance.store()
            # Always hand back the pair so callers can unpack the result
            return (instance, True)

        if len(pseudos) > 1 and not use_first:
            pks = ", ".join(str(pseudo.pk) for pseudo in pseudos)
            raise ValueError("More than one copy of a pseudopotential"
                             " found. pks={}".format(pks))
        return (pseudos[0], False)

    @classmethod
    def from_md5(cls, md5):
        """
        Return a list of all usp pseudopotentials that match a given MD5 hash.

        Note that the hash has to be stored in a md5 attribute, otherwise
        the pseudo will not be found.
        We use a special md5 attribute to avoid searching through
        irrelevant data types.
        """
        from aiida.orm.querybuilder import QueryBuilder

        qb = QueryBuilder()
        qb.append(cls, filters={'attributes.md5': {'==': md5}})
        # Each row holds a single projected node
        return [node for [node] in qb.all()]

    @classproperty
    def uspfamily_type_string(cls):
        """
        Type string of the underlying group.

        Deprecated, as the new Group should be accessed by sub-classing.
        """
        return USPGROUP_TYPE

    def store(self, *args, **kwargs):
        """
        Store the node, validating md5 and element first.

        Arguments and return value are those of the parent ``store``.
        """
        # Cannot revalidate the stored nodes
        if not self.is_stored:
            self._validate()
        return super().store(*args, **kwargs)

    def set_file(self, file, filename=None):
        """
        Set the file, extracting the element and computing the md5 hash.

        :param file: path to the usp/recpot file
        :param filename: name to store the file under; defaults to the last
          component of ``file``
        """
        # Respect a caller-supplied name, derive it from the path otherwise
        if filename is None:
            filename = Path(file).name

        try:
            element = get_usp_element(file)
        except KeyError:
            # The element cannot be extracted; it has to be set manually
            element = None
        else:
            # Only set the element if it is not there already
            if self.element is None:
                if element is not None:
                    self.set_element(element)
                else:
                    warnings.warn(
                        "Cannot extract element form the usp/recpot file {}."
                        "Please set it manually.".format(file))

        self.set_attribute('md5', md5_file(file))
        super().set_file(file, filename)

    def set_element(self, element):
        """
        Set the element.
        """
        self.set_attribute('element', element)

    @property
    def element(self):
        """Element of the usp/recpot file, or None if it is not set"""
        return self.get_attribute('element', None)

    @property
    def md5sum(self):
        """MD5 sum of the usp/recpot file, or None if it is not set"""
        return self.get_attribute('md5', None)

    @property
    def string(self):
        """Alias of the md5sum"""
        return self.md5sum

    @classmethod
    def get_usp_group(cls, group_label):
        """
        Return the ``OTFGGroup`` with the given label.
        """
        from .otfg import OTFGGroup

        return OTFGGroup.objects.get(label=group_label)

    @classmethod
    def get_usp_groups(cls, filter_elements=None, user=None):
        """
        Return the ``OTFGGroup`` groups, possibly with some filters.

        :param filter_elements: A string or a list of strings.
               If present, the query is joined with the nodes of this class
               in each group whose ``element`` attribute is in the list.
               Default=None, meaning that all families are returned.
        :param user: if None (default), return the groups for all users.
               If defined, it is compared with the email of the user
               attached to the group.
        :return: the groups, ordered by ascending id
        """
        from .otfg import OTFGGroup
        from aiida.orm import QueryBuilder
        from aiida.orm import User

        query = QueryBuilder()
        query.append(OTFGGroup, tag='group', project=['*'])

        if user:
            query.append(User,
                         filters={'email': {'==': user}},
                         with_group='group')

        # Accept a single element given as a plain string
        if isinstance(filter_elements, str):
            filter_elements = [filter_elements]

        if filter_elements is not None:
            query.append(
                cls,
                filters={'attributes.element': {'in': filter_elements}},
                with_group='group')

        query.order_by({OTFGGroup: {'id': 'asc'}})
        return [row[0] for row in query.all()]

    def _validate(self):
        """
        Check that the md5 and element attributes agree with the file.

        :raises ValidationError: if no file path is available, if the
          element or the md5 attribute is not set, if the stored md5 differs
          from the md5 of the file, or if the element attribute differs from
          the element parsed from the file.
        """
        from aiida.common import ValidationError

        super()._validate()

        # Check again, in case things changed since the file was set
        usp_abspath = str(self._abs_path)
        if not usp_abspath:
            raise ValidationError("No valid usp file was passed")

        # As in set_file: failing to parse an element is tolerated, the
        # manually set attribute is then taken at face value
        try:
            parsed_element = get_usp_element(usp_abspath)
        except KeyError:
            parsed_element = None
        md5 = md5_file(usp_abspath)

        attr_element = self.element
        if attr_element is None:
            raise ValidationError("No element is set")

        attr_md5 = self.md5sum
        if attr_md5 is None:
            raise ValidationError("attribute 'md5' not set.")
        if md5 != attr_md5:
            raise ValidationError(
                "Mismatch between store md5 and actual md5 value")

        # Reject a parsed element (if any) that contradicts the attribute
        if parsed_element is not None and attr_element != parsed_element:
            raise ValidationError("Attribute 'element' says '{}' but '{}' was "
                                  "parsed from file name instead.".format(
                                      attr_element, parsed_element))