"""Liouville pathway analysis module
This module defines classes and function to help in
Liouvile pathway analysis
Classes
-------
LiouvillePathwayAnalyzer
Functions
---------
max_amplitude(pathways)
"""
from __future__ import annotations
import os
from typing import Any
import numpy
from .. import COMPLEX, REAL
from ..core.dfunction import DFunction
from ..core.managers import Manager, UnitsManaged
from ..core.parcel import load_parcel
from ..core.time import TimeAxis
from ..core.units import cm2int, convert
from ..core.wrappers import deprecated
from ..exceptions import QuantarheiError
from .twodcontainer import TwoDSpectrumContainer
[docs]
class LiouvillePathwayAnalyzer(UnitsManaged):
"""Class providing methods for Liouville pathway analysis
Parameters
----------
pathways : list
List of pathways to analyze
Methods
-------
set_pathways(pathways=None)
Set the pathways for the analysis
get_pathways()
Returns current list of pathways
max_amplitude()
Returns a tuple containing the value of the amplitude of the pathway
from current list of pathways with the maximum amplitude and its index
in the list
select_amplitide_GT(val, replace=True, verbose=False)
In the current list of pathways this method selects those that have
absolute value of amplitude larger that a given value
"""
def __init__(self, pathways: Any = None) -> None:
self.pathways = pathways
[docs]
def set_pathways(self, pathways: Any) -> None:
"""Sets pathways to be analyzed"""
self.pathways = pathways
[docs]
def get_pathways(self) -> Any:
"""Returns Liouville pathways"""
return self.pathways
[docs]
def get_number_of_pathways(self) -> int:
"""Returns the number of available pathways"""
return len(self.pathways)
[docs]
@deprecated
def max_pref(self, pathways: Any) -> tuple[float, int]:
"""Return the maximum of pathway prefactors
Parameters
----------
pathways : list
List of Liouville pathways
Returns
-------
pmax : float
Maximum prefactor of the pathways
rec : int
position of the pathway with the maximum prefactor
"""
if pathways is None:
pthways = self.pathways
else:
pthways = pathways
pmax = 0.0
k = 0
rec = -1
for pway in pthways:
if pway.pref > pmax:
rec = k
pmax = pway.pref
k += 1
return (pmax, rec)
[docs]
def max_amplitude(self) -> tuple[float, int]:
"""Return the maximum of pathway prefactors
Parameters
----------
pathways : list
List of Liouville pathways
Returns
-------
pmax : float
Maximum prefactor of the pathways
rec : int
position of the pathway with the maximum prefactor
"""
return max_amplitude(self.pathways)
[docs]
@deprecated
def select_pref_GT(
self,
val: float,
pathways: Any = None,
replace: bool = True,
verbose: bool = False,
) -> Any:
"""Select all pathways with prefactors greater than a value"""
if pathways is None:
pthways = self.pathways
else:
pthways = pathways
selected = []
for pway in pthways:
if numpy.abs(pway.pref) > val:
selected.append(pway)
if verbose:
print("Selected", len(selected), "pathways")
# if the pathways were not specified from argument then return selected
# to self
if (pathways is None) and replace:
self.pathways = selected
return selected
[docs]
def select_amplitude_GT(
self, val: float, replace: bool = True, verbose: bool = False
) -> Any:
"""Select all pathways with abs value of prefactors greater than a value"""
selected = select_amplitude_GT(val, self.pathways, verbose=verbose)
# if the pathways were not specified from argument then return selected
# to self
if replace:
self.pathways = selected
else:
return selected
[docs]
def select_frequency_window(
self, window: Any, replace: bool = True, verbose: bool = False
) -> Any:
"""Selects pathways with omega_1 and omega_3 in a certain range"""
selected = select_frequency_window(window, self.pathways, verbose)
if replace:
self.pathways = selected
else:
return selected
[docs]
def select_omega2(
self, interval: Any, replace: bool = True, verbose: bool = False
) -> Any:
"""Selects pathways with omega_2 in a certain interval"""
selected = select_omega2(interval, self.pathways, verbose)
if replace:
self.pathways = selected
else:
return selected
[docs]
@deprecated
def order_by_pref(self, pthways: Any) -> Any:
"""Orders the list of pathways by pathway prefactors"""
lst = sorted(pthways, key=lambda pway: abs(pway.pref), reverse=True)
return lst
[docs]
def order_by_amplitude(self, replace: bool = True) -> Any:
orderred = order_by_amplitude(self.pathways)
if replace:
self.pathways = orderred
else:
return orderred
[docs]
def select_sign(self, sign: float, replace: bool = True) -> Any:
selected = select_sign(self.pathways, sign)
if replace:
self.pathways = selected
else:
return selected
[docs]
def select_type(self, ptype: str = "REPH", replace: bool = True) -> Any:
selected = select_type(self.pathways, ptype)
if replace:
self.pathways = selected
else:
return selected
return selected
[docs]
def max_amplitude(pathways: Any) -> tuple[float, int]:
"""Return the maximum of pathway prefactors
Parameters
----------
pathways : list
List of Liouville pathways
Returns
-------
pmax : float
Maximum prefactor of the pathways
rec : int
position of the pathway with the maximum prefactor
"""
pmax = 0.0
k = 0
rec = -1
for pway in pathways:
if pway.pref > pmax:
rec = k
pmax = pway.pref
k += 1
return (pmax, rec)
[docs]
def select_amplitude_GT(val: float, pathways: Any, verbose: bool = False) -> list[Any]:
"""Select all pathways with abs value of prefactors greater than a value
Parameters
----------
val : float
Value to compare with the pathway prefactor
pathways : list of pathways
List of the Liouville pathways to analyze
verbose: bool
If set True, number of selected pathways is reported
Returns
-------
selected : list
List of selected pathways
"""
pthways = pathways
selected = []
for pway in pthways:
if numpy.abs(pway.pref) > val:
selected.append(pway)
if verbose:
print("Selected", len(selected), "pathways")
return selected
[docs]
def select_frequency_window(
window: Any, pathways: Any, verbose: bool = False
) -> list[Any]:
"""Selects pathways with omega_1 and omega_3 in a certain range"""
pthways = pathways
m = Manager()
om1_low = m.convert_energy_2_internal_u(window[0])
om1_upp = m.convert_energy_2_internal_u(window[1])
om3_low = m.convert_energy_2_internal_u(window[2])
om3_upp = m.convert_energy_2_internal_u(window[3])
selected = []
for pway in pthways:
ne = len(pway.frequency)
# om1 = numpy.abs(pway.frequency[0])
om1 = numpy.abs(pway.get_interval_frequency(0))
# om3 = pway.frequency[ne-2]
om3 = numpy.abs(pway.get_interval_frequency(ne - 2))
if ((om1 >= om1_low) and (om1 <= om1_upp)) and (
(om3 >= om3_low) and (om3 <= om3_upp)
):
selected.append(pway)
if verbose:
print("Selected", len(selected), "pathways")
return selected
[docs]
def select_omega2(
interval: Any,
pathways: Any,
secular: bool = True,
tolerance: float = 10.0 * cm2int,
verbose: bool = False,
) -> list[Any]:
"""Selects pathways with omega_2 in a certain interval"""
pthways = pathways
m = Manager()
om2_low = m.convert_energy_2_internal_u(interval[0])
om2_upp = m.convert_energy_2_internal_u(interval[1])
selected = []
for pway in pthways:
ne = len(pway.frequency)
# print("Number of frequencies: ", ne)
om2 = pway.get_interval_frequency(ne - 3)
# pathways with transfer
if ne > 4:
# check previous frequency (feeding)
om2_2 = pway.get_interval_frequency(ne - 4)
if secular:
# case with feeding frequency small
if numpy.abs(om2_2) <= tolerance:
if (om2 >= om2_low) and (om2 <= om2_upp):
selected.append(pway)
# print("selected")
# case of fast feeding frequency
elif numpy.abs(om2 - om2_2) <= tolerance:
if (om2 >= om2_low) and (om2 <= om2_upp):
selected.append(pway)
# print("selected")
else:
if (om2 >= om2_low) and (om2 <= om2_upp):
selected.append(pway)
# print("selected")
else:
if (om2 >= om2_low) and (om2 <= om2_upp):
selected.append(pway)
# print("selected")
if verbose:
print("Selected", len(selected), "pathways")
return selected
# def select_incoherent(pathways):
#
# for pway in pathways:
[docs]
def order_by_amplitude(pthways: Any) -> list[Any]:
"""Orders the list of pathways by pathway prefactors"""
lst = sorted(pthways, key=lambda pway: abs(pway.pref), reverse=True)
return lst
[docs]
def select_sign(pathways: Any, sign: float) -> list[Any]:
"""Selects all pathways depending on the overall sign"""
selected = []
pos = False
if sign > 0.0:
pos = True
for pway in pathways:
if pos:
if pway.sign > 0.0:
selected.append(pway)
else:
if pway.sign < 0.0:
selected.append(pway)
return selected
[docs]
def select_type(pathways: Any, stype: str) -> list[Any]:
"""Selects all pathways of a given type
Parameters
----------
pathways : list
List of pathways to be analyzed
stype : str
Type of the pathways to be selected.
Possible values are "REPH", "NONR".
"""
di = dict(REPH="R", NONR="NR")
selected = []
for pw in pathways:
if pw.pathway_type == di[stype]:
selected.append(pw)
return selected
[docs]
def select_by_states(pathways: Any, states: Any) -> Any:
"""Returns one pathway which goes through a given pattern of states
Returns unique pathway which goes through a given pattern of states
or does not return anything
The following diagram
|12 12|
--->|-----------|
| |
|48 12| 2.0
--->|-----------|
| |
|21 12| 0.0
>>|***********|<<
| |
|6 6| 0.0
--->|-----------|
| |
|0 6| -2.0
|-----------|<---
| |
|0 0|
turns into the following tuple
((0,0), (0,6), (6,6), (21, 12), (48, 12), (12,12))
Parameters
----------
pathways : list, tuple
List of tuple of states to analyze
states : list, tuple
List of dyads which describe the states involved in a given Liouville
pathway. States are listed from left to right, from bottom to the top
"""
# loop over all pathways
for pw in pathways:
ch = -1
# loop over the Feynman diagram
for k in range(pw.states.shape[0]):
# check if prescribed states match the pathway states
if not (
(pw.states[k, 0] == states[k + 1][0])
& (pw.states[k, 1] == states[k + 1][1])
):
# leave if there is a mismatch
break
ch += 1
# if all lines match, return the pathway
if ch == k:
return pw
[docs]
def look_for_pathways(
name: str = "pathways", ext: str = "qrp", check: bool = False, directory: str = "."
) -> numpy.ndarray:
"""Load pathways by t2"""
# get all files with the name_*.ext pattern
import glob
import os.path
path = os.path.join(directory, name + "_*." + ext)
files = glob.glob(path)
# get a list of t2s
t2s_list: list[float] = []
for fl in files:
t2 = float(fl.split("_")[1].split("." + ext)[0])
t2s_list.append(t2)
t2s = numpy.array(t2s_list)
t2s = numpy.sort(t2s)
# if check == True load them all and check they are list of pathways
if check:
pass
return t2s
[docs]
def load_pathways_by_t2(
t2: float,
name: str = "pathways",
ext: str = "qrp",
directory: str = ".",
tag_type: Any = REAL,
) -> list[Any]:
"""Load pathways by t2 time"""
t2_str = str(t2)
fname = name + "_" + t2_str + "." + ext
path = os.path.join(directory, fname)
try:
pw = load_parcel(path)
except Exception:
print("Error while loading")
return []
return pw
[docs]
def save_pathways_by_t2(
t2: float,
name: str = "pathways",
ext: str = "qrp",
directory: str = ".",
tag_type: Any = REAL,
) -> None:
pass
# FIXME: This is done in an inefficient way, loading the files multiply
[docs]
def get_evolution_from_saved_pathways(
states: Any,
name: str = "pathways",
ext: str = "qrp",
directory: str = ".",
tag_type: Any = REAL,
repl: float = 0.0,
) -> Any:
"""Reconstructs the evolution of the pathway contribution in t2 time"""
t2s = look_for_pathways(name=name, ext=ext, directory=directory)
# order t2s
t2s = numpy.sort(t2s)
evol = _get_evol(t2s, states, name, ext, directory, repl=repl)
dt = t2s[1] - t2s[0]
length = len(t2s)
taxis = TimeAxis(t2s[0], length, dt)
ii = 0
for tt in t2s:
if tt != taxis.data[ii]:
raise QuantarheiError(
"The set of available times"
" does not correspond to a continuous time axis"
)
ii += 1
return DFunction(x=taxis, y=evol)
# return t2s, evol
[docs]
def get_prefactors_from_saved_pathways(
states: Any,
name: str = "pathways",
ext: str = "qrp",
directory: str = ".",
tag_type: Any = REAL,
repl: float = 0.0,
) -> Any:
"""Reconstructs the evolution of the pathway contribution in t2 time"""
t2s = look_for_pathways(name=name, ext=ext, directory=directory)
# order t2s
t2s = numpy.sort(t2s)
evol = _get_pref(t2s, states, name, ext, directory, repl=repl)
dt = t2s[1] - t2s[0]
length = len(t2s)
taxis = TimeAxis(t2s[0], length, dt)
ii = 0
for tt in t2s:
if tt != taxis.data[ii]:
raise QuantarheiError(
"The set of available times"
" does not correspond to a continuous time axis"
)
ii += 1
return DFunction(x=taxis, y=evol)
[docs]
def get_TwoDSpectrum_from_saved_pathways(
t2: float,
t1axis: Any,
t3axis: Any,
name: str = "pathways",
ext: str = "qrp",
directory: str = ".",
tag_type: Any = REAL,
) -> Any:
"""Returns a 2D spectrum calculated based on the saved Liouville pathways"""
pwt2 = load_pathways_by_t2(
t2, name=name, ext=ext, directory=directory, tag_type=tag_type
)
# calculate 2D spectrum from the loaded pathways
twod = get_TwoDSpectrum_from_pathways(pwt2, t1axis, t3axis)
# return it
return twod
[docs]
def get_TwoDSpectrum_from_pathways(pathways: Any, t1axis: Any, t3axis: Any) -> Any:
"""Returns a 2D spectrum calculated based on submitted Liouville pathways"""
from .mocktwodcalculator import (
MockTwoDResponseCalculator as MockTwoDSpectrumCalculator,
)
t2axis = TimeAxis(0.0, 1, 1.0)
mcalc = MockTwoDSpectrumCalculator(t1axis, t2axis, t3axis)
mcalc.bootstrap(rwa=float(convert(12000.0, "1/cm", "int")), pathways=pathways)
twod = mcalc.calculate()
return twod
[docs]
def get_TwoDSpectrumContainer_from_saved_pathways(
t1axis: Any,
t3axis: Any,
name: str = "pathways",
ext: str = "qrp",
directory: str = ".",
tag_type: Any = REAL,
) -> Any:
"""Returns a container with 2D spectra calculated from saved pathways"""
t2s = look_for_pathways(name=name, ext=ext, directory=directory)
# order t2s
t2s = numpy.sort(t2s)
time2 = TimeAxis(t2s[0], len(t2s), t2s[1] - t2s[0])
tcont = TwoDSpectrumContainer(t2axis=time2)
tcont.use_indexing_type(itype=time2)
for t2 in t2s:
pwt2 = load_pathways_by_t2(
t2, name=name, ext=ext, directory=directory, tag_type=tag_type
)
# calculate 2D spectrum from saved pathways
twod = get_TwoDSpectrum_from_pathways(pwt2, t1axis, t3axis)
# put it into container
tcont.set_spectrum(twod, tag=t2)
# return the container
return tcont
def _is_tuple_of_dyads(states: Any) -> bool:
"""Check if the object is a tuple or list of dyads"""
def _is_a_dyad(dd: Any) -> bool:
return len(dd) == 2
ret = False
for st in states:
if _is_a_dyad(st):
ret = True
else:
ret = False
break
return ret
def _get_evol(
t2s: numpy.ndarray,
states: Any,
name: str,
ext: str,
directory: str,
repl: float = 0.0,
) -> numpy.ndarray:
"""Return evolution of a single pathway"""
N = 1
evol: numpy.ndarray
if _is_tuple_of_dyads(states):
evol = numpy.zeros(len(t2s), dtype=COMPLEX)
else:
N = len(states)
evol = numpy.zeros((len(t2s), N), dtype=COMPLEX)
k = 0
for t2 in t2s:
pws = load_pathways_by_t2(t2, name=name, ext=ext, directory=directory)
if N == 1:
pw = select_by_states(pws, states)
if pw is None:
evol[k] = repl
else:
evol[k] = pw.evolfac
else:
l = 0
for st in states:
pw = select_by_states(pws, st)
if pw is None:
evol[k, l] = repl
else:
evol[k, l] = pw.evolfac
l += 1
k += 1
return evol
def _get_pref(
t2s: numpy.ndarray,
states: Any,
name: str,
ext: str,
directory: str,
repl: float = 0.0,
) -> numpy.ndarray:
"""Return evolution of a single pathway"""
N = 1
evol: numpy.ndarray
if _is_tuple_of_dyads(states):
evol = numpy.zeros(len(t2s), dtype=COMPLEX)
else:
N = len(states)
evol = numpy.zeros((len(t2s), N), dtype=COMPLEX)
k = 0
for t2 in t2s:
pws = load_pathways_by_t2(t2, name=name, ext=ext, directory=directory)
if N == 1:
pw = select_by_states(pws, states)
if pw is None:
evol[k] = repl
else:
evol[k] = pw.pref
else:
l = 0
for st in states:
pw = select_by_states(pws, st)
if pw is None:
evol[k, l] = repl
else:
evol[k, l] = pw.pref
l += 1
k += 1
return evol