Source code for quantarhei.spectroscopy.twodcontainer

"""Class holding a set of TwoDSpectra



Class Details
-------------

"""

from __future__ import annotations

import numbers
from typing import Any

# import matplotlib.pyplot as plt
import numpy

# from ..core.managers import energy_units
from .. import (
    COMPLEX,
    REAL,
    part_ABS,
    part_COMPLEX,
    part_IMAGINARY,
    part_REAL,
    signal_TOTL,  # , signal_REPH, signal_NONR
)
from ..core.dfunction import DFunction
from ..core.frequency import FrequencyAxis
from ..core.managers import Manager, energy_units
from ..core.saveable import Saveable
from ..core.time import TimeAxis
from ..core.valueaxis import ValueAxis
from ..exceptions import QuantarheiError

# from .twod2 import TwoDResponse
from .twodspect import TwoDSpectrum


[docs] class TwoDResponseContainer(Saveable): """Class holding a set of TwoDSpectra Parameters ---------- t2axis: TimeAxis object holding waiting times at which spectra are calculated keep_pathways: bool if set True, the container will keep all types of Liouville pathways stored separately keep_stypes: bool if se t True, the container will keep rephasing and non-rephasing spectra stored separately """ def __init__( self, t2axis: Any = None, keep_pathways: bool = False, keep_stypes: bool = True ) -> None: self.t2axis = t2axis self.keep_pathways = keep_pathways self.keep_stypes = keep_stypes self.axis: ValueAxis | None = None self.itype: str | None = None self.index = 0 self.tags: list[Any] = [] if self.keep_pathways: raise QuantarheiError("Container keeping pathways not available yet") self.spectra: dict[Any, Any] = {} self._which: float | None = None if t2axis is not None: self.use_indexing_type(itype=t2axis)
[docs] def use_indexing_type(self, itype: Any) -> None: """Sets the type of indices used to identify spectra Parameters ---------- itype : string, ValueAxis, TimeAxis, FrequencyAxis Type of indexig. If itype is a string, it should have values either 'integer' or 'string' in which case the specra will be stored by integer index or by string (as in dictionary). If itype is a ValueAxis (TimeAxis, FrequencyAxis), spectra will be indexed by the values in the axis object. """ if isinstance(itype, str): if itype == "integer": self.itype = "integer" elif itype == "string": self.itype = "string" else: raise QuantarheiError("Unknown indexing type") elif isinstance(itype, ValueAxis): if isinstance(itype, TimeAxis): self.itype = "TimeAxis" self.axis = itype # This axis must FFT in the "standard" way # -- it must of the "complete" type itype.atype = "complete" elif isinstance(itype, FrequencyAxis): self.itype = "FrequencyAxis" self.axis = itype else: self.itype = "ValueAxis" self.axis = itype else: raise QuantarheiError("Unknown indexing type")
[docs] def set_spectrum(self, spect: Any, tag: Any = None) -> Any: """Stores spectrum with a tag (time, index, etc.) Stores the spectrum according to present indexing scheme Parameters ---------- spect : TwoDSpectrum Object holding the spectrum; when not tag is specified for a spectrum which as its t2 time set, the tag is set to t2 time. tag : {int, string, ValuesAxis, TimeAxis, FrequencyAxis} Tag which will be used for retrieval of the spectrum from the container. """ if self.itype == "integer": if tag is None: self.spectra[self.index] = spect self.index += 1 return self.index if isinstance(tag, numbers.Integral): self.spectra[tag] = spect else: raise QuantarheiError("The spectrum has to be tagged by an integer") return tag if self.itype in ["ValueAxis", "TimeAxis", "FrequencyAxis"]: if tag is None: # we will read the spectrum intrinsic t2 time and set it as tag if spect.t2 >= 0.0: tag = spect.t2 if tag is not None: if tag in self.axis.data: self.spectra[tag] = spect self.tags.append(tag) self.index += 1 else: raise QuantarheiError("Tag not compatible with the ValueAxis") else: raise QuantarheiError( "No tag specified, and spectrum" " does not have t2 time set" " - cannot store spectrum" ) return self.index if self.itype == "string": if tag is not None: stag = str(tag) self.spectra[stag] = spect self.tags.append(stag) self.index += 1 else: raise QuantarheiError("No tag specified - cannot store spectrum") return self.index raise QuantarheiError("Unknown type of indexing")
def _lousy_equal(self, x1: float, x2: float, dx: float, frac: float = 0.25) -> bool: """Equals up to fraction of dx This function returns True if x1 is closer to x2 than `frac` of a specified interval. In addition it saves the value of x2 to which x1 is equal in the attribute _which of the present class. """ if abs(x1 - x2) < dx * frac: self._which = x2 return True self._which = None return False
[docs] def get_spectrum_by_index(self, indx: int) -> Any: """Returns spectrum by integet index The integer index is assigned to all spectra in the order they were saved to the container. They can be retrieved in this order Parameters ---------- indx : int Index of the spectrum to be retrieved """ if self.itype == "integer": return self.get_spectrum(indx) return self.spectra[self.tags[indx]]
[docs] def get_response(self, tag: Any) -> Any: """Same as get_spectrum, but the name more sense for a response container""" return self.get_spectrum(tag)
[docs] def get_spectrum(self, tag: Any) -> Any: """Returns spectrum corresponing to time t2 Checks if the time t2 is present in the t2axis Parameters ---------- t2 : float Waiting time for which spectrum should be returned """ if self.itype in ["integer"]: return self.spectra[tag] if self.itype in ["string"]: return self.spectra[str(tag)] if self.itype in ["ValueAxis", "TimeAxis", "FrequencyAxis"]: assert self.axis is not None with energy_units("int"): if any( self._lousy_equal(tag, li, self.axis.step) for li in self.axis.data ): try: return self.spectra[self._which] except KeyError: print(self.spectra) raise QuantarheiError() else: raise QuantarheiError("Tag not compatible with the ValueAxis") else: raise QuantarheiError("Unknown type of indexing")
[docs] def set_data_flag(self, flag: Any) -> None: """Sets data flag for all spectra in the container""" for tag in self.spectra: sp = self.spectra[tag] sp.set_data_flag(flag)
[docs] def get_TwoDSpectrumContainer(self, stype: Any = signal_TOTL) -> Any: """Returns a container with specific spectra""" if self.itype in ["ValueAxis", "TimeAxis", "FrequencyAxis"]: axis = self.axis.deepcopy() cont = TwoDSpectrumContainer(axis) for val in self.axis.data: sp = self.get_spectrum(val) nsp = sp.get_TwoDSpectrum(dtype=stype) cont.set_spectrum(nsp, tag=val) return cont raise QuantarheiError("")
[docs] def get_nearest(self, val: float) -> Any: if self.itype == "FrequencyAxis": # print(Manager().current_units["frequency"]) # print(Manager().current_units["energy"]) nval = Manager().convert_energy_2_internal_u(val) # get tags and convert them to numbers ntags: numpy.ndarray = numpy.zeros(len(self.tags), dtype=REAL) k = 0 for stag in self.tags: # print(stag) ntag = float(stag) ntags[k] = ntag k += 1 dtags = numpy.abs(ntags - nval) imin = numpy.argmin(dtags) # print("Returning spectrum at: ", # Manager().convert_energy_2_current_u(self.tags[imin])) return self.spectra[self.tags[imin]], imin
[docs] def length(self) -> int: """Returns the length of the container""" return len(self.spectra.keys())
[docs] def get_spectra(self, start: Any = None, end: Any = None) -> list[Any]: """Returns a list of the calculated spectra Returns all spectra or an interval of spectra when `start` and `end` are specified Parameters ---------- start : int Index of the first spectrum to be returned end : int Index of the last spectrum to be returned """ ven = [value for (key, value) in sorted(self.spectra.items())] if (start is None) and (end is None): return ven ven2 = [] vkeys = [key for (key, value) in sorted(self.spectra.items())] for k in vkeys: if k >= start and k <= end: ven2.append(self.spectra[k]) return ven2
[docs] def get_PumpProbeSpectrumContainer(self, skip: int = 0) -> Any: """Converts this response container into PumpProbeSpectrumContainer. Each stored :class:`TwoDResponse` already contains enough spectral information to be projected onto the pump-probe axis. This method performs that projection for every stored waiting time. """ from .pumpprobe import PumpProbeSpectrumContainer k = 0 ppc = [] ttc = [] ii = 0 assert self.axis is not None for sp in self.get_spectra(): if k == 0: pp = sp.get_PumpProbeSpectrum() ppc.append(pp) ttc.append(self.axis.data[ii]) k += 1 if k > skip: k = 0 ii += 1 length = len(ppc) if length == 0: raise Exception("No spectra available for pump-probe conversion") start = ppc[0].get_t2() if length > 1: step = ppc[1].get_t2() - start elif self.axis is not None: step = self.axis.step else: step = 1.0 naxis = TimeAxis(start, length, step) ppcont = PumpProbeSpectrumContainer(t2axis=naxis) ppcont.itype = self.itype ii = 0 for sp in ppc: tt = ttc[ii] ppcont.set_spectrum(sp, tt) ii += 1 return ppcont
[docs] def get_integrated_area_evolution( self, times: Any, area: Any, dpart: Any = part_REAL ) -> Any: """Returns the integrated area of the 2D spectra as a function of their index""" vals = numpy.zeros(times.length, dtype=COMPLEX) k = 0 # this only acts on Frequency axis with energy_units("int"): tms = times.data for t2 in tms: sp = self.get_spectrum(t2) vals[k] = sp.get_area_integral(area, dpart=part_REAL) k += 1 return DFunction(times, vals)
[docs] def get_point_evolution(self, x: float, y: float, times: Any) -> Any: """Tracks an evolution of a single point on the 2D spectrum Parameters ---------- x : float x coordinate in the 2D spectrum (usually omega_1 axis) y : float y coordinate in the 2D spectrum (usually omega_3 axis) times : ValueAxis Times (usually waiting t_2 times) in which spectra are taken """ vals = numpy.zeros(times.length, dtype=COMPLEX) k = 0 # this only acts on Frequency axis with energy_units("int"): tms = times.data for t2 in tms: sp = self.get_spectrum(t2) vals[k] = sp.get_value_at(x, y) k += 1 return DFunction(times, vals)
[docs] def global_fit_exponential(self, guess: Any = None) -> Any: """Global fit of the data with exponentials""" from functools import partial from scipy.optimize import least_squares if guess is None: guess = [1.0, 1.0 / 100.0, 0.0] assert self.axis is not None _exp_2D_fcion = partial(_exp_2D_data0, times=self.axis.data, cont=self) params = least_squares(_exp_2D_fcion, guess) return params
[docs] def fft( self, ffttype: str = "complex-positive", window: Any = None, offset: float = 0.0, dtype: Any = None, dpart: Any = part_COMPLEX, tag: Any = None, ) -> Any: """Fourier transform in t2 time This method performs FFT on the container data determined by the value of the `dtype` argument. The new container is created and the storage resolution of its components is set `off`. This means that the container and its spectra have no idea about what data they store. Even when plotting the spectra, one has to set plotting of the `total` spectrum. Parameters ---------- ffttype : string Specifies the type Fourier transform we perform window : DFunction Windowing function for the data. Default is None """ if dtype is None: raise QuantarheiError("Type of the data for FFT has to be specified") if self.itype not in ["ValueAxis", "TimeAxis", "FrequencyAxis"]: raise QuantarheiError("FFT cannot be performed for this type of indexing") # even when no window function is supplied, we create one with # all elements equal to one if window is None: winfce = DFunction(self.axis, numpy.ones(self.axis.length, dtype=REAL)) else: winfce = window eff_axis: ValueAxis if isinstance(self.axis, TimeAxis): # restrict the time axis by the off-set tlist = [] for tt in self.axis.data: if tt >= offset: tlist.append(tt) if len(tlist) > 1: dt = tlist[1] - tlist[0] Nt = len(tlist) t0 = tlist[0] # effective time axis for fft eff_axis = TimeAxis(t0, Nt, dt, atype="complete") else: raise QuantarheiError("Offset too large") else: assert self.axis is not None eff_axis = self.axis # put all data into one array # raise QuantarheiError() self.set_data_flag(dtype) tags = eff_axis.data # self.axis.data Nos = self.length() if len(tags) <= Nos: tag1 = eff_axis.data[0] # self.axis.data[0] sp1 = self.get_spectrum(tag1) N1, N2 = sp1.d__data.shape data = numpy.zeros((N1, N2, len(tags)), dtype=sp1.d__data.dtype) for k_n in range(len(tags)): tag = eff_axis.data[k_n] # self.axis.data[k_n] spect = self.get_spectrum(tag) spect.set_data_flag(dtype) if dpart == part_COMPLEX: data[:, :, k_n] = spect.d__data elif dpart == part_REAL: data[:, :, k_n] = numpy.real(spect.d__data) elif dpart == part_IMAGINARY: data[:, :, k_n] = numpy.imag(spect.d__data) elif dpart == part_ABS: data[:, :, k_n] = numpy.abs(spect.d__data) else: raise QuantarheiError( "Number of spectra not consistent with ValueAxis object" ) # # FFT of the axis # axis = eff_axis # = self.axis new_axis: ValueAxis if isinstance(axis, TimeAxis): axis.shift_to_zero() new_axis = axis.get_FrequencyAxis() elif isinstance(axis, FrequencyAxis): new_axis = axis.get_TimeAxis() else: # this must be ValueAxis ftaxdata = (2.0 * numpy.pi) * numpy.fft.fftfreq(axis.length, axis.step) ftaxdata = numpy.fft.fftshift(ftaxdata) start = ftaxdata[0] length = len(ftaxdata) step = ftaxdata[1] - ftaxdata[0] new_axis = ValueAxis(start, length, step) # # FFT of the data # # window function ftdata: numpy.ndarray = numpy.zeros(data.shape, dtype=data.dtype) Nwin = len(winfce.data) Ndat = data.shape[2] for i_n in range(data.shape[0]): for j_n in range(data.shape[1]): ftdata[i_n, j_n, :] = ( data[i_n, j_n, :] * winfce.data[Nwin - Ndat : Nwin] ) ftdata = numpy.fft.ifft(ftdata, axis=2) ftdata = numpy.fft.fftshift(ftdata, axes=2) # save it to a new container new_container = TwoDSpectrumContainer() new_container.use_indexing_type(new_axis) for k_n in range(Ndat): tag = new_axis.data[k_n] spect = TwoDSpectrum() spect.set_axis_1(sp1.xaxis) spect.set_axis_3(sp1.yaxis) spect.set_data(ftdata[:, :, k_n], dtype=signal_TOTL) new_container.set_spectrum(spect, tag=tag) return new_container
def _create_root_group(self, start: Any, name: str) -> Any: return start.create_group(name) def _save_axis(self, rt: Any, name: str, ax: Any) -> None: axdir = rt.create_group(name) axdir.attrs.create("start", ax.start) axdir.attrs.create("length", ax.length) axdir.attrs.create("step", ax.step) def _load_axis(self, rt: Any, name: str) -> Any: axdir = rt[name] start = axdir.attrs["start"] length = axdir.attrs["length"] step = axdir.attrs["step"] return TimeAxis(start, length, step)
[docs] def trimall_to(self, window: Any = None) -> None: """Trims all spectra in the container Parameters ---------- window: list of floats Window, specified by four float number, to which all spectra in the container should be trimmed """ if window is not None: axes = window for s in self.get_spectra(): s.trim_to(window=axes)
# FIXME: this is still legacy version
[docs] def amax(self, spart: Any = part_REAL) -> Any: """Returns maximum amplitude of the spectra in the container""" mxs = [] for s in self.get_spectra(): # spect2D = numpy.real(s.d__data) if spart == part_REAL: spect2D = numpy.real(s.data) elif spart == part_IMAGINARY: spect2D = numpy.imag(s.data) elif spart == part_ABS: spect2D = numpy.abs(s.data) else: raise QuantarheiError("Unknow part of the spectrum:", spart) mx = numpy.amax(spect2D) mxs.append(mx) return numpy.amax(numpy.array(mxs))
# Print iterations progress def _printProgressBar( self, iteration: int, total: int, prefix: str = "", suffix: str = "", decimals: int = 1, length: int = 100, fill: str = "*", ) -> None: """Call in a loop to create terminal progress bar @params: iteration - Required : current iteration (Int) total - Required : total iterations (Int) prefix - Optional : prefix string (Str) suffix - Optional : suffix string (Str) decimals - Optional : positive number of decimals in percent complete (Int) length - Optional : character length of bar (Int) fill - Optional : bar fill character (Str) Based on: https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console """ # fill = '█'): percent = ("{0:." + str(decimals) + "f}").format( 100 * (iteration / float(total)) ) filledLength = int(length * iteration // total) bar = fill * filledLength + "-" * (length - filledLength) print(f"\r{prefix} |{bar}| {percent}% {suffix}", end="\r") # Print New Line on Complete if iteration == total: print()
[docs] def make_movie( self, filename: str, window: Any = None, stype: Any = signal_TOTL, spart: Any = part_REAL, cmap: Any = None, Npos_contours: int = 10, vmax: Any = None, vmin_ratio: float = 0.5, xlabel: Any = None, ylabel: Any = None, axis_label_font: Any = None, start: Any = None, end: Any = None, frate: int = 20, dpi: int = 100, show_states: Any = None, show_states_func: Any = None, label: Any = None, label_func: Any = None, text_loc: Any = None, progressbar: bool = False, use_t2: bool = True, title: str = "Quantarhei movie", comment: str = "Created with Quantarhei", ) -> None: """Creates a movie out of the spectra in the container Parameters ---------- Npos_contours : int Nomber of positive value contours in the plot """ import matplotlib.animation as manimation import matplotlib.pyplot as plt FFMpegWriter = manimation.writers["ffmpeg"] metadata = dict(title=title, artist="Quantarhei", comment=comment) writer = FFMpegWriter(fps=frate, metadata=metadata) fig = plt.figure() spctr = self.get_spectra() l = len(spctr) if use_t2 and (spctr[0].get_t2() < 0.0): # print("Warning: switching off usage of t2" # +" information obtained from the spectrum object (t2 < 0)") use_t2 = False if use_t2: last_t2 = spctr[l - 1].get_t2() first_t2 = spctr[0].get_t2() if start is None: start = first_t2 if end is None: end = last_t2 if vmax is None: mx = self.amax(spart=spart) else: mx = vmax with writer.saving(fig, filename, dpi): k = 0 # Initial call to print 0% progress if use_t2: sp2write = self.get_spectra(start=start, end=end) else: sp2write = self.get_spectra() l = len(sp2write) if progressbar: self._printProgressBar( 0, l, prefix="Progress:", suffix="Complete", length=50 ) for sp in sp2write: # self.get_spectra(start=start, end=end): if label_func is not None: (label, text_loc) = label_func(sp) if show_states_func is not None: show_states = show_states_func(sp) sp.plot( fig=fig, window=window, cmap=cmap, vmax=mx, vmin_ratio=vmin_ratio, Npos_contours=Npos_contours, stype=stype, spart=spart, show_states=show_states, xlabel=xlabel, ylabel=ylabel, axis_label_font=axis_label_font, label=label, text_loc=text_loc, ) # "T="+str(sp.get_t2())+"fs") writer.grab_frame() if progressbar: self._printProgressBar( k + 1, l, prefix="Progress:", suffix="Complete", length=50 ) k += 1
# if k == 20: # return def _exp_2D_data0(params: Any, times: Any = None, cont: Any = None) -> numpy.ndarray: """Returns a residue between time dependent matrix data and a matrix multipled by a sum of exponentials """ if times is None: raise QuantarheiError("Times have to be supplied") if cont is None: raise QuantarheiError("Spectra container has to be supplied") np = len(params) data0 = cont.get_spectrum(0.0) residues = numpy.zeros( (times.shape[0], data0.data.shape[0], data0.data.shape[1]), dtype=data0.dtype ) N = times.shape[0] * data0.data.shape[0] * data0.data.shape[0] nexp = int((np - 1) / 2) ii = 0 for t in times: ret = 0.0 kp = 0 for kk in range(nexp): ret += params[kp] * numpy.exp(-params[kp + 1] * t) kp += 2 ret += params[kp] spect = cont.get_spectrum(t) residues[ii, :, :] = numpy.abs(spect.data - ret * data0.data) ii += 1 return residues.reshape(N)
[docs] class TwoDSpectrumContainer(TwoDResponseContainer): def __init__(self, t2axis: Any = None, dtype: Any = signal_TOTL) -> None: self.t2axis = t2axis self.axis: ValueAxis | None = None self.itype: str | None = None self.index = 0 self.tags: list[Any] = [] self.dtype = dtype self.spectra: dict[Any, Any] = {} self._which: float | None = None if t2axis is not None: self.use_indexing_type(itype=t2axis)
[docs] def set_data_flag(self, flag: Any) -> None: """Sets data flag for all spectra in the container""" if flag != self.dtype: raise QuantarheiError("Cannot change spectra type")
[docs] def get_TwoDSpectrumContainer(self, stype: Any = signal_TOTL) -> Any: """Returns a container with specific spectra""" if stype == self.dtype: return self raise QuantarheiError("Cannot change spectra type in this container")
# FIXME: This needs to be reimplemented
[docs] def get_PumpProbeSpectrumContainer(self, skip: int = 0) -> Any: """Converts this container into PumpProbeSpectrumContainer""" if self.dtype == signal_TOTL: from .pumpprobe import PumpProbeSpectrumContainer k = 0 ppc = [] ttc = [] ii = 0 assert self.axis is not None for sp in self.get_spectra(): if k == 0: pp = sp.get_PumpProbeSpectrum() ppc.append(pp) ttc.append(self.axis.data[ii]) k += 1 if k > skip: k = 0 ii += 1 length = len(ppc) if length == 0: raise Exception("No spectra available for pump-probe conversion") start = ppc[0].get_t2() if length > 1: step = ppc[1].get_t2() - start elif self.axis is not None: step = self.axis.step else: step = 1.0 naxis = TimeAxis(start, length, step) ppcont = PumpProbeSpectrumContainer(t2axis=naxis) ii = 0 for sp in ppc: tt = ttc[ii] ppcont.set_spectrum(sp, tt) ii += 1 return ppcont raise QuantarheiError( "Cannot calculate Pump-probe from 2D spectra of type" + self.dtype )
[docs] def normalize2( self, norm: float = 1.0, each: bool = False, dpart: Any = part_REAL ) -> Any: """Normalize the whole container of spectra Normalization of the whole container so that the maximum value of the spectrum is equal to the `norm`. Parameters ---------- norm : float Value to which we normalize the spectra each: bool If False, we normalize the global maximum of the container, i.e. a maximum accross whole spectra. if True, each spectrum is normalized individually against its own maximum. dpart: string Part of the spectrum from which the maximum is calculated, it can be part_REAL, part_IMAGINARY or part_ABS. The values of these constants are defined in the highest level of namespace in Quantarhei. """ nsp = len(self.spectra) mxs: numpy.ndarray = numpy.zeros(nsp, dtype=REAL) ii = 0 for tag in self.spectra.keys(): sp = self.get_spectrum(tag) mxs[ii] = sp.get_max_value(dpart=dpart) ii += 1 mx = numpy.amax(mxs) nmax = [mx] for tag in self.spectra.keys(): sp = self.get_spectrum(tag) if each: nmax = [sp.get_max_value(dpart=dpart)] sp.normalize2(norm, dpart=dpart, nmax=nmax) return nmax
[docs] def fft( self, ffttype: str = "complex-positive", window: Any = None, offset: float = 0.0, dtype: Any = None, dpart: Any = part_COMPLEX, tag: Any = None, ) -> Any: """Fourier transform in t2 time This method performs FFT on the container data determined by the value of the `dtype` argument. The new container is created and the storage resolution of its components is set `off`. This means that the container and its spectra have no idea about what data they store. Even when plotting the spectra, one has to set plotting of the `total` spectrum. Parameters ---------- ffttype : string Specifies the type Fourier transform we perform window : DFunction Windowing function for the data. Default is None """ if dtype is not None: if dtype != self.dtype: raise QuantarheiError( "Cannot change spectra type in TwoDSpectrumContainer" ) if self.itype not in ["ValueAxis", "TimeAxis", "FrequencyAxis"]: raise QuantarheiError("FFT cannot be performed for this type of indexing") # even when no window function is supplied, we create one with # all elements equal to one if window is None: winfce = DFunction(self.axis, numpy.ones(self.axis.length, dtype=REAL)) else: winfce = window eff_axis: ValueAxis if isinstance(self.axis, TimeAxis): # restrict the time axis by the off-set tlist = [] for tt in self.axis.data: if tt >= offset: tlist.append(tt) if len(tlist) > 1: dt = tlist[1] - tlist[0] Nt = len(tlist) t0 = tlist[0] # effective time axis for fft eff_axis = TimeAxis(t0, Nt, dt, atype="complete") else: raise QuantarheiError("Offset too large") else: assert self.axis is not None eff_axis = self.axis # put all data into one array # raise QuantarheiError() tags = eff_axis.data # self.axis.data Nos = self.length() if len(tags) <= Nos: tag1 = eff_axis.data[0] # self.axis.data[0] sp1 = self.get_spectrum(tag1) N1, N2 = sp1.data.shape data = numpy.zeros((N1, N2, len(tags)), dtype=sp1.data.dtype) for k_n in range(len(tags)): tag = eff_axis.data[k_n] # self.axis.data[k_n] spect = self.get_spectrum(tag) if dpart == part_COMPLEX: data[:, :, k_n] = spect.data elif dpart == part_REAL: data[:, :, k_n] = numpy.real(spect.data) elif dpart == part_IMAGINARY: data[:, :, k_n] = numpy.imag(spect.data) elif dpart == part_ABS: data[:, :, k_n] = numpy.abs(spect.data) else: raise QuantarheiError( "Number of spectra not consistent with ValueAxis object" ) # # FFT of the axis # axis = eff_axis # = self.axis new_axis: ValueAxis if isinstance(axis, TimeAxis): axis.shift_to_zero() new_axis = axis.get_FrequencyAxis() elif isinstance(axis, FrequencyAxis): new_axis = axis.get_TimeAxis() else: # this must be ValueAxis ftaxdata = (2.0 * numpy.pi) * numpy.fft.fftfreq(axis.length, axis.step) ftaxdata = numpy.fft.fftshift(ftaxdata) start = ftaxdata[0] length = len(ftaxdata) step = ftaxdata[1] - ftaxdata[0] new_axis = ValueAxis(start, length, step) # # FFT of the data # # window function ftdata: numpy.ndarray = numpy.zeros(data.shape, dtype=data.dtype) Nwin = len(winfce.data) Ndat = data.shape[2] for i_n in range(data.shape[0]): for j_n in range(data.shape[1]): ftdata[i_n, j_n, :] = ( data[i_n, j_n, :] * winfce.data[Nwin - Ndat : Nwin] ) ftdata = numpy.fft.ifft(ftdata, axis=2) ftdata = numpy.fft.fftshift(ftdata, axes=2) # save it to a new container new_container = TwoDSpectrumContainer() new_container.use_indexing_type(new_axis) for k_n in range(Ndat): tag = new_axis.data[k_n] spect = TwoDSpectrum() spect.set_axis_1(sp1.xaxis) spect.set_axis_3(sp1.yaxis) spect.set_data(ftdata[:, :, k_n], dtype=self.dtype) new_container.set_spectrum(spect, tag=tag) return new_container
[docs] def unitedir(self, dname: str) -> Any: clist = self.loaddir(dname) n = len(clist) cont = clist[1] for k in range(1, n): ctn = clist[k + 1] for tag in ctn.spectra: cont.set_spectrum(ctn.spectra[tag], tag=tag) return cont