515 lines
15 KiB
Python
515 lines
15 KiB
Python
import os
|
|
from csv import reader
|
|
from io import BytesIO
|
|
|
|
import lmfit as lm
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import pandas as pd
|
|
import scipy.fft as sfft
|
|
from pytestpavement.analysis import cosfunc
|
|
from pytestpavement.helper.filehasher import calc_hash_of_file
|
|
|
|
|
|
class DataSineLoad():
|
|
"""
|
|
Base class for lab tests with sine load
|
|
|
|
"""
|
|
|
|
def __init__(self,
|
|
fname: str,
|
|
debug: bool = False,
|
|
roundtemperature: bool = True):
|
|
|
|
self.debug = debug
|
|
self.roundtemperature = roundtemperature
|
|
|
|
self.file = fname
|
|
|
|
self._run()
|
|
|
|
def _run(self):
|
|
self._file_exists()
|
|
self._set_parameter()
|
|
self._define_units()
|
|
|
|
self._calc_hash()
|
|
|
|
def fit(self):
|
|
self._read_data()
|
|
|
|
self._standardize_data()
|
|
self._standardize_meta()
|
|
|
|
if self.roundtemperature:
|
|
self._replace_temperature()
|
|
|
|
self._calc_missiong_values()
|
|
self._set_units()
|
|
|
|
self._validate_data()
|
|
self._postprocess_data()
|
|
|
|
self._split_data()
|
|
self._select_data()
|
|
|
|
self._fit_data()
|
|
|
|
def _file_exists(self):
|
|
assert os.path.exists(self.file)
|
|
|
|
def _set_parameter(self):
|
|
|
|
self.split_data_based_on_parameter = ['f']
|
|
|
|
self.col_as_int = ['N']
|
|
self.col_as_float = ['T', 'F', 's_piston', 's_hor_1', 'f', 's_hor_sum']
|
|
|
|
self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_hor_sum']
|
|
# Header names after standardization; check if exists
|
|
self.val_header_names = ['speciment_height']
|
|
|
|
self.number_of_load_cycles_for_analysis = 5
|
|
|
|
def _define_units(self):
|
|
|
|
self.unit_s = 1 #mm
|
|
self.unit_F = 1 #N
|
|
self.unit_t = 1 / 1000. #s
|
|
|
|
def _calc_hash(self):
|
|
""" calculate the hash of the file """
|
|
self.filehash = calc_hash_of_file(self.file)
|
|
|
|
def _read_data(self):
|
|
"""
|
|
read data from Labor Hart, Spaltzugversuche Steifigkeit
|
|
"""
|
|
# parameter
|
|
encoding = 'latin-1'
|
|
skiprows = 14
|
|
hasunits = True
|
|
splitsign = ':;'
|
|
|
|
# metadata from file
|
|
meta = {}
|
|
|
|
with open(self.file, 'r', encoding=encoding) as f:
|
|
count = 0
|
|
|
|
for line in f:
|
|
count += 1
|
|
|
|
#remove whitespace
|
|
linesplit = line.strip()
|
|
linesplit = linesplit.split(splitsign)
|
|
|
|
if len(linesplit) == 2:
|
|
|
|
meta[linesplit[0]] = linesplit[1]
|
|
|
|
if count >= skiprows:
|
|
break
|
|
|
|
# data
|
|
data = pd.read_csv(self.file,
|
|
encoding=encoding,
|
|
skiprows=skiprows,
|
|
decimal=',',
|
|
sep=';')
|
|
|
|
## add header to df
|
|
with open(self.file, 'r', encoding=encoding) as f:
|
|
count = 0
|
|
|
|
for line in f:
|
|
count += 1
|
|
|
|
if count >= skiprows:
|
|
break
|
|
|
|
head = line.split(';')
|
|
data.columns = head
|
|
|
|
sigma = float(
|
|
os.path.split(self.file)[-1].split('MPa')[0].strip().replace(
|
|
',', '.'))
|
|
data['sigma'] = sigma
|
|
|
|
#clean data
|
|
data = data.dropna(axis=1)
|
|
|
|
#define in class
|
|
self.meta = meta
|
|
self.data = data
|
|
return True
|
|
|
|
def _validate_data(self):
|
|
""" check if all column names are standardized"""
|
|
|
|
cols = self.data.columns
|
|
|
|
for val in self.val_header_names:
|
|
if not val in self.meta.keys():
|
|
raise ValueError(f"{val} not in header of data")
|
|
|
|
for col in self.val_col_names:
|
|
if not col in cols:
|
|
raise ValueError(f'{col} not standardized')
|
|
|
|
def _standardize_meta(self):
|
|
|
|
keys = list(self.meta.keys())
|
|
for key in keys:
|
|
|
|
if any(map(key.__contains__, ['Probenbezeichnung'])):
|
|
self.meta['speciment'] = self.meta.pop(key)
|
|
|
|
elif any(map(key.__contains__, ['Datum/Uhrzeit'])):
|
|
self.meta['datetime'] = self.meta.pop(key)
|
|
try:
|
|
self.meta['datetime'] = pd.to_datetime(
|
|
self.meta['datetime'])
|
|
except:
|
|
pass
|
|
|
|
elif any(map(key.__contains__, ['Probenhöhe'])):
|
|
self.meta['speciment_height'] = float(
|
|
self.meta.pop(key).replace(',', '.'))
|
|
elif any(map(key.__contains__, ['Probendurchmesser'])):
|
|
self.meta['speciment_diameter'] = float(
|
|
self.meta.pop(key).replace(',', '.'))
|
|
elif any(map(key.__contains__, ['Solltemperatur'])):
|
|
self.meta['temperature'] = float(
|
|
self.meta.pop(key).replace(',', '.'))
|
|
elif any(map(key.__contains__, ['Prüfbedingungen'])):
|
|
self.meta['test_version'] = self.meta.pop(key)
|
|
elif any(map(key.__contains__, ['Name des VersAblf'])):
|
|
self.meta['test'] = self.meta.pop(key)
|
|
elif any(map(key.__contains__, ['Prüfer'])):
|
|
self.meta['examiner'] = self.meta.pop(key)
|
|
|
|
return True
|
|
|
|
def _standardize_data(self):
|
|
|
|
colnames = list(self.data.columns)
|
|
|
|
for i, col in enumerate(colnames):
|
|
if any(map(col.__contains__, ['TIME'])):
|
|
colnames[i] = 'time'
|
|
|
|
elif any(map(col.__contains__, ['Temperatur'])):
|
|
colnames[i] = 'T'
|
|
|
|
elif any(map(col.__contains__, ['Load'])):
|
|
colnames[i] = 'F'
|
|
|
|
elif any(map(col.__contains__, ['Position'])):
|
|
colnames[i] = 's_piston'
|
|
|
|
elif any(map(col.__contains__, ['FREQUENZ'])):
|
|
colnames[i] = 'f'
|
|
|
|
elif any(map(col.__contains__, ['mpulsnummer_fortlaufend'])):
|
|
colnames[i] = 'Ncum'
|
|
|
|
elif any(map(col.__contains__, ['Impulsnummer'])):
|
|
colnames[i] = 'N'
|
|
|
|
elif any(map(col.__contains__, ['SENSOR 4'])):
|
|
colnames[i] = 's_hor_1'
|
|
|
|
elif any(map(col.__contains__, ['SENSOR Extension'])):
|
|
colnames[i] = 's_hor_2'
|
|
|
|
self.data.columns = colnames
|
|
|
|
def _set_units(self):
|
|
|
|
for col in ['s_hor_sum', 's_hor_1', 's_hor_2']:
|
|
self.data[col] = self.data[col].mul(self.unit_s)
|
|
|
|
for col in ['F']:
|
|
self.data[col] = self.data[col].mul(self.unit_F)
|
|
|
|
for col in ['time']:
|
|
self.data[col] = self.data[col].mul(self.unit_t)
|
|
|
|
return True
|
|
|
|
def _replace_temperature(self):
|
|
|
|
temperatures = self.data['T'].unique()
|
|
|
|
Tset = {}
|
|
for temperature in temperatures:
|
|
Tset[temperature] = round(temperature, -1)
|
|
|
|
self.data['T'] = self.data['T'].replace(Tset)
|
|
|
|
def _calc_missiong_values(self):
|
|
|
|
cols = self.data.columns
|
|
|
|
if not 's_hor_sum' in cols:
|
|
self.data['s_hor_sum'] = self.data[['s_hor_1',
|
|
's_hor_2']].sum(axis=1)
|
|
|
|
def _postprocess_data(self):
|
|
|
|
#set dtypes:
|
|
for col in self.col_as_int:
|
|
self.data[col] = self.data[col].astype('int')
|
|
for col in self.col_as_float:
|
|
try:
|
|
self.data[col] = self.data[col].astype('float')
|
|
except:
|
|
pass
|
|
|
|
#set index
|
|
self.data = self.data.set_index('time')
|
|
|
|
return True
|
|
|
|
def _split_data(self):
|
|
|
|
data_gp = self.data.groupby(self.split_data_based_on_parameter)
|
|
|
|
data_list = []
|
|
|
|
for idx, d in data_gp:
|
|
|
|
idx_diff = np.diff(d.index)
|
|
dt_mean = idx_diff.mean()
|
|
|
|
gaps = idx_diff > (2 * dt_mean)
|
|
has_gaps = any(gaps)
|
|
|
|
if has_gaps == False:
|
|
data_list.append(d)
|
|
|
|
else:
|
|
idx_gaps = (np.where(gaps)[0] - 1)[0]
|
|
data_list.append(d.iloc[0:idx_gaps])
|
|
|
|
#add self.
|
|
if len(data_list) == 0:
|
|
self.num_tests = 0
|
|
self.data = data_list[0]
|
|
|
|
else:
|
|
self.num_tests = len(data_list)
|
|
self.data = data_list
|
|
#break
|
|
|
|
def _select_data(self):
|
|
""" select N load cycles from original data """
|
|
|
|
def sel_df(df, num=5):
|
|
N = df['N'].unique()
|
|
|
|
if len(N) > num:
|
|
df_sel = df[(df['N'] >= N[-num - 1]) & (df['N'] <= N[-2])]
|
|
return df_sel
|
|
else:
|
|
ValueError(
|
|
'Number of load cycles smaller than selectect values')
|
|
|
|
if not isinstance(self.data, list):
|
|
if self.number_of_load_cycles_for_analysis > 1:
|
|
df_sel = [
|
|
sel_df(self.data,
|
|
num=self.number_of_load_cycles_for_analysis)
|
|
]
|
|
else:
|
|
df_sel = [self.data]
|
|
|
|
else:
|
|
df_sel = []
|
|
for d in self.data:
|
|
if self.number_of_load_cycles_for_analysis > 1:
|
|
d_sel = sel_df(d,
|
|
num=self.number_of_load_cycles_for_analysis)
|
|
else:
|
|
d_sel = d
|
|
|
|
df_sel.append(d_sel)
|
|
|
|
# replace data
|
|
self.data = df_sel
|
|
|
|
return True
|
|
|
|
def _fit_data(self):
|
|
|
|
self.fit = []
|
|
|
|
for idx_data, data in enumerate(self.data):
|
|
|
|
if data is None: continue
|
|
|
|
data.index = data.index - data.index[0]
|
|
|
|
res = {}
|
|
|
|
columns_analyse = [
|
|
'F',
|
|
's_hor_sum',
|
|
's_hor_1',
|
|
's_hor_2',
|
|
's_piston',
|
|
]
|
|
ylabel_dict = {
|
|
'F': 'Kraft in N',
|
|
's_hor_sum': 'Verformung (Summe) in mm',
|
|
's_piston': 'Verformung Kolbenweg in mm',
|
|
's_hor_1': 'Verformung ($S_1$) in mm',
|
|
's_hor_2': 'Verformung ($S_2$) in mm'
|
|
}
|
|
|
|
#fig, axs = plt.subplots(len(columns_analyse),
|
|
# 1,
|
|
# figsize=(8, len(columns_analyse) * 2),
|
|
# sharex=True)
|
|
|
|
for idxcol, col in enumerate(columns_analyse):
|
|
|
|
if not col in data.columns: continue
|
|
|
|
x = data.index.values
|
|
y = data[col].values
|
|
|
|
# Fourier Transformation
|
|
dt = np.diff(x).mean() #mean sampling rate
|
|
n = len(x)
|
|
|
|
res[f'psd_{col}'] = sfft.rfft(y) #compute the FFT
|
|
res[f'freq_{col}'] = sfft.rfftfreq(n, dt)
|
|
|
|
# Fitting
|
|
freq = np.round(data['f'].mean(), 3)
|
|
sigma = np.round(data['sigma'].mean(), 3)
|
|
|
|
res_step1 = fit_sin_anstieg(x, y)
|
|
|
|
mod = lm.models.Model(cosfunc)
|
|
|
|
mod.set_param_hint(
|
|
'd',
|
|
value=res_step1['offset'],
|
|
#min=res_step1['offset'] - 0.5*abs(res_step1['offset']),
|
|
#max=res_step1['offset'] + 0.5*abs(res_step1['offset'])
|
|
)
|
|
|
|
mod.set_param_hint(
|
|
'a',
|
|
value=res_step1['amp'],
|
|
#min=res_step1['amp'] - 0.5*abs(res_step1['amp']),
|
|
#max=res_step1['amp'] + 0.5*abs(res_step1['amp'])
|
|
)
|
|
|
|
mod.set_param_hint('b', value=res_step1['phase'])
|
|
mod.set_param_hint('e', value=0) #, min = -0.5, max = 0.5)
|
|
|
|
mod.set_param_hint('f', value=freq, vary=True)
|
|
|
|
parms_fit = [
|
|
mod.param_hints['a']['value'],
|
|
mod.param_hints['b']['value'],
|
|
mod.param_hints['d']['value'],
|
|
mod.param_hints['e']['value'],
|
|
mod.param_hints['f']['value']
|
|
]
|
|
|
|
abweichung = []
|
|
chis = []
|
|
chis_red = []
|
|
results = []
|
|
r2 = []
|
|
|
|
methods = ['leastsq', 'powell']
|
|
dof = len(y) - len(parms_fit)
|
|
|
|
for method in methods:
|
|
#print(method)
|
|
result = mod.fit(y, t=x, method=method, verbose=False)
|
|
r2temp = 1 - result.residual.var() / np.var(y)
|
|
# r2temp = result.redchi / np.var(yfit, ddof=2)
|
|
if r2temp < 0.:
|
|
r2temp = 0
|
|
r2.append(r2temp)
|
|
chi = result.chisqr
|
|
chis_red.append(result.redchi)
|
|
#print(chi)
|
|
abweichung.append(sf.gammaincc(dof / 2., chi / 2))
|
|
chis.append(chi)
|
|
results.append(result)
|
|
|
|
ret = {}
|
|
best = np.nanargmax(r2)
|
|
|
|
ret['model'] = mod
|
|
ret['x'] = x
|
|
ret['result'] = results[best]
|
|
|
|
res[f'r2_{col}'] = r2[best]
|
|
|
|
res[f'fit_a_{col}'] = results[best].best_values['a']
|
|
res[f'fit_b_{col}'] = results[best].best_values['b']
|
|
res[f'fit_d_{col}'] = results[best].best_values['d']
|
|
res[f'fit_e_{col}'] = results[best].best_values['e']
|
|
res[f'fit_f_{col}'] = results[best].best_values['f']
|
|
|
|
res['f'] = freq
|
|
res['sigma'] = sigma
|
|
|
|
res['filename'] = self.file
|
|
|
|
yreg = model_cos(x, res[f'fit_a_{col}'], res[f'fit_b_{col}'],
|
|
res[f'fit_d_{col}'], res[f'fit_e_{col}'],
|
|
res[f'fit_f_{col}'])
|
|
"""
|
|
plt.sca(axs[idxcol])
|
|
plt.plot(x, y, label='Messdaten')
|
|
|
|
r2 = res[f'r2_{col}']
|
|
lstr = f'Fit (r² = {r2:0.4f})'
|
|
plt.plot(x, yreg, label=lstr)
|
|
plt.xlim([x[0], x[-1]])
|
|
plt.xlabel('Zeit in s')
|
|
|
|
plt.ylabel(ylabel_dict[col])
|
|
plt.legend()
|
|
|
|
#data[['F', 's_hor_sum']].plot(subplots=True)
|
|
plt.tight_layout()
|
|
|
|
ofile = self.file
|
|
ofile = ofile.replace('/raw/', '/plots/')[:-4]
|
|
ofile = ofile + f'_{idx_data:03.0f}.pdf'
|
|
|
|
ofolder = os.path.split(ofile)[0]
|
|
if not os.path.exists(ofolder):
|
|
os.makedirs(ofolder)
|
|
|
|
plt.savefig(ofile)
|
|
plt.close()
|
|
"""
|
|
|
|
## Stiffness
|
|
deltaF = res['fit_a_F']
|
|
nu = 0.298
|
|
h = float(self.meta['speciment_height'])
|
|
deltaU = res['fit_a_s_hor_sum']
|
|
|
|
res['E'] = (deltaF * (0.274 + nu)) / (h * deltaU)
|
|
|
|
self.fit.append(res)
|
|
|
|
if self.debug:
|
|
break
|
|
|
|
self.fit = pd.DataFrame.from_records(self.fit)
|