"""Postprocessing classes.
.. autosummary::
:toctree: postprocessing
PostProcessing
AnalyticPP
RollandPP
Response
TDR
"""
import abc
import matplotlib.pyplot as plt
from numpy import array, convolve, ones, pi, rint, round, squeeze, where, zeros # noqa: A004
from numpy.fft import fft, fftfreq
from traitlets import Float, Instance, List, Unicode
from traittypes import Array
from .abstract_traits import ABCHasTraits
from .deflection import Deflection
from .methods import AnalyticalMethods
from .track import (
ArrangedBallastedSingleRailTrack,
ArrangedSlabSingleRailTrack,
)
[docs]
class PostProcessing(ABCHasTraits):
r"""Abstract base class for postprocessing classes."""
[docs]
@abc.abstractmethod
def validate_postprocessing(self):
"""Validate the postprocessing methods."""
[docs]
@staticmethod
def fast_fourier_tr(tsignal, dt):
"""Calculate the Fast Fourier Transform (FFT) of a time signal.
Parameters
----------
tsignal : numpy.ndarray
Time signal to transform.
dt : float
Time step between samples.
Returns
-------
tuple
Frequencies and FFT of the signal.
"""
samples = len(tsignal)
window = ones(samples)
fftrans = 2.0 / samples * fft(tsignal[:samples] * window)
fftfre = fftfreq(samples, dt)
return fftfre[0 : samples // 2], fftrans[0 : samples // 2]
[docs]
@staticmethod
def plot(
arrays, labels, title='Universal Plot', x_label='X-axis', y_label='Y-axis', colors=None, plot_type='loglog',
):
"""Universal plot function for multiple data sets.
Parameters
----------
arrays : list of tuple
List of tuples, where each tuple contains two numpy.ndarray (x and y data).
labels : list of str
List of labels for each array.
title : str, optional
Title of the plot. Default is 'Universal Plot'.
x_label : str, optional
Label for the x-axis. Default is 'X-axis'.
y_label : str, optional
Label for the y-axis. Default is 'Y-axis'.
colors : list of str, optional
List of colors for each array. Default is None.
plot_type : str, optional
Type of plot (e.g., 'loglog', 'plot'). Default is 'loglog'.
"""
plt.figure(figsize=(10, 6))
if colors is None:
colors = ['k', 'r', 'b', 'g', 'c', 'm', 'y']
for (x, y), label, color in zip(arrays, labels, colors, strict=False):
if plot_type == 'loglog':
plt.loglog(x, y, label=label, color=color)
else:
plt.plot(x, y, label=label, color=color)
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.title(title)
plt.legend()
plt.grid(True)
plt.show()
[docs]
class AnalyticPP(PostProcessing):
r"""Analytic postprocessing class.
This class is used to perform postprocessing on analytical methods.
Attributes
----------
results : AnalyticalMethods
Instance of the AnalyticalMethods class containing the results.
"""
results = Instance(AnalyticalMethods)
[docs]
def validate_postprocessing(self):
"""Validate the postprocessing methods."""
@property
def f(self):
"""Frequency vector."""
return self.results.f
@property
def vb(self):
"""Velocity vector."""
return self.results.mobility * self.results.force
@property
def ub(self):
"""Displacement vector."""
return self.vb / (self.results.omega * 1j)
[docs]
class RollandPP(PostProcessing):
r"""Rolland postprocessing base class.
This class is used to perform postprocessing on Rolland methods.
Attributes
----------
results : Deflection
Instance of the Deflection class containing the results.
f_min : float
Minimum frequency for response calculation :math:`[Hz]`.
f_max : float
Maximum frequency for response calculation :math:`[Hz]`.
"""
results = Instance(Deflection)
f_min = Float(default_value=100.0, min=0.0)
f_max = Float(default_value=3000.0, min=0.0)
[docs]
def validate_postprocessing(self):
"""Validate the postprocessing methods."""
[docs]
class Response(RollandPP):
r"""Postprocessing class for Rolland response quantities.
This class calculates and stores response quantities such as receptance,
mobility, and accelerance based on the results of the Deflection class.
Attributes
----------
results : Deflection
Instance of the Deflection class containing the results.
x_resp : list of float
List of response points in meters :math:`[m]` (default value is x_excit).
ind_resp : list of int
List of response indices (None if x_resp is provided).
freq : numpy.ndarray
Frequency vector :math:`[Hz]`.
rez : numpy.ndarray
Receptance vector :math:`[m/N]`.
mob : numpy.ndarray
Mobility vector :math:`[m/Ns]`.
accel : numpy.ndarray
Accelerance vector :math:`[m/Ns^2]`.
"""
x_resp = List(default_value=None, allow_none=True)
ind_resp = List(default_value=None, allow_none=True)
freq = Array()
rez = Array()
mob = Array()
accel = Array()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calculate_response()
#self.observe(self._on_results_change, names='results')
#@observe('results')
#def _on_results_change(self, change):
# self.calculate_response()
[docs]
def calculate_response(self):
"""Calculate and store response quantities (Receptance, Mobility, Accelerance)."""
if self.x_resp is None and self.ind_resp is None:
self.x_resp = [self.results.discr.dx * self.results.ind_excit]
self.ind_resp = [int(x / self.results.discr.dx) for x in self.x_resp]
elif self.x_resp is None and self.ind_resp is not None:
self.x_resp = [(x * self.results.discr.dx) for x in self.ind_resp]
else:
self.ind_resp = [int(x / self.results.discr.dx) for x in self.x_resp]
# Compute force FFT once
fftfre, ffft = self.fast_fourier_tr(self.results.force, self.results.discr.dt)
# Initialize arrays for results
n_points = len(self.ind_resp)
n_freq = len(fftfre)
ufft = zeros((n_points, n_freq), dtype=complex)
# Compute deflection FFTs separately for each point
for i, ind in enumerate(self.ind_resp):
defl = self.results.deflection[ind, : self.results.discr.nt]
_, ufft[i] = self.fast_fourier_tr(defl, self.results.discr.dt)
# Calculate quantities for all points
rez = ufft / ffft # Receptance
mob = 1j * fftfre * 2 * pi * rez # Mobility
accel = -((fftfre * 2 * pi) ** 2) * rez # Accelerance
# Frequency range
mask = (fftfre > self.f_min) & (fftfre <= self.f_max)
# Store results as attributes
self.freq = fftfre[mask]
self.rez = squeeze(rez[:, mask])
self.mob = squeeze(mob[:, mask])
self.accel = squeeze(accel[:, mask])
[docs]
class TDR(RollandPP):
r"""Postprocessing class for TDR (Track-Decay-Rate).
This class calculates and stores the Track-Decay-Rate (TDR) based on :cite:`EN15461:2008`.
Attributes
----------
results : Deflection
Instance of the Deflection class containing the results.
tdr : numpy.ndarray
Track-Decay-Rate vector :math:`[dB/m]`.
ind_tdr : list of int
Indices of the TDR points.
x_tdr : numpy.ndarray
Distances of the TDR points from the excitation point :math:`[m]`.
filter : str
Filter type (default is '1/3 Octave').
freq : numpy.ndarray
Frequency vector :math:`[Hz]`.
"""
tdr = Array()
filter = Unicode(default_value=None, allow_none=True)
freq = Array()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.find_tdr_points()
self.calculate_tdr()
# self.observe(self._on_results_change, names='results')
[docs]
def find_tdr_points(self):
"""Find the corresponding measurement points depending on track type."""
if isinstance(self.results.track, ArrangedSlabSingleRailTrack | ArrangedBallastedSingleRailTrack):
# TDR for non-uniform mounting positions
# Identification of TDR positions
# Determination of mounting positions
x_mp = array(list(self.results.track.mount_prop.keys())) # Position
ind_mp = (x_mp / self.results.discr.dx).astype(int) # Index
# Left sleeper Index
idx_s = int(where(ind_mp < self.results.ind_excit)[0][-1])
# Calculate distance from excitation point
x_s = x_mp[idx_s:] - x_mp[idx_s] # Sleeper distances from excitation point.
x_sc = convolve(x_s, ones(2) / 2, mode='valid') # Sleeper centers from excitation point.
def tdr_points_betw1(idx):
"""Calculate of theoretical measurement points (1st part)."""
return ((x_s[idx + 1] - x_sc[idx]) / 2) + x_sc[idx]
def tdr_points_betw2(dx):
"""Calculate of theoretical measurement points (2nd part)."""
return ((x_sc[dx] - x_s[dx]) / 2) + x_s[dx]
# Theoretical measurement points
self.x_tdr = array([x_sc[0], tdr_points_betw1(0), x_s[1], tdr_points_betw2(1), x_sc[1], tdr_points_betw1(1),
x_s[2], tdr_points_betw2(2), x_sc[2], tdr_points_betw1(2), x_s[3], x_sc[3], x_s[4], x_sc[4],
x_sc[5], x_sc[6], x_sc[7], x_sc[8], x_sc[10], x_sc[12], x_sc[16], x_sc[20], x_sc[24], x_sc[30],
x_sc[36], x_sc[42], x_sc[48], x_sc[54], x_sc[66]]) - x_sc[0]
# Determination of measurement position indices
ind_tdr = rint(round(self.x_tdr, 5) / self.results.discr.dx) + self.results.ind_excit
self.ind_tdr = list(ind_tdr.astype(int))
else:
# TDR for continuous slab and ballasted tracks
# Identification of TDR positions
ind_excit = self.results.ind_excit # Start index.
l_s = 0.6 # Theoretical Sleeper distance.
x_tdr = array([0.5, 0.75, 1, 1.25, 1.5, 1.75, 2, 2.25, 2.5, 2.75, 3, 3.5, 4, 4.5, 5.5, 6.5, 7.5, 8.5,
10.5, 12.5, 16.5, 20.5, 24.5, 30.5, 36.5, 42.5, 48.5, 54.5, 66.5]) * l_s
self.x_tdr = x_tdr - l_s / 2
ind_tdr = rint(self.x_tdr / self.results.discr.dx) + ind_excit
self.ind_tdr = list(ind_tdr.astype(int))
[docs]
def calculate_tdr(self):
"""Calculate the Track-Decay-Rate (TDR) based on the results."""
# Calculation of mobilities
resp = Response(results=self.results, ind_resp=self.ind_tdr)
mob = resp.mob
# Calculation of TDR (according to DIN)
sum_tdr = abs(mob[1, 1:]) ** 2 / abs(mob[0, 1:]) ** 2 * (self.x_tdr[1])
for n in range(2, len(self.ind_tdr)):
sum_tdr = sum_tdr + abs(mob[n, 1:]) ** 2 / abs(mob[0, 1:]) ** 2 * (self.x_tdr[n])
self.tdr = 4.343 / sum_tdr
self.freq = resp.freq[1:]