pytest/pytestpavement/labtests/base.py
2022-11-08 14:17:52 +01:00

507 lines
14 KiB
Python

import os
from csv import reader
from io import BytesIO
import lmfit as lm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.fft as sfft
from pytestpavement.analysis import cosfunc
from pytestpavement.helper.filehasher import calc_hash_of_file
class DataSineLoad():
"""
Base class for lab tests with sine load
"""
def __init__(self,
fname: str,
debug: bool = False,
roundtemperature: bool = True):
self.debug = debug
self.roundtemperature = roundtemperature
self.file = fname
self._run()
def _run(self):
self._file_exists()
self._set_parameter()
self._define_units()
self._calc_hash()
def fit(self):
self._read_data()
self._standardize_data()
self._standardize_meta()
if self.roundtemperature:
self._replace_temperature()
self._calc_missiong_values()
self._set_units()
self._validate_data()
self._postprocess_data()
self._split_data()
self._select_data()
self._fit_data()
def _file_exists(self):
assert os.path.exists(self.file)
def _set_parameter(self):
self.split_data_based_on_parameter = ['f']
self.col_as_int = ['N']
self.col_as_float = ['T', 'F', 's_piston', 's_hor_1', 'f', 's_hor_sum']
self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_hor_sum']
# Header names after standardization; check if exists
self.val_header_names = ['speciment_height']
self.number_of_load_cycles_for_analysis = 5
def _define_units(self):
self.unit_s = 1 #mm
self.unit_F = 1 #N
self.unit_t = 1 / 1000. #s
def _calc_hash(self):
""" calculate the hash of the file """
self.filehash = calc_hash_of_file(self.file)
def _read_data(self):
"""
read data from Labor Hart, Spaltzugversuche Steifigkeit
"""
# parameter
encoding = 'latin-1'
skiprows = 14
hasunits = True
splitsign = ':;'
# metadata from file
meta = {}
with open(self.file, 'r', encoding=encoding) as f:
count = 0
for line in f:
count += 1
#remove whitespace
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
if len(linesplit) == 2:
meta[linesplit[0]] = linesplit[1]
if count >= skiprows:
break
# data
data = pd.read_csv(self.file,
encoding=encoding,
skiprows=skiprows,
decimal=',',
sep=';')
## add header to df
with open(self.file, 'r', encoding=encoding) as f:
count = 0
for line in f:
count += 1
if count >= skiprows:
break
head = line.split(';')
data.columns = head
sigma = float(
os.path.split(self.file)[-1].split('MPa')[0].strip().replace(
',', '.'))
data['sigma'] = sigma
#clean data
data = data.dropna(axis=1)
#define in class
self.meta = meta
self.data = data
return True
def _validate_data(self):
""" check if all column names are standardized"""
cols = self.data.columns
for val in self.val_header_names:
if not val in self.meta.keys():
raise ValueError(f"{val} not in header of data")
for col in self.val_col_names:
if not col in cols:
raise ValueError(f'{col} not standardized')
def _standardize_meta(self):
keys = list(self.meta.keys())
for key in keys:
if any(map(key.__contains__, ['Probenbezeichnung'])):
self.meta['speciment'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Datum/Uhrzeit'])):
self.meta['datetime'] = self.meta.pop(key)
try:
self.meta['datetime'] = pd.to_datetime(
self.meta['datetime'])
except:
pass
elif any(map(key.__contains__, ['Probenhöhe'])):
self.meta['speciment_height'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Probendurchmesser'])):
self.meta['speciment_diameter'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Solltemperatur'])):
self.meta['temperature'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Prüfbedingungen'])):
self.meta['test_version'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Name des VersAblf'])):
self.meta['test'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Prüfer'])):
self.meta['examiner'] = self.meta.pop(key)
return True
def _standardize_data(self):
colnames = list(self.data.columns)
for i, col in enumerate(colnames):
if any(map(col.__contains__, ['TIME'])):
colnames[i] = 'time'
elif any(map(col.__contains__, ['Temperatur'])):
colnames[i] = 'T'
elif any(map(col.__contains__, ['Load'])):
colnames[i] = 'F'
elif any(map(col.__contains__, ['Position'])):
colnames[i] = 's_piston'
elif any(map(col.__contains__, ['FREQUENZ'])):
colnames[i] = 'f'
elif any(map(col.__contains__, ['mpulsnummer_fortlaufend'])):
colnames[i] = 'Ncum'
elif any(map(col.__contains__, ['Impulsnummer'])):
colnames[i] = 'N'
elif any(map(col.__contains__, ['SENSOR 4'])):
colnames[i] = 's_hor_1'
elif any(map(col.__contains__, ['SENSOR Extension'])):
colnames[i] = 's_hor_2'
self.data.columns = colnames
def _set_units(self):
for col in ['s_hor_sum', 's_hor_1', 's_hor_2']:
self.data[col] = self.data[col].mul(self.unit_s)
for col in ['F']:
self.data[col] = self.data[col].mul(self.unit_F)
for col in ['time']:
self.data[col] = self.data[col].mul(self.unit_t)
return True
def _replace_temperature(self):
temperatures = self.data['T'].unique()
Tset = {}
for temperature in temperatures:
Tset[temperature] = round(temperature, -1)
self.data['T'] = self.data['T'].replace(Tset)
def _calc_missiong_values(self):
cols = self.data.columns
if not 's_hor_sum' in cols:
self.data['s_hor_sum'] = self.data[['s_hor_1',
's_hor_2']].sum(axis=1)
def _postprocess_data(self):
#set dtypes:
for col in self.col_as_int:
self.data[col] = self.data[col].astype('int')
for col in self.col_as_float:
try:
self.data[col] = self.data[col].astype('float')
except:
pass
#set index
self.data = self.data.set_index('time')
return True
def _split_data(self):
data_gp = self.data.groupby(self.split_data_based_on_parameter)
data_list = []
for idx, d in data_gp:
idx_diff = np.diff(d.index)
dt_mean = idx_diff.mean()
gaps = idx_diff > (2 * dt_mean)
has_gaps = any(gaps)
if has_gaps == False:
data_list.append(d)
else:
idx_gaps = (np.where(gaps)[0] - 1)[0]
data_list.append(d.iloc[0:idx_gaps])
#add self.
if len(data_list) == 0:
self.num_tests = 0
self.data = data_list[0]
else:
self.num_tests = len(data_list)
self.data = data_list
#break
def _select_data(self):
""" select N load cycles from original data """
def sel_df(df, num=5):
N = df['N'].unique()
if len(N) > num:
df_sel = df[(df['N'] >= N[-num - 1]) & (df['N'] <= N[-2])]
return df_sel
else:
ValueError(
'Number of load cycles smaller than selectect values')
if not isinstance(self.data, list):
df_sel = [
sel_df(self.data, num=self.number_of_load_cycles_for_analysis)
]
else:
df_sel = []
for d in self.data:
d_sel = sel_df(d, num=self.number_of_load_cycles_for_analysis)
df_sel.append(d_sel)
# replace data
self.data = df_sel
return True
def _fit_data(self):
self.fit = []
for idx_data, data in enumerate(self.data):
if data is None: continue
data.index = data.index - data.index[0]
res = {}
columns_analyse = [
'F',
's_hor_sum',
's_hor_1',
's_hor_2',
's_piston',
]
ylabel_dict = {
'F': 'Kraft in N',
's_hor_sum': 'Verformung (Summe) in mm',
's_piston': 'Verformung Kolbenweg in mm',
's_hor_1': 'Verformung ($S_1$) in mm',
's_hor_2': 'Verformung ($S_2$) in mm'
}
#fig, axs = plt.subplots(len(columns_analyse),
# 1,
# figsize=(8, len(columns_analyse) * 2),
# sharex=True)
for idxcol, col in enumerate(columns_analyse):
if not col in data.columns: continue
x = data.index.values
y = data[col].values
# Fourier Transformation
dt = np.diff(x).mean() #mean sampling rate
n = len(x)
res[f'psd_{col}'] = sfft.rfft(y) #compute the FFT
res[f'freq_{col}'] = sfft.rfftfreq(n, dt)
# Fitting
freq = np.round(data['f'].mean(), 3)
sigma = np.round(data['sigma'].mean(), 3)
res_step1 = fit_sin_anstieg(x, y)
mod = lm.models.Model(cosfunc)
mod.set_param_hint(
'd',
value=res_step1['offset'],
#min=res_step1['offset'] - 0.5*abs(res_step1['offset']),
#max=res_step1['offset'] + 0.5*abs(res_step1['offset'])
)
mod.set_param_hint(
'a',
value=res_step1['amp'],
#min=res_step1['amp'] - 0.5*abs(res_step1['amp']),
#max=res_step1['amp'] + 0.5*abs(res_step1['amp'])
)
mod.set_param_hint('b', value=res_step1['phase'])
mod.set_param_hint('e', value=0) #, min = -0.5, max = 0.5)
mod.set_param_hint('f', value=freq, vary=True)
parms_fit = [
mod.param_hints['a']['value'],
mod.param_hints['b']['value'],
mod.param_hints['d']['value'],
mod.param_hints['e']['value'],
mod.param_hints['f']['value']
]
abweichung = []
chis = []
chis_red = []
results = []
r2 = []
methods = ['leastsq', 'powell']
dof = len(y) - len(parms_fit)
for method in methods:
#print(method)
result = mod.fit(y, t=x, method=method, verbose=False)
r2temp = 1 - result.residual.var() / np.var(y)
# r2temp = result.redchi / np.var(yfit, ddof=2)
if r2temp < 0.:
r2temp = 0
r2.append(r2temp)
chi = result.chisqr
chis_red.append(result.redchi)
#print(chi)
abweichung.append(sf.gammaincc(dof / 2., chi / 2))
chis.append(chi)
results.append(result)
ret = {}
best = np.nanargmax(r2)
ret['model'] = mod
ret['x'] = x
ret['result'] = results[best]
res[f'r2_{col}'] = r2[best]
res[f'fit_a_{col}'] = results[best].best_values['a']
res[f'fit_b_{col}'] = results[best].best_values['b']
res[f'fit_d_{col}'] = results[best].best_values['d']
res[f'fit_e_{col}'] = results[best].best_values['e']
res[f'fit_f_{col}'] = results[best].best_values['f']
res['f'] = freq
res['sigma'] = sigma
res['filename'] = self.file
yreg = model_cos(x, res[f'fit_a_{col}'], res[f'fit_b_{col}'],
res[f'fit_d_{col}'], res[f'fit_e_{col}'],
res[f'fit_f_{col}'])
"""
plt.sca(axs[idxcol])
plt.plot(x, y, label='Messdaten')
r2 = res[f'r2_{col}']
lstr = f'Fit (r² = {r2:0.4f})'
plt.plot(x, yreg, label=lstr)
plt.xlim([x[0], x[-1]])
plt.xlabel('Zeit in s')
plt.ylabel(ylabel_dict[col])
plt.legend()
#data[['F', 's_hor_sum']].plot(subplots=True)
plt.tight_layout()
ofile = self.file
ofile = ofile.replace('/raw/', '/plots/')[:-4]
ofile = ofile + f'_{idx_data:03.0f}.pdf'
ofolder = os.path.split(ofile)[0]
if not os.path.exists(ofolder):
os.makedirs(ofolder)
plt.savefig(ofile)
plt.close()
"""
## Stiffness
deltaF = res['fit_a_F']
nu = 0.298
h = float(self.meta['speciment_height'])
deltaU = res['fit_a_s_hor_sum']
res['E'] = (deltaF * (0.274 + nu)) / (h * deltaU)
self.fit.append(res)
if self.debug:
break
self.fit = pd.DataFrame.from_records(self.fit)