"""Read and evaluate tests of type Schichtenverbund."""
import os
import sys
from multiprocessing import Pool, cpu_count

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from aenum import enum
from fsutil import exists
from pytestpavement.analysis import fit_cos, fit_cos_eval
from pytestpavement.io import read_geosys

# Measured signals that get a cosine fit for every parameter combination.
_FIT_COLUMNS = ('F', 's1', 's2')


def _select_fit_window(d):
    """Return the rows of *d* that belong to the evaluated load cycles.

    The window covers all cycle numbers ``N`` greater than the
    seventh-from-last and up to (including) the second-from-last distinct
    value of ``d['N']``. The index of the returned copy is shifted so that it
    starts at zero.

    Parameters
    ----------
    d : pandas.DataFrame
        Data of one parameter combination; must contain the column ``'N'``.

    Returns
    -------
    pandas.DataFrame
        Copy of the selected rows; may be empty.

    Raises
    ------
    IndexError
        If ``d['N']`` holds fewer than seven distinct values.
    """
    cycles = d['N'].unique()
    window = d[(d['N'] > cycles[-7]) & (d['N'] <= cycles[-2])].copy()
    # Only shift a non-empty selection: ``index[0]`` does not exist otherwise.
    if not window.empty:
        window.index = window.index - window.index[0]
    return window


def fit_single_data(g):
    """Fit a cosine to force and displacements of one parameter combination.

    Module-level function so that it can be dispatched through
    ``multiprocessing.Pool.map``.

    Parameters
    ----------
    g : tuple
        ``(i, d)`` as yielded by ``DataFrame.groupby``: ``i`` is the group key
        ``(T_set, sigma_set, f_set, ext_set)`` and ``d`` the matching rows.

    Returns
    -------
    dict or None
        The set values plus every entry returned by ``fit_cos`` under the key
        ``'<name>_<column>'`` for the columns ``F``, ``s1`` and ``s2``.
        ``None`` if the selected window is empty or the evaluation fails.
    """
    try:
        i, d = g
        e = _select_fit_window(d.loc[i])
        if e.empty:
            return None
        e = e.reset_index()

        res_par = {
            'T_set': i[0],
            'sigma_set': i[1],
            'f_set': float(i[2]),
            'ext_set': i[3],
        }

        x = e['time'].values
        for col in _FIT_COLUMNS:
            res_step = fit_cos(x, e[col].values, freq=res_par['f_set'])
            for key, value in res_step.items():
                res_par[f'{key}_{col}'] = value
    except Exception:
        # Best effort: a combination that cannot be evaluated (e.g. too few
        # load cycles) is skipped by the caller instead of aborting the run.
        return None

    return res_par


class TestSchichtenverbundV2GeoSys():
    """Read and process a test of type Schichtenverbund.

    Configuration created for TU Dresden. The complete evaluation (read,
    normalize, check, fit, stiffness calculation) runs in the constructor.

    Parameters
    ----------
    filename : str
        File to read.
    diameter : float
        Specimen diameter used for the loaded area (presumably in mm -
        confirm). Overwritten by the file's meta data entry ``'d'`` when the
        GeoSys reader is used.
    spalt : float
        Gap width used to compute the shear strain (presumably in mm -
        confirm).
    tablenum : str
        Table number of the GeoSys file.
    debug : bool
        Print additional information and fit sequentially instead of using a
        process pool.
    plot_fit, plot_fit_error : bool
        Stored on the instance; not evaluated by the code in this module.

    Attributes
    ----------
    data : pandas.DataFrame
        Measured data, indexed by ``T_set, sigma_set, f_set, ext_set, time``.
    res : pandas.DataFrame
        Fit results per parameter combination including the column ``'Es'``.
    """

    def __init__(self,
                 filename: str,
                 diameter: float = 100.0,
                 spalt: float = 1.0,
                 tablenum: str = '013',
                 debug: bool = False,
                 plot_fit: bool = False,
                 plot_fit_error: bool = True):

        self.file = filename
        self.diameter = diameter
        self.spalt = spalt

        self._tablenum = tablenum
        self._plot = plot_fit
        self._plot_on_error = plot_fit_error
        self._debug = debug

        self.data = None

        self._check_file_exists()
        self._run()

    def _run(self):
        """Execute all processing steps in order."""
        if self._debug:
            print('debug mode')

        self._read()
        self._normalize_data()
        self._set_units()
        self._check_data()
        self._transform_data()
        self._fit_data()
        self._calc_Es()

    def __str__(self):
        return f"filename: {self.file}, table number: {self._tablenum}"

    def _check_file_exists(self):
        """Raise ``AssertionError`` if the input file does not exist."""
        # Raised explicitly (not via ``assert``) so the check survives ``-O``.
        if not os.path.exists(self.file):
            raise AssertionError(f'file not found: {self.file}')

    def _read(self):
        """Read the GeoSys file given to the constructor."""
        meta, data = read_geosys(self.file,
                                 self._tablenum,
                                 debug=self._debug)

        # The diameter stored in the file replaces the constructor argument.
        self.diameter = meta['d']
        self.data = data.reset_index()

    def _normalize_data(self):
        """Rename the source columns to the names used in this module."""
        new_names = {
            't': 'time',
            'f': 'f_set',
            's_vert_1': 's1',
            's_vert_2': 's2',
        }
        self.data.columns = [
            new_names.get(name, name) for name in self.data.columns
        ]

        if self._debug:
            print(self.data.columns)

    def _set_units(self):
        """Placeholder; no unit conversion is done."""
        return

    def _check_data(self):
        """Verify that all required columns exist.

        Raises
        ------
        AssertionError
            If a required column is missing. The missing names and the head
            of the data are printed first.
        """
        must_have_values = [
            'T_set',
            'sigma_set',
            'f_set',
            'ext_set',
            'time',
            'F',
            's1',
            's2',
            'N',
        ]

        missing = [
            name for name in must_have_values
            if name not in self.data.columns
        ]
        if missing:
            print('Error in Parameters:')
            for name in missing:
                print(f'\t - {name}')
            print(self.data.head())
            # Raised explicitly (not via ``assert``) so it survives ``-O``.
            raise AssertionError(f'missing columns: {missing}')

    def _transform_data(self):
        """Index the data by the set values and time, sorted."""
        self.data = self.data.set_index(
            ['T_set', 'sigma_set', 'f_set', 'ext_set', 'time']).sort_index()

    def _fit_data(self):
        """Fit every parameter combination and collect the results in ``res``.

        Uses one worker process per CPU unless debug mode is active.
        """
        groups = self.data.groupby(level=[0, 1, 2, 3])

        if not self._debug:
            with Pool(cpu_count()) as pool:
                ret_list = pool.map(fit_single_data,
                                    [(i, d) for i, d in groups])
        else:
            ret_list = [fit_single_data((i, d)) for i, d in groups]

        # Combinations that could not be evaluated come back as None.
        self.res = pd.DataFrame.from_dict(
            [r for r in ret_list if isinstance(r, dict)])

        self.res = self.res.set_index(
            ['T_set', 'sigma_set', 'f_set', 'ext_set']).sort_index()

    def _plot_single_data(self, i):
        """Plot measured data and fitted cosine of one parameter combination.

        Parameters
        ----------
        i : tuple
            Index ``(T_set, sigma_set, f_set, ext_set)`` into ``res``/``data``.
        """
        # Raw strings: ``\m`` is not a valid escape sequence.
        ylabels = {
            'F': 'Force in N',
            's1': r'Displacement $s_1$ in $\mu m$',
            's2': r'Displacement $s_2$ in $\mu m$'
        }

        par = self.res.loc[i].to_dict()

        e = _select_fit_window(self.data.loc[i])
        if e.empty:
            return

        fig, axs = plt.subplots(3, 1, sharex=True)
        fig.set_figheight(1.5 * fig.get_figheight())

        for ax, col in zip(axs, _FIT_COLUMNS):
            x, y = e.index, e[col]
            ax.plot(x, y, c='k', label='data')

            # Undo the '<name>_<column>' naming applied when fitting.
            suffix = f'_{col}'
            par_sel = {
                key[:-len(suffix)]: value
                for key, value in par.items() if key.endswith(suffix)
            }

            ys = fit_cos_eval(x, par_sel)
            r2_sel = np.round(par_sel['r2'], 3)
            ax.plot(x, ys, c='C1', label=f'fit (R² = {r2_sel})')

            ax.legend(loc=0)
            ax.set_ylabel(ylabels[col])

        # The x axis is shared; only the bottom plot gets a label.
        axs[-1].set_xlabel('Time in s')
        plt.tight_layout()
        plt.show()

    def plot_fitted_data(self, num: int = 7):
        """Plot the fits of the first *num* parameter combinations.

        Parameters
        ----------
        num : int or None
            Maximum number of combinations to plot; ``None`` plots all.
        """
        groups = self.res.groupby(level=[0, 1, 2, 3])
        for counter, (i, _) in enumerate(groups, start=1):
            self._plot_single_data(i)
            if num is not None and counter >= num:
                break

    def plot_fitted_data_error(self, rmin: float = 0.9):
        """Plot the fits whose force fit has ``r2_F`` of at most *rmin*.

        Prints ``'no errors'`` if no combination matches.
        """
        sel_res = self.res[(self.res['r2_F'] <= rmin)]
        if sel_res.empty:
            print('no errors')
            return

        for i, _ in sel_res.groupby(level=[0, 1, 2, 3]):
            self._plot_single_data(i)

    def _calc_Es(self):
        """Add the column ``'Es'`` (stress / strain amplitude) to ``res``."""
        area = np.pi * self.diameter**2 / 4

        ampF = self.res['amp_F']
        ampS = self.res[['amp_s1', 'amp_s2']].mean(axis=1)

        tau = ampF.div(area)
        # Division by 1000 presumably converts micrometres to millimetres
        # before relating the displacement to the gap width - confirm.
        gamma = ampS.div(1000.0).div(self.spalt)

        self.res['Es'] = tau / gamma


class TestSchichtenverbundV2GeoSysExtractedEMPA(TestSchichtenverbundV2GeoSys):
    """Variant reading a tab-separated text file instead of a GeoSys table."""

    def _read(self):
        """Read the file with pandas and convert all columns to numbers.

        In debug mode only the first 15000 rows are read.
        """
        nrows = 15000 if self._debug else None

        self.data = pd.read_csv(self.file,
                                sep='\t',
                                decimal='.',
                                nrows=nrows)

        # Row 0 is dropped before the conversion (presumably a units row -
        # confirm), then every row containing missing values.
        self.data.drop(index=[0], inplace=True)
        self.data.dropna(axis=0, inplace=True)

        for col in self.data.columns:
            self.data[col] = pd.to_numeric(self.data[col])

    def _normalize_data(self):
        """Rename the source columns to the names used in this module."""
        # (text contained in the source header, new name); first match wins.
        patterns = (
            ('soll temperature', 'T_set'),
            ('soll sigma', 'sigma_set'),
            ('soll frequency', 'f_set'),
            ('soll extension', 'ext_set'),
            ('vertical load from hydraulic pressure', 'F'),
            ('Vertical position from LVDT 1', 's1'),
            ('Vertical position from LVDT 2', 's2'),
            ('Zyklenzähler', 'N'),
        )

        def rename(label):
            for needle, new_name in patterns:
                if needle in label:
                    return new_name
            return label

        self.data.columns = [rename(label) for label in self.data.columns]