# -*- coding: utf-8 -*-
    This class handles several important package wide tasks:

    1) Usage of units across objects storing data
    2) Basis conversion of all registered objects
    3) Calls to proper optimized implementations of numerically heavy
       sections of the calculations

    Manager is a singleton class, only one instance exists at all times
    and all managing objects have the instance of the Manager.
    version : string
        contains the package version number
    allower_utypes : list
        contains a list of unit types which can be controlled by the Manager
    units : dictionary
        dictionary of available units for each units type
    units_repre : dictionary
        dictionary of abreviations used to represent various units
    units_repre_latex : dictionary
        dictionary of latex prepresentations of available units

    Units Management
    Units management is performed for all classes derived from
    quantarhei.managers.UnitsManaged class.

    Basis Conversion Management
    Units management is performed for all classes derived from
    quantarhei.managers.BasisManaged class.

    Basis management works like this: when an class is defined, and its
    property needs to be basis managed, one should use a predefined type


import os
import warnings

# This stops future warnings, notably those in h5py library
# FIXME: remove this in "future"
warnings.simplefilter(action='ignore', category=FutureWarning)

import json
import pkg_resources

import numpy

from .units import conversion_facs_frequency
from .units import conversion_facs_energy
from .units import conversion_facs_length

from .singleton import Singleton

from .numconf import NumConf
from .logconf import LogConf
from .genconf import GenConf

[docs]class Manager(metaclass=Singleton): """ Main package Manager """ version = "0.0.66" # hard wired unit options allowed_utypes = ["energy", "frequency", "dipolemoment", "temperature", "time", "length"] units = {"energy" : ["1/fs", "int", "1/cm", "eV", "meV", "THz", "J", "SI", "nm", "Ha", "a.u."], "frequency" : ["1/fs", "int", "1/cm", "THz", "Hz", "SI", "nm", "Ha", "a.u."], "dipolemoment" : ["Debye", "a.u"], "temperature" : ["1/fs", "int", "Kelvin", "Celsius", "1/cm", "eV", "meV", "Thz", "SI"], "time" : ["fs", "int", "as", "ps", "ns", "Ms","ms", "s", "SI"], "length" : ["int", "A", "nm", "Bohr", "a.u.", "m", "SI"]} units_repre = {"Kelvin":"K", "Celsius":"C", "Debye":"D", "1/cm":"1/cm", "THz":"THz", "eV":"eV", "1/fs":"1/fs", "int":"1/fs", "meV":"meV", "nm":"nm", "Ha":"Ha", "a.u.":"a.u."} units_repre_latex = {"Kelvin":"K", "Celsius":"C", "Debye":"D", "1/cm":"cm$^-1$", "THz":"THz", "eV":"eV", "1/fs":"fs$^{-1}$", "meV":"meV", "nm":"nm", "Ha":"Ha", "a.u.":"a.u."} def __init__(self): try: # this is numpy 1.14 numpy.set_printoptions(precision=8, sign=' ', legacy='1.13') except: # before there was no `sign` parameters numpy.set_printoptions(precision=8) self.current_units = {} # main configuration file cfile = "~/.quantarhei/quantarhei.json" # test the presence of configuration directory conf_path = os.path.dirname(cfile) self.conf_path = os.path.expanduser(conf_path) self.cfile = os.path.expanduser(cfile) exists = os.path.exists(self.conf_path) isdir = os.path.isdir(self.conf_path) if not exists: # create directory os.mkdir(self.conf_path) # write default configuration self.main_conf = {"units":"units.json", "implementations":"implementations.json" } # save it with open(self.cfile, 'w') as f: json.dump(self.main_conf, f) elif exists and (not isdir): raise Exception("Cannot create configuration directory.") else: # load the main configuration file with open(self.cfile, 'r') as f: self.main_conf = json.load(f) self.current_basis_operator = None # # Flags for all contexts which are enforced or prevented by functions # self._enforce_contexts = True self._in_eigenbasis_of_context = False self._in_eb_count = 0 self._in_energy_units_context = False self._in_eu_count = 0 # # Setting physical units # # internal units are hardwired self.internal_units = {"energy":"1/fs", "frequency":"1/fs", "dipolemoment":"Debye", "temperature":"Kelvin", "length":"A"} # current units are read from conf file if not exists: # set hard wired defaults and save them self.current_units = {"energy":"1/fs", "frequency":"1/fs", "dipolemoment":"Debye", "temperature":"Kelvin", "length":"A"} # save them self.save_units() else: self.load_units() # # Setting implementations # self.implementation_points = { "secular-standard-Redfield-rates":"redfield.ssRedfieldRateMatrix" } # # All available implementations # self.all_implementations = { "redfieldrates.ssRedfieldRateMatrix": { '0':"quantarhei.implementations.python", '1':"quantarhei.implementations.cython" } } self.all_implementations["redfieldtensor.ssRedfieldTensor"] = \ {'0':"quantarhei.implementations.python", '1':"quantarhei.implementations.cython"} self.default_implementations = { "redfieldrates.ssRedfieldRateMatrix":'0', "redfieldtensor.ssRedfieldRateTensor":'0' } self.optimal_implementations = { "redfieldrates.ssRedfieldRateMatrix":'1' } self.current_implementations = { "redfieldrates.ssRedfieldRateMatrix":'0', "redfieldtensor.ssRedfieldRateTensor":'0' } self.parallel_implementations = {} self.parallel_implementations["redfieldrates." +"ssRedfieldRateMatrix"] = \ {'0':"quantarhei.implementations.python.parallel", '1':"quantarhei.implementations.cython.parallel"} if not exists: # and save them self.save_implementations() else: self.load_implementations() self.change_implementation_at_runtime = True self.basis_stack = [] self.basis_stack.append(0) self.basis_transformations = [] self.basis_transformations.append(1) self.basis_registered = {} self.warn_about_basis_change = False self.warn_about_basis_changing_objects = False self.parallel_conf = None self.save_dict = {} # # Configuration controlable from qrhei (conf file and qrhei script) # self.num_conf = NumConf() self.log_conf = LogConf() self.use_pytorch = False self.use_gpu = False self.gen_conf = GenConf() # # Read central configuration from ./quantarhei directory # # # Read local user config file (this will only be done on request) # # self._read_uconf() # # Initialization of parallel environment # try: from .parallel import DistributedConfiguration #from .parallel import start_parallel_region dc = DistributedConfiguration() self.parallel_conf = dc dc.start_parallel_region() # this must be put into qrhei script !!! #if dc.rank != 0: # self.log_conf.verbosity -= 2 # self.log_conf.fverbosity -= 2 #print(dc.rank, self.log_conf.verbosity) except: self.parallel_conf = None def __del__(self): """Closes parallel environment if needed """ if self.parallel_conf is not None: #from .parallel import close_parallel_region self.parallel_conf.finish_parallel_region()
[docs] def load_conf(self): """Loads configuration file This is to be called in scripts and notebooks """ self._read_uconf()
def _read_uconf(self): """Reads user defined local config file From Stackoverflow recipe: """ fname = self.gen_conf.conf_file_name fdir = self.gen_conf.conf_file_path fpath = os.path.join(fdir, fname) from pathlib import Path cfile = Path(fpath) if cfile.exists() & cfile.is_file(): self._load_uconf(fpath) else: if cfile.exists(): raise Exception("Configuration file "+fpath+" seems to exist"+ " but it is not a file") else: print("Warning: Configuration file "+fpath+" does not exit") print("Warning: Placing a default configuration are using it") import pkg_resources resource_package = "quantarhei" # Could be any module/package name resource_path = '/'.join(('core', 'conf', '')) content = pkg_resources.resource_string(resource_package, resource_path) with open(fpath, "w") as f: f.write(content.decode("utf-8")) self._load_uconf(fpath) #printlog("Configuration file: ", fpath, "loaded", loglevel=9) def _load_uconf(self, fpath): """ """ #print("Conf path: ", os.path.abspath(fpath)) try: import importlib.util spec = importlib.util.spec_from_file_location("qrconf", fpath) foo = importlib.util.module_from_spec(spec) spec.loader.exec_module(foo) #print("Configuring Manager:") #print(self) foo.configure(self) #print("..done") except: raise Exception() def save_settings(self): # main configuration file with open(self.cfile,'w') as f: json.dump(self.main_conf,f) # units setting self.save_units() # implementations setting self.save_implementations() def save_implementations(self): # set the implementations to standard implementations = { "imp_points":self.implementation_points, "all_available":self.all_implementations, "default":self.default_implementations, "optimal":self.optimal_implementations, "current":self.current_implementations } imp_file = self.main_conf["implementations"] imp_file = os.path.join(self.conf_path,imp_file) with open(imp_file,'w') as f: json.dump(implementations,f) def load_implementations(self): imp_file = self.main_conf["implementations"] imp_file = os.path.join(self.conf_path,imp_file) with open(imp_file,'r') as f: implementations = json.load(f) self.implementation_points = implementations["imp_points"] self.all_implementations = implementations["all_available"] self.default_implementations = implementations["default"] self.optimal_implementations = implementations["optimal"] self.current_implementations = implementations["current"] def save_units(self): units_file = self.main_conf["units"] units_file = os.path.join(self.conf_path,units_file) with open(units_file,'w') as f: json.dump(self.current_units,f) def load_units(self): units_file = self.main_conf["units"] units_file = os.path.join(self.conf_path,units_file) with open(units_file,'r') as f: self.current_units = json.load(f)
[docs] def get_real_type(self): """Returns default numpy float type """ import numpy return numpy.float64
[docs] def get_complex_type(self): """Returns default numpy complex type """ import numpy return numpy.complex128
def store_current_basis_operator(self, op): self.current_basis_operator = op def remove_current_basis_operator(self): self.current_basis_operator = None
[docs] def unit_repr(self,utype="energy",mode="current"): """Returns a string representing the currently used units """ if utype in self.allowed_utypes: if mode == "current": return self.units_repre[self.current_units[utype]] elif mode == "internal": return self.units_repre[self.internal_units[utype]] else: raise Exception("Unknown representation mode") else: raise Exception("Unknown unit type")
[docs] def unit_repr_latex(self,utype="energy",mode="current"): """Returns a string representing the currently used units """ if utype in self.allowed_utypes: if mode == "current": return self.units_repre_latex[self.current_units[utype]] elif mode == "internal": return self.units_repre_latex[self.internal_units[utype]] else: raise Exception("Unknown representation mode") else: raise Exception("Unknown unit type")
[docs] def set_current_units(self, utype, units): """Sets current units """ self._saved_units = {} self._saved_units[utype] = self.get_current_units(utype) if utype in self.allowed_utypes: if units in self.units[utype]: self.current_units[utype] = units else: raise Exception("Unknown units of %s" % utype) else: raise Exception("Unknown type of units")
[docs] def unset_current_units(self, utype): """Restores previously saved units of a given type """ try: cunits = self._saved_units[utype] except KeyError: raise Exception("Units to restore not found") if utype in self.allowed_utypes: if cunits in self.units[utype]: self.current_units[utype] = cunits else: raise Exception("Unknown units of %s" % utype) else: raise Exception("Unknown type of units")
[docs] def get_current_units(self, utype): """ """ if utype in self.allowed_utypes: return self.current_units[utype] else: raise Exception("Unknown type of units")
# @deprecated
[docs] def cu_energy(self,val,units="1/cm"): """Converst to current energy units """ if units in self.units["energy"]: x = conversion_facs_energy[units] i_val = x*val cu = self.current_units["energy"] if cu != "1/fs": y = conversion_facs_energy[units] return i_val/y return i_val
# @deprecated
[docs] def iu_energy(self,val,units="1/cm"): """Converst to internal energy units """ if units in self.units["energy"]: x = conversion_facs_energy[units] i_val = x*val return i_val
[docs] def convert_energy_2_internal_u(self,val): """Convert energy from currently used units to internal units Parameters ---------- val : number, array, list, tuple of numbers values to convert """ units = self.current_units["energy"] cfact = conversion_facs_energy[self.current_units["energy"]] # special handling for nano meters if units == "nm": # zero is interpretted as zero energy try: ret = numpy.zeros(val.shape, dtype=val.dtype) ret[val!=0.0] = 1.0/val[val!=0] return ret/cfact except: return (1.0/val)/cfact #if val == 0.0: # return 0.0 #return (1.0/val)/cfact else: return val*cfact
[docs] def convert_energy_2_current_u(self,val): """Converts energy from internal units to currently used units Parameters ---------- val : number, array, list, tuple of numbers values to convert """ units = self.current_units["energy"] cfact = conversion_facs_energy[units] # special handling for nanometers if units == "nm": # zero is interpretted as zero energy try: ret = numpy.zeros(val.shape, dtype=val.dtype) ret[val!=0.0] = 1.0/val[val!=0] return ret/cfact except: return (1.0/val)/cfact else: return val/cfact
[docs] def convert_frequency_2_internal_u(self,val): """Converts frequency from currently used units to internal units Parameters ---------- val : number, array, list, tuple of numbers values to convert """ return val*conversion_facs_frequency[self.current_units["frequency"]]
[docs] def convert_frequency_2_current_u(self,val): """Converts frequency from internal units to currently used units Parameters ---------- val : number, array, list, tuple of numbers values to convert """ return val/conversion_facs_frequency[self.current_units["frequency"]]
[docs] def convert_length_2_internal_u(self,val): """Converts length from currently used units to internal units Parameters ---------- val : number, array, list, tuple of numbers values to convert """ return val*conversion_facs_length[self.current_units["length"]]
[docs] def convert_length_2_current_u(self,val): """Converts frequency from internal units to currently used units Parameters ---------- val : number, array, list, tuple of numbers values to convert """ return val/conversion_facs_length[self.current_units["length"]]
def get_implementation_prefix(self,package="",taskname=""): #default_imp_prefix = "quantarhei.implementations.python" pname = package+"."+taskname whichone = self.current_implementations[pname] imp_prefix = self.all_implementations[pname][str(whichone)] return imp_prefix def get_implementation_points(self): return self.implementation_points def get_all_implementations(self): return self.all_implementations def get_all_implementations_of(self,imp): imp_id = self.implementation_points[imp] return self.all_implementations[imp_id] def get_current_implementation(self,imp): imp_id = self.implementation_points[imp] whichone = self.current_implementations[imp_id] return self.all_implementations[imp_id][str(whichone)] def set_current_implementation(self, imp, choice): imp_id = self.implementation_points[imp] self.current_implementations[imp_id] = choice def register_implementation(self,imp_point,prefix,asint=None): pass def commit_implementation(self,imp_point,prefix,asint=None): pass
[docs] def get_current_basis(self): """Returns the current basis id """ l = len(self.basis_stack) return self.basis_stack[l-1]
def set_new_basis(self,SS): nb = self.get_current_basis() + 1 self.basis_stack.append(nb) self.basis_transformations.append(SS) self.basis_registered[nb] = [] return nb
[docs] def transform_to_current_basis(self, operator): """Transforms an operator to the currently used basis Parameters ---------- operator : operator Any basis managed operator """ if operator.is_basis_protected: return ob = operator.get_current_basis() cb = self.get_current_basis() if self.warn_about_basis_changing_objects: print("Object ", operator.__class__, id(operator), " is changing basis from ", ob, " to: ", cb) if ob != cb: SS = numpy.diag(numpy.ones(operator.dim)) # find out if current basis of the object is in the stack (i.e. it # was used sometime in the past) if ob in self.basis_stack: sl = len(self.basis_stack) # scroll back over the bases for k in range(1,sl): # take the basis transformation to the earlier used basis ZZ = self.basis_transformations[sl-k] # included it into the transformation matrix SS =,SS) # if the basis is found, break away from the loop if self.basis_stack[sl-k-1] == ob: break else: raise Exception("Basis of the object is not on stack.") operator.transform(SS) operator.set_current_basis(cb) self.register_with_basis(cb,operator)
def register_with_basis(self,nb,operator): self.basis_registered[nb].append(operator)
[docs] def get_DistributedConfiguration(self): """ """ from .parallel import DistributedConfiguration if self.parallel_conf is None: self.parallel_conf = DistributedConfiguration() return self.parallel_conf
[docs]class Managed: """Base class for managed objects """ manager = Manager()
[docs]class UnitsManaged(Managed): """Base class for objects with management of units """ def convert_energy_2_internal_u(self, val): return self.manager.convert_energy_2_internal_u(val) def convert_energy_2_current_u(self, val): return self.manager.convert_energy_2_current_u(val) def convert_length_2_internal_u(self, val): return self.manager.convert_length_2_internal_u(val) def convert_length_2_current_u(self, val): return self.manager.convert_length_2_current_u(val) def unit_repr(self,utype="energy"): return self.manager.unit_repr(utype) def unit_repr_latex(self,utype="energy"): return self.manager.unit_repr_latex(utype)
[docs]class EnergyUnitsManaged(Managed): utype = "energy" units = "1/fs" def convert_2_internal_u(self,val): return self.manager.convert_energy_2_internal_u(val) def convert_2_current_u(self,val): return self.manager.convert_energy_2_current_u(val) def unit_repr(self): return self.manager.unit_repr("energy") def unit_repr_latex(self, utype="energy"): return self.manager.unit_repr_latex(utype)
[docs]class LengthUnitsManaged(Managed): """Class providing functions for length units conversion """ utype = "length" units = "A" def convert_2_internal_u(self, val): return self.manager.convert_length_2_internal_u(val) def convert_2_current_u(self,val): return self.manager.convert_length_2_current_u(val) def unit_repr(self): return self.manager.unit_repr(self.utype) def unit_repr_latex(self): return self.manager.unit_repr_latex(self.utype)
[docs]class BasisManaged(Managed): """Base class for objects with managed basis """ _current_basis = Manager().get_current_basis() is_basis_protected = False def get_current_basis(self): return self._current_basis def set_current_basis(self,bb): self._current_basis = bb def protect_basis(self): self.is_basis_protected = True def unprotect_basis(self): self.is_basis_protected = False
[docs]class units_context_manager: """General context manager to manage physical units of values """ def __init__(self,utype="energy"): self.manager = Manager() if utype in self.manager.allowed_utypes: self.utype = utype else: raise Exception("Unknown units type") def __enter__(self): pass def __exit__(self): pass
[docs]class energy_units(units_context_manager): """Context manager for units of energy """ def __init__(self,units): super().__init__(utype="energy") if units in self.manager.units["energy"]: self.units = units else: raise Exception("Unknown energy units") def __enter__(self): # save current energy units self.units_backup = self.manager.get_current_units("energy") self.manager.set_current_units(self.utype,self.units) self.manager._in_energy_units_context = True self.manager._in_eu_count += 1 def __exit__(self,ext_ty,exc_val,tb): self.manager.set_current_units("energy",self.units_backup) self.manager._in_eu_count -= 1 if self.manager._in_eu_count == 0: self.manager._in_energy_units_context = False
[docs]class frequency_units(energy_units): """Context manager for units of frequency It behaves exactly the same as ``energy_units`` context manager. """ pass
[docs]class length_units(units_context_manager): """Context manager for length units """ def __init__(self, units): super().__init__(utype="length") if units in self.manager.units["length"]: self.units = units else: raise Exception("Unknown length units") def __enter__(self): # save current energy units self.units_backup = self.manager.get_current_units("length") self.manager.set_current_units(self.utype,self.units) def __exit__(self,ext_ty,exc_val,tb): self.manager.set_current_units("length",self.units_backup)
[docs]class basis_context_manager: """General context manager to manage basis """ def __init__(self): self.manager = Manager() def __enter__(self): pass def __exit__(self,ext_ty,exc_val,tb): pass
[docs]class eigenbasis_of(basis_context_manager): """Context manager for basis change """ def __init__(self, operator): super().__init__() self.op = operator self.manager.store_current_basis_operator(self.op) def __enter__(self): self.manager._in_eigenbasis_of_context = True if self.manager.warn_about_basis_change: print("\nQr >>> Entering basis context manager ...") cb = self.manager.get_current_basis() ob = self.op.get_current_basis() if cb != ob: self.manager.transform_to_current_basis(self.op) #SS = self.op.diagonalize() SS = self.op.get_diagonalization_matrix() self.manager.set_new_basis(SS) #self.manager.register_with_basis(nb,self.op) #self.op.set_current_basis(nb) if self.manager.warn_about_basis_change: print("\nQr >>> ... setting context done") def __exit__(self,ext_ty,exc_val,tb): if self.manager.warn_about_basis_change: print("\nQr >>> Returning from basis context manager. Cleaning ...") # This is the basis we are leaving bb = self.manager.basis_stack.pop() # this is the transformation we got here with SS = self.manager.basis_transformations.pop() # This is the new basis bss = len(self.manager.basis_stack) nb = self.manager.basis_stack[bss-1] # inverse of the transformation matrix S1 = numpy.linalg.inv(SS) # transform all registered objects operators = self.manager.basis_registered[bb] if nb != 0: # operators registered with the context above this one ops_above = self.manager.basis_registered[nb] for op in operators: # the operator might have been set to protected mode # inside the context if not op.is_basis_protected: op.transform(S1,inv=SS) op.set_current_basis(nb) # operators which appeared in this context and where not # register in the one above are now registerd if nb != 0: if op not in ops_above: self.manager.register_with_basis(nb,op) self.manager.remove_current_basis_operator() del self.manager.basis_registered[bb] if len(self.manager.basis_stack) == 1: self.manager._in_eigenbasis_of_context = False if self.manager.warn_about_basis_change: print("\nQr >>> ... cleaning done")
[docs]def set_current_units(units=None): """Sets units globaly without the need for a context manager """ manager = Manager() if units is not None: # set units using a supplied dictionary for utype in units: if utype in manager.allowed_utypes: un = units[utype] # handle the identity of "frequency" and "energy" if utype=="frequency": utype="energy" un = units["frequency"] manager.set_current_units(utype,un) else: raise Exception("Unknown units type %s" % utype) else: # reset units to the default for utype in manager.internal_units: if utype in manager.allowed_utypes: manager.set_current_units(utype,manager.internal_units[utype]) else: raise Exception("Unknown units type %s" % utype)