# pytest/pytestpavement/versuche/schichtenverbund.py

import os
import sys
from multiprocessing import Pool, cpu_count

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from aenum import enum
from fsutil import exists

from pytestpavement.analysis import fit_cos, fit_cos_eval


def fit_single_data(g):
    """
    Fit a cosine model to the data of a single parameter combination.

    ``g`` is a ``(group_key, group_frame)`` tuple as produced by
    ``DataFrame.groupby`` in ``_fit_data``.
    """
    try:
        i, d = g
        d = d.loc[i]
        # d = d.reset_index()

        # use only the load cycles between N[-7] and N[-2] (steady-state window)
        Ns = d['N'].unique()
        e = d[(d['N'] > Ns[-7]) & (d['N'] <= Ns[-2])].copy()
        if e.empty:
            return None

        # shift the time axis so the selected window starts at zero
        e.index = e.index - e.index[0]
        e = e.reset_index()

        res_par = {}
        res_par['T_set'] = i[0]
        res_par['sigma_set'] = i[1]
        res_par['f_set'] = float(i[2])
        res_par['ext_set'] = i[3]

        r2 = []
        for col in ['F', 's1', 's2']:
            x = e['time'].values
            y = e[col].values

            res_step = fit_cos(x, y, freq=res_par['f_set'])

            r2.append(res_step['r2'])
            for key in res_step.keys():
                res_par[key + f'_{col}'] = res_step[key]
    except Exception:
        return None

    return res_par
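

# A minimal sketch (not part of the original module) of how ``fit_single_data``
# is consumed in ``_fit_data`` below: the data are indexed by the four set
# points plus time, and each group is passed in as a ``(key, frame)`` tuple.
#
#     indexed = data.set_index(
#         ['T_set', 'sigma_set', 'f_set', 'ext_set', 'time']).sort_index()
#     results = [fit_single_data((key, frame))
#                for key, frame in indexed.groupby(level=[0, 1, 2, 3])]
#
# Each successful fit returns a dict with the set points and one set of
# cosine parameters per channel ('F', 's1', 's2'); failed fits return None.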


class TestSchichtenverbundV2GeoSys():
    """
    Read and process a test of type Schichtenverbund.

    Configuration created for TU Dresden.

    ...

    Attributes
    ----------
    filename : str
        filename to read
    tablenum : str
        table number of the GeoSys file

    Methods
    -------

    Returns
    -------
    """
    def __init__(self,
                 filename: str,
                 diameter: float,
                 spalt: float = 1.0,
                 tablenum: str = '038',
                 debug: bool = False,
                 plot_fit: bool = False,
                 plot_fit_error: bool = True):

        self.file = filename
        self.diameter = diameter
        self.spalt = spalt

        self._tablenum = tablenum
        self._plot = plot_fit
        self._plot_on_error = plot_fit_error
        self._debug = debug

        self.data = None

        self._check_file_exists()
        self._run()

    def _run(self):
        if self._debug:
            print('debug mode')

        self._read()
        self._normalize_data()
        self._set_units()
        self._check_data()
        self._transform_data()
        self._fit_data()
        self._calc_Es()

    def __str__(self):
        return f"filename: {self.file}, table number: {self._tablenum}"

    def _check_file_exists(self):
        assert os.path.exists(self.file)
    def _read(self):
        self.data = []

    def _normalize_data(self):
        return

    def _set_units(self):
        return

    def _check_data(self):
        must_have_values = [
            'T_set',
            'sigma_set',
            'f_set',
            'ext_set',
            'time',
            'F',
            's1',
            's2',
            'N',
        ]

        check = [item in self.data.columns for item in must_have_values]
        assert all(check)
    def _transform_data(self):
        self.data = self.data.set_index(
            ['T_set', 'sigma_set', 'f_set', 'ext_set', 'time']).sort_index()

    def _fit_data(self):
        if not self._debug:
            # fit all parameter combinations in parallel
            with Pool(cpu_count()) as pool:
                ret_list = pool.map(
                    fit_single_data,
                    [(i, d) for i, d in self.data.groupby(level=[0, 1, 2, 3])])
        else:
            # in debug mode fit sequentially to keep tracebacks readable
            ret_list = []
            for i, d in self.data.groupby(level=[0, 1, 2, 3]):
                ret_list.append(fit_single_data((i, d)))

        # keep only the successful fits (failed fits return None)
        self.res = pd.DataFrame.from_dict(
            [r for r in ret_list if isinstance(r, dict)])
        self.res = self.res.set_index(
            ['T_set', 'sigma_set', 'f_set', 'ext_set']).sort_index()

        # self.res.sort_index(axis=0, inplace=True)
        # self.res.sort_index(axis=1, inplace=True)
    def _plot_single_data(self, i):
        ylabels = {
            'F': 'Force in N',
            's1': r'Displacement $s_1$ in $\mu m$',
            's2': r'Displacement $s_2$ in $\mu m$'
        }

        par = self.res.loc[i].to_dict()

        df = self.data.loc[i]

        # same steady-state window as in fit_single_data
        Ns = df['N'].unique()
        e = df[(df['N'] > Ns[-7]) & (df['N'] <= Ns[-2])].copy()
        if e.empty:
            return
        e.index = e.index - e.index[0]

        fig, axs = plt.subplots(3, 1, sharex=True)
        fig.set_figheight(1.5 * fig.get_figheight())

        for iax, col in enumerate(['F', 's1', 's2']):
            ax = axs[iax]

            x, y = e.index, e[col]
            ax.plot(x, y, c='k', label='data')

            # collect the fit parameters belonging to this column,
            # e.g. 'amp_F' -> 'amp'
            par_sel = [key for key in par.keys() if col in key]
            par_sel = dict((k.split('_')[0], par[k]) for k in par_sel)

            ys = fit_cos_eval(x, par_sel)

            r2_sel = np.round(par_sel['r2'], 3)
            ax.plot(x, ys, c='C1', label=f'fit (R² = {r2_sel})')

            ax.legend(loc=0)
            ax.set_ylabel(ylabels[col])

        ax.set_xlabel('Time in s')

        plt.tight_layout()
        plt.show()
    def plot_fitted_data(self, num: int = 7):
        """
        Plot data and fit for up to ``num`` parameter combinations.
        """
        counter = 0
        for i, r in self.res.groupby(level=[0, 1, 2, 3]):
            self._plot_single_data(i)

            counter += 1
            if (num is not None) and (counter >= num):
                break

    def plot_fitted_data_error(self, rmin: float = 0.9):
        """
        Plot all fits whose R² for the force signal is below ``rmin``.
        """
        sel_res = self.res[(self.res['r2_F'] <= rmin)]

        if sel_res.empty:
            print('no errors')
            return

        for i, r in sel_res.groupby(level=[0, 1, 2, 3]):
            self._plot_single_data(i)
    def _calc_Es(self):
        # bond stiffness Es = tau / gamma:
        #   tau   = force amplitude / cross-section area (diameter in mm)
        #   gamma = mean displacement amplitude / gap width, with the
        #           amplitudes converted from µm to mm (cf. the plot labels)
        area = np.pi * self.diameter**2 / 4

        ampF = self.res['amp_F']
        ampS = self.res[['amp_s1', 'amp_s2']].mean(axis=1)

        tau = ampF.div(area)
        gamma = ampS.div(1000.0).div(self.spalt)

        self.res['Es'] = tau / gamma


class TestSchichtenverbundV2GeoSysExtractedEMPA(TestSchichtenverbundV2GeoSys):

    def _read(self):
        # in debug mode read only the first 15000 rows to speed things up
        if self._debug:
            nrows = 15000
        else:
            nrows = None

        self.data = pd.read_csv(self.file, sep='\t', decimal='.', nrows=nrows)

        # drop the first row, remove incomplete rows and coerce all
        # remaining columns to numeric values
        self.data.drop(index=[0], inplace=True)
        self.data.dropna(axis=0, inplace=True)

        for col in self.data.columns:
            self.data[col] = pd.to_numeric(self.data[col])

    def _normalize_data(self):
        # map the verbose GeoSys column names to the short names used above
        col = list(self.data.columns)
        for i, d in enumerate(col):
            if 'soll temperature' in d:
                col[i] = 'T_set'
            elif 'soll sigma' in d:
                col[i] = 'sigma_set'
            elif 'soll frequency' in d:
                col[i] = 'f_set'
            elif 'soll extension' in d:
                col[i] = 'ext_set'
            elif 'vertical load from hydraulic pressure' in d:
                col[i] = 'F'
            elif 'Vertical position from LVDT 1' in d:
                col[i] = 's1'
            elif 'Vertical position from LVDT 2' in d:
                col[i] = 's2'
            elif 'Zyklenzähler' in d:
                col[i] = 'N'

        self.data.columns = col
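

# Minimal usage sketch; the file name and geometry below are placeholders,
# not values taken from this repository. The EMPA export subclass runs the
# whole pipeline on construction, after which ``res`` holds one fitted row
# per set point including the bond stiffness column ``Es``.
if __name__ == '__main__':
    test = TestSchichtenverbundV2GeoSysExtractedEMPA(
        filename='example_schichtenverbund.txt',  # placeholder path
        diameter=100.0,  # specimen diameter in mm (assumed)
        spalt=1.0,  # gap width in mm
        debug=True,  # read fewer rows and fit sequentially
    )
    print(test)
    print(test.res['Es'].head())
    test.plot_fitted_data(num=2)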