import os
import warnings
from csv import reader
from io import BytesIO

import lmfit as lm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.fft as sfft

from pytestpavement.analysis import cosfunc
from pytestpavement.helper.filehasher import calc_hash_of_file


def _decimal_comma_to_float(text):
    """Convert a number written with a decimal comma (e.g. '12,5') to float."""
    return float(text.replace(',', '.'))


def _parse_datetime(text):
    """Return *text* parsed by pandas, or *text* unchanged if parsing fails."""
    try:
        return pd.to_datetime(text)
    except (ValueError, TypeError):
        # Best effort: keep the raw string when the format is not recognised.
        return text


class DataSineLoad():
    """
    Base class for lab tests with sine load.

    Creating an instance only checks the file, sets the parameters and
    computes the file hash. Calling ``fit()`` reads the file, standardizes
    names and units, splits the measurement by the configured parameter(s),
    selects a number of load cycles and fits a cosine model to each signal.

    NOTE: after a successful ``fit()`` call the instance attribute ``fit``
    holds the result table (a pandas DataFrame) and therefore shadows the
    method; ``fit()`` can only be called once per instance. The same table
    is also available as ``fit_results``.
    """

    # (substring of the raw meta key, standardized key, converter or None).
    # Order matters: the first matching rule wins.
    _META_RULES = (
        ('Probenbezeichnung', 'speciment', None),
        ('Datum/Uhrzeit', 'datetime', _parse_datetime),
        ('Probenhöhe', 'speciment_height', _decimal_comma_to_float),
        ('Probendurchmesser', 'speciment_diameter', _decimal_comma_to_float),
        ('Solltemperatur', 'temperature', _decimal_comma_to_float),
        ('Prüfbedingungen', 'test_version', None),
        ('Name des VersAblf', 'test', None),
        ('Prüfer', 'examiner', None),
    )

    # (substring of the raw column name, standardized column name).
    # Order matters: 'mpulsnummer_fortlaufend' must be tested before
    # 'Impulsnummer' because the first matching rule wins.
    _COLUMN_RULES = (
        ('TIME', 'time'),
        ('Temperatur', 'T'),
        ('Load', 'F'),
        ('Position', 's_piston'),
        ('FREQUENZ', 'f'),
        ('mpulsnummer_fortlaufend', 'Ncum'),
        ('Impulsnummer', 'N'),
        ('SENSOR 4', 's_hor_1'),
        ('SENSOR Extension', 's_hor_2'),
    )

    def __init__(self,
                 fname: str,
                 debug: bool = False,
                 roundtemperature: bool = True):
        """
        Parameters
        ----------
        fname : str
            Path of the raw data file.
        debug : bool
            If True, ``_fit_data`` stops after the first analysed data set.
        roundtemperature : bool
            If True, the temperature column is rounded (see
            ``_replace_temperature``) during ``fit()``.
        """
        self.debug = debug
        self.roundtemperature = roundtemperature

        self.file = fname

        self._run()

    def _run(self):
        """Run the cheap set-up steps that do not parse the data."""
        self._file_exists()
        self._set_parameter()
        self._define_units()
        self._calc_hash()

    def fit(self):
        """
        Run the complete processing chain, from reading the file to fitting.

        The order of the steps matters: names are standardized before
        missing values are derived, units are applied before validation,
        and the time column becomes the index before the data is split.
        """
        self._read_data()
        self._standardize_data()
        self._standardize_meta()

        if self.roundtemperature:
            self._replace_temperature()

        self._calc_missiong_values()
        self._set_units()

        self._validate_data()
        self._postprocess_data()

        self._split_data()
        self._select_data()

        self._fit_data()

    def _file_exists(self):
        """
        Ensure the input file exists.

        Raises
        ------
        FileNotFoundError
            If ``self.file`` does not exist. An explicit exception is used
            because ``assert`` statements are removed when Python runs
            with ``-O``.
        """
        if not os.path.exists(self.file):
            raise FileNotFoundError(self.file)

    def _set_parameter(self):
        """Define column names, validation lists and analysis settings."""
        # columns used to group the data into individual tests
        self.split_data_based_on_parameter = ['f']

        self.col_as_int = ['N']
        self.col_as_float = [
            'T', 'F', 's_piston', 's_hor_1', 'f', 's_hor_sum'
        ]

        # column names that must exist after standardization
        self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_hor_sum']
        # header (meta) names that must exist after standardization
        self.val_header_names = ['speciment_height']

        self.number_of_load_cycles_for_analysis = 5

    def _define_units(self):
        """Define the factors the raw columns are multiplied with."""
        self.unit_s = 1  # displacement columns (original note: mm)
        self.unit_F = 1  # force column (original note: N)
        self.unit_t = 1 / 1000.  # time column (original note: s)

    def _calc_hash(self):
        """Calculate the hash of the file and store it in ``self.filehash``."""
        self.filehash = calc_hash_of_file(self.file)

    def _read_data(self):
        """
        Read a raw data file from 'Labor Hart' (indirect tensile stiffness
        tests) into ``self.meta`` and ``self.data``.

        The first ``skiprows`` lines are scanned for ``key:;value`` pairs
        (metadata). The last of these lines provides the column names; the
        line after it is consumed by pandas as header row and its names are
        replaced. The value in front of 'MPa' in the file name is stored in
        the column ``sigma``.

        Returns
        -------
        bool
            Always True.

        Raises
        ------
        ValueError
            If the file name does not start with a number followed by
            'MPa', or if the number of header names does not match the
            number of parsed columns.
        """
        # parameter
        encoding = 'latin-1'
        skiprows = 14
        splitsign = ':;'

        # metadata and header line in a single pass over the file head
        meta = {}
        header_line = ''
        with open(self.file, 'r', encoding=encoding) as f:
            for count, line in enumerate(f, start=1):
                parts = line.strip().split(splitsign)
                if len(parts) == 2:
                    meta[parts[0]] = parts[1]

                header_line = line
                if count >= skiprows:
                    break

        # data
        data = pd.read_csv(self.file,
                           encoding=encoding,
                           skiprows=skiprows,
                           decimal=',',
                           sep=';')

        # Use the names of the header line; the line terminator is removed
        # so that it does not end up in the last column name.
        data.columns = header_line.rstrip('\r\n').split(';')

        basename = os.path.split(self.file)[-1]
        sigma = float(basename.split('MPa')[0].strip().replace(',', '.'))
        data['sigma'] = sigma

        # clean data: drop every column that contains missing values
        data = data.dropna(axis=1)

        # define in class
        self.meta = meta
        self.data = data
        return True

    def _validate_data(self):
        """
        Check that all required header and column names are present.

        Raises
        ------
        ValueError
            If a name of ``val_header_names`` is missing in ``self.meta`` or
            a name of ``val_col_names`` is missing in the data columns.
        """
        cols = self.data.columns

        for val in self.val_header_names:
            if val not in self.meta:
                raise ValueError(f"{val} not in header of data")

        for col in self.val_col_names:
            if col not in cols:
                raise ValueError(f'{col} not standardized')

    def _standardize_meta(self):
        """
        Rename the raw metadata keys to standardized names.

        Each key is compared with the substrings in ``_META_RULES``; the
        first matching rule renames the entry and, where a converter is
        given, converts the value. Keys without a match are left untouched.

        Returns
        -------
        bool
            Always True.

        Raises
        ------
        ValueError
            If a numeric entry cannot be converted to float.
        """
        # iterate over a snapshot because the dict is modified in the loop
        for key in list(self.meta.keys()):
            for pattern, new_key, convert in self._META_RULES:
                if pattern not in key:
                    continue
                value = self.meta.pop(key)
                if convert is not None:
                    value = convert(value)
                self.meta[new_key] = value
                break

        return True

    def _standardize_data(self):
        """
        Rename the raw data columns to standardized names.

        Each column name is compared with the substrings in
        ``_COLUMN_RULES``; the first matching rule determines the new name.
        Columns without a match keep their name.
        """

        def standard_name(col):
            for pattern, name in self._COLUMN_RULES:
                if pattern in col:
                    return name
            return col

        self.data.columns = [standard_name(col) for col in self.data.columns]

    def _set_units(self):
        """
        Multiply the displacement, force and time columns by their factors.

        The single-sensor displacement columns are optional (they may be
        absent from a file) and are skipped when missing.

        Returns
        -------
        bool
            Always True.

        Raises
        ------
        KeyError
            If one of the columns 's_hor_sum', 'F' or 'time' is missing.
        """
        for col in ['s_hor_sum', 's_hor_1', 's_hor_2']:
            if col != 's_hor_sum' and col not in self.data.columns:
                continue
            self.data[col] = self.data[col].mul(self.unit_s)

        self.data['F'] = self.data['F'].mul(self.unit_F)
        self.data['time'] = self.data['time'].mul(self.unit_t)

        return True

    def _replace_temperature(self):
        """Replace every temperature by its value rounded to the tens."""
        rounded = {
            temperature: round(temperature, -1)
            for temperature in self.data['T'].unique()
        }
        self.data['T'] = self.data['T'].replace(rounded)

    def _calc_missiong_values(self):
        """Add the column 's_hor_sum' (s_hor_1 + s_hor_2) if it is missing."""
        if 's_hor_sum' not in self.data.columns:
            self.data['s_hor_sum'] = self.data[['s_hor_1',
                                                's_hor_2']].sum(axis=1)

    def _postprocess_data(self):
        """
        Set the column dtypes and use the time column as index.

        The integer conversion is strict; the float conversion is best
        effort, so missing or non-numeric columns are left unchanged.

        Returns
        -------
        bool
            Always True.
        """
        # set dtypes:
        for col in self.col_as_int:
            self.data[col] = self.data[col].astype('int')
        for col in self.col_as_float:
            try:
                self.data[col] = self.data[col].astype('float')
            except (KeyError, ValueError, TypeError):
                # optional column missing or not convertible: keep as is
                pass

        # set index
        self.data = self.data.set_index('time')

        return True

    def _split_data(self):
        """
        Split the data into one frame per group of the split parameter(s).

        Within a group, a step of the time index larger than twice the mean
        step is treated as a gap; only the data in front of the first gap
        is kept. The result is stored as list in ``self.data`` and its
        length in ``self.num_tests``.
        """
        segments = []

        for _, group in self.data.groupby(self.split_data_based_on_parameter):
            # fewer than two samples: no time step, hence no gap possible
            if len(group) < 2:
                segments.append(group)
                continue

            steps = np.diff(group.index)
            gaps = steps > (2 * steps.mean())

            if not gaps.any():
                segments.append(group)
                continue

            # Keep the original safety margin in front of the first gap,
            # but never let the stop index become negative: a negative
            # stop would select almost the whole group including the data
            # behind the gap.
            first_gap = np.where(gaps)[0][0]
            stop = max(first_gap - 1, 0)
            segments.append(group.iloc[0:stop])

        self.num_tests = len(segments)
        self.data = segments

    def _select_data(self):
        """
        Select N load cycles from the original data.

        For every data set the last ``number_of_load_cycles_for_analysis``
        cycles in front of the final cycle are kept. Data sets with too few
        cycles are replaced by None (with a warning) and are skipped by
        ``_fit_data``.

        Returns
        -------
        bool
            Always True.
        """

        def sel_df(df, num=5):
            cycles = df['N'].unique()

            if len(cycles) <= num:
                warnings.warn(
                    'Number of load cycles smaller than selected value; '
                    'data set is skipped')
                return None

            # the final cycle (cycles[-1]) is excluded from the selection
            selected = (df['N'] >= cycles[-num - 1]) & (df['N'] <= cycles[-2])
            return df[selected]

        num = self.number_of_load_cycles_for_analysis
        if isinstance(self.data, list):
            datasets = self.data
        else:
            datasets = [self.data]

        # replace data
        self.data = [sel_df(d, num=num) for d in datasets]

        return True

    def _fit_data(self):
        """
        Fit a cosine model to every analysed column of every data set.

        Per data set the time index is shifted to start at zero (in place).
        For each available column the FFT and the best of several fits
        (highest r², negative values clamped to 0) are stored. Finally a
        stiffness value 'E' is derived from the fitted amplitudes of 'F'
        and 's_hor_sum'.

        The result table is stored in ``self.fit`` (replacing the method of
        the same name on this instance) and in ``self.fit_results``.

        Raises
        ------
        KeyError
            If 'F' or 's_hor_sum' could not be fitted for a data set.
        """
        columns_analyse = [
            'F',
            's_hor_sum',
            's_hor_1',
            's_hor_2',
            's_piston',
        ]
        methods = ['leastsq', 'powell']

        records = []

        for data in self.data:

            if data is None:
                continue

            data.index = data.index - data.index[0]

            # values shared by all columns of this data set
            x = data.index.values
            dt = np.diff(x).mean()  # mean sampling interval
            freq = np.round(data['f'].mean(), 3)
            sigma = np.round(data['sigma'].mean(), 3)

            res = {}

            for col in columns_analyse:

                if col not in data.columns:
                    continue

                y = data[col].values

                # Fourier transformation
                res[f'psd_{col}'] = sfft.rfft(y)
                res[f'freq_{col}'] = sfft.rfftfreq(len(x), dt)

                # Fitting: a first estimate provides the start values.
                # NOTE(review): fit_sin_anstieg is not imported in the
                # visible part of this module - confirm where it is defined.
                res_step1 = fit_sin_anstieg(x, y)

                mod = lm.models.Model(cosfunc)
                mod.set_param_hint('d', value=res_step1['offset'])
                mod.set_param_hint('a', value=res_step1['amp'])
                mod.set_param_hint('b', value=res_step1['phase'])
                mod.set_param_hint('e', value=0)
                mod.set_param_hint('f', value=freq, vary=True)

                results = []
                r2 = []

                for method in methods:
                    result = mod.fit(y, t=x, method=method, verbose=False)

                    r2temp = 1 - result.residual.var() / np.var(y)
                    if r2temp < 0.:
                        r2temp = 0
                    r2.append(r2temp)

                    results.append(result)

                best = np.nanargmax(r2)
                best_values = results[best].best_values

                res[f'r2_{col}'] = r2[best]
                for name in ('a', 'b', 'd', 'e', 'f'):
                    res[f'fit_{name}_{col}'] = best_values[name]

                # assigned here (not before the loop) to keep the column
                # order of the result table unchanged
                res['f'] = freq
                res['sigma'] = sigma
                res['filename'] = self.file

            # Stiffness
            # NOTE(review): nu is presumably Poisson's ratio and 0.274 a
            # geometry constant of the test - confirm against the test
            # specification.
            deltaF = res['fit_a_F']
            nu = 0.298
            h = float(self.meta['speciment_height'])
            deltaU = res['fit_a_s_hor_sum']

            res['E'] = (deltaF * (0.274 + nu)) / (h * deltaU)

            records.append(res)

            if self.debug:
                break

        self.fit_results = pd.DataFrame.from_records(records)
        # kept for backward compatibility: callers read the results from
        # the attribute ``fit`` after calling ``fit()``
        self.fit = self.fit_results