From 6e4bef19cb7bfff87424b0ec26210c5caa542586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Clau=C3=9F?= Date: Tue, 27 Sep 2022 20:18:52 +0200 Subject: [PATCH] Sheartest added --- pytestpavement/__init__.py | 15 +- pytestpavement/analysis/regression.py | 12 +- pytestpavement/helper/filehasher.py | 18 +- pytestpavement/io/geosys.py | 17 +- pytestpavement/labtests/__init__.py | 2 + pytestpavement/{versuche => labtests}/base.py | 116 ++-- .../{versuche/spz.py => labtests/citt.py} | 0 pytestpavement/labtests/sheartest.py | 637 ++++++++++++++++++ pytestpavement/models/__init__.py | 7 - pytestpavement/models/data.py | 33 + pytestpavement/models/material.py | 2 +- pytestpavement/models/sheartest.py | 76 +-- pytestpavement/versuche/__init__.py | 9 - pytestpavement/versuche/fit.py | 5 - pytestpavement/versuche/schichtenverbund.py | 369 ---------- 15 files changed, 807 insertions(+), 511 deletions(-) create mode 100644 pytestpavement/labtests/__init__.py rename pytestpavement/{versuche => labtests}/base.py (87%) rename pytestpavement/{versuche/spz.py => labtests/citt.py} (100%) create mode 100644 pytestpavement/labtests/sheartest.py create mode 100644 pytestpavement/models/data.py delete mode 100644 pytestpavement/versuche/__init__.py delete mode 100644 pytestpavement/versuche/fit.py delete mode 100644 pytestpavement/versuche/schichtenverbund.py diff --git a/pytestpavement/__init__.py b/pytestpavement/__init__.py index 2edff70..97418af 100644 --- a/pytestpavement/__init__.py +++ b/pytestpavement/__init__.py @@ -3,18 +3,5 @@ from .analysis import * from .helper import * from .io import * +from .labtests import * from .models import * -from .versuche import * - -__all__ = [ - # IO - "connect_mongo_db", - "read_geosys", - # Versuche - "TestSchichtenverbundV2GeoSys", - "TestSchichtenverbundV2GeoSysExtractedEMPA", - # Analyse - "fit_cos_eval", - "fit_cos_simple", - "fit_cos", -] diff --git a/pytestpavement/analysis/regression.py b/pytestpavement/analysis/regression.py index d89118d..72eb6ef 100644 --- a/pytestpavement/analysis/regression.py +++ b/pytestpavement/analysis/regression.py @@ -73,7 +73,7 @@ def fit_cos_simple(x, y, freq=10.0): } -def fit_cos(x, y, freq=10.0): +def fit_cos(x, y, freq=10.0, constfreq=False): """ sine regression @@ -92,8 +92,6 @@ def fit_cos(x, y, freq=10.0): res_step1 = fit_cos_simple(x, y, freq=freq) # step 2: lmfit - res = {} - mod = lm.models.Model(cosfunc) mod.set_param_hint( @@ -106,9 +104,9 @@ def fit_cos(x, y, freq=10.0): mod.set_param_hint( 'w', value=freq, - vary=True, - #min=res_step1['freq'] - 0.3 * abs(res_step1['freq']), - #max=res_step1['freq'] + 0.3 * abs(res_step1['freq']) + vary=not constfreq, + #min=freq - 0.1 * freq, + #max=freq + 0.1 * freq, ) mod.set_param_hint('p', value=res_step1['phase'], vary=True) @@ -147,7 +145,7 @@ def fit_cos(x, y, freq=10.0): chis.append(chi) results.append(result) - ret = {} + res = {} best = np.nanargmax(r2) res[f'amp'] = results[best].best_values['A'] diff --git a/pytestpavement/helper/filehasher.py b/pytestpavement/helper/filehasher.py index 3b95e58..d384839 100644 --- a/pytestpavement/helper/filehasher.py +++ b/pytestpavement/helper/filehasher.py @@ -2,13 +2,9 @@ import hashlib from io import BytesIO -def calc_hash_of_file(file): +def calc_hash_of_bytes(buf: BytesIO): """ calculate the hash of the file """ - #read file - with open(file, 'rb') as fh: - buf = BytesIO(fh.read()) - algo = hashlib.sha1() buffer_size = 65536 @@ -23,3 +19,15 @@ def calc_hash_of_file(file): hex = algo.hexdigest() return hex + + +def calc_hash_of_file(file): + """ calculate the hash of the file """ + + #read file + with open(file, 'rb') as fh: + buf = BytesIO(fh.read()) + + hex = calc_hash_of_bytes(buf) + + return hex diff --git a/pytestpavement/io/geosys.py b/pytestpavement/io/geosys.py index a46b897..66f3a62 100644 --- a/pytestpavement/io/geosys.py +++ b/pytestpavement/io/geosys.py @@ -72,9 +72,12 @@ def read_geosys(filename, #Einlesen with open(filename, 'r', encoding=encoding) as inFile: reader = csv.reader(inFile, delimiter='\t') - for row in reader: - if len(row) > 2: - data.append(row) + try: + for row in reader: + if len(row) > 2: + data.append(row) + except: + pass if debug: print('Anz. Datensätze: ', str(len(data)), getsizeof(data)) @@ -189,14 +192,12 @@ def read_geosys(filename, print(data_head, data_units) ## Bezeichnungen der Daten normalisieren - data_head = normalice_header(data_head) - # Pandas DataFrame erstellen data = DataFrame(data=data, columns=data_head) if debug: print(data.head()) - data = data.set_index('t') + #data = data.set_index('t') #data._units = data_units @@ -205,10 +206,10 @@ def read_geosys(filename, data['N'] = data['N'].astype(int) # Daten sortieren - data.sort_index() + #data.sort_index() # Index normieren - data.index = data.index - data.index[0] + #data.index = data.index - data.index[0] return header, data diff --git a/pytestpavement/labtests/__init__.py b/pytestpavement/labtests/__init__.py new file mode 100644 index 0000000..4a53c8a --- /dev/null +++ b/pytestpavement/labtests/__init__.py @@ -0,0 +1,2 @@ +from .citt import * +from .sheartest import * diff --git a/pytestpavement/versuche/base.py b/pytestpavement/labtests/base.py similarity index 87% rename from pytestpavement/versuche/base.py rename to pytestpavement/labtests/base.py index ac232be..9d94f30 100644 --- a/pytestpavement/versuche/base.py +++ b/pytestpavement/labtests/base.py @@ -1,4 +1,14 @@ -from .fit import model_cos +import os +from csv import reader +from io import BytesIO + +import lmfit as lm +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import scipy.fft as sfft +from pytestpavement.analysis import cosfunc +from pytestpavement.helper.filehasher import calc_hash_of_file class DataSineLoad(): @@ -7,36 +17,45 @@ class DataSineLoad(): """ - def __init__(self, fname: str, debug: bool = False): + def __init__(self, + fname: str, + debug: bool = False, + roundtemperature: bool = True): - self.meta = {'d': 150, 'speciment_height': 60} + self.debug = debug + self.roundtemperature = roundtemperature self.file = fname - self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_hor_sum'] + self._run() - # Header names after standardization; check if exists - self.val_header_names = ['speciment_height'] + def _run(self): + self._file_exists() self._set_parameter() + self._define_units() - self._file_to_bytesio() self._calc_hash() self._read_data() self._standardize_data() self._standardize_meta() - if not debug: - self._calc_missiong_values() - self._set_units() + if self.roundtemperature: + self._replace_temperature() - self._validate_data() - self._postprocess_data() + self._calc_missiong_values() + self._set_units() - self._split_data() - self._select_data() + self._validate_data() + self._postprocess_data() - self._fit_data() + self._split_data() + self._select_data() + + self._fit_data() + + def _file_exists(self): + assert os.path.exists(self.file) def _set_parameter(self): @@ -45,47 +64,35 @@ class DataSineLoad(): self.col_as_int = ['N'] self.col_as_float = ['T', 'F', 's_piston', 's_hor_1', 'f', 's_hor_sum'] + self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_hor_sum'] + # Header names after standardization; check if exists + self.val_header_names = ['speciment_height'] + self.number_of_load_cycles_for_analysis = 5 + def _define_units(self): + self.unit_s = 1 #mm self.unit_F = 1 #N self.unit_t = 1 / 1000. #s - def _file_to_bytesio(self): - """ read data and save in memory """ - - with open(self.file, 'rb') as fh: - self.buf = BytesIO(fh.read()) - def _calc_hash(self): """ calculate the hash of the file """ + self.filehash = calc_hash_of_file(self.file) - #read t - - algo = hashlib.sha1() - - buffer_size = 65536 - buffer_size = buffer_size * 1024 * 1024 - - while True: - data = self.buf.read(buffer_size) - if not data: - break - algo.update(data) - - self.hex = algo.hexdigest() - self.buf.seek(0) - - def _read_data(self, encoding='latin-1', skiprows=14, hasunits=True): + def _read_data(self): """ read data from Labor Hart, Spaltzugversuche Steifigkeit """ + # parameter + encoding = 'latin-1' + skiprows = 14 + hasunits = True + splitsign = ':;' # metadata from file meta = {} - splitsign = ':;' - with open(self.file, 'r', encoding=encoding) as f: count = 0 @@ -230,6 +237,16 @@ class DataSineLoad(): return True + def _replace_temperature(self): + + temperatures = self.data['T'].unique() + + Tset = {} + for temperature in temperatures: + Tset[temperature] = round(temperature, -1) + + self.data['T'] = self.data['T'].replace(Tset) + def _calc_missiong_values(self): cols = self.data.columns @@ -292,7 +309,7 @@ class DataSineLoad(): N = df['N'].unique() if len(N) > num: - df_sel = df[(df['N'] >= N[-num - 1, ]) & (df['N'] <= N[-2, ])] + df_sel = df[(df['N'] >= N[-num - 1]) & (df['N'] <= N[-2])] return df_sel else: ValueError( @@ -342,10 +359,10 @@ class DataSineLoad(): 's_hor_2': 'Verformung ($S_2$) in mm' } - fig, axs = plt.subplots(len(columns_analyse), - 1, - figsize=(8, len(columns_analyse) * 2), - sharex=True) + #fig, axs = plt.subplots(len(columns_analyse), + # 1, + # figsize=(8, len(columns_analyse) * 2), + # sharex=True) for idxcol, col in enumerate(columns_analyse): @@ -367,7 +384,7 @@ class DataSineLoad(): res_step1 = fit_sin_anstieg(x, y) - mod = lm.models.Model(model_cos) + mod = lm.models.Model(cosfunc) mod.set_param_hint( 'd', @@ -443,7 +460,7 @@ class DataSineLoad(): yreg = model_cos(x, res[f'fit_a_{col}'], res[f'fit_b_{col}'], res[f'fit_d_{col}'], res[f'fit_e_{col}'], res[f'fit_f_{col}']) - + """ plt.sca(axs[idxcol]) plt.plot(x, y, label='Messdaten') @@ -469,6 +486,7 @@ class DataSineLoad(): plt.savefig(ofile) plt.close() + """ ## Stiffness deltaF = res['fit_a_F'] @@ -479,6 +497,8 @@ class DataSineLoad(): res['E'] = (deltaF * (0.274 + nu)) / (h * deltaU) self.fit.append(res) - #break + + if self.debug: + break self.fit = pd.DataFrame.from_records(self.fit) diff --git a/pytestpavement/versuche/spz.py b/pytestpavement/labtests/citt.py similarity index 100% rename from pytestpavement/versuche/spz.py rename to pytestpavement/labtests/citt.py diff --git a/pytestpavement/labtests/sheartest.py b/pytestpavement/labtests/sheartest.py new file mode 100644 index 0000000..3503476 --- /dev/null +++ b/pytestpavement/labtests/sheartest.py @@ -0,0 +1,637 @@ +import os + +import lmfit as lm +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import scipy.fft as sfft +import seaborn as sns +from pytestpavement.analysis.regression import fit_cos, fit_cos_eval +from pytestpavement.io.geosys import read_geosys +from pytestpavement.labtests.base import DataSineLoad +from pytestpavement.models.data import DataSheartest +from pytestpavement.models.sheartest import DynamicShearTestExtension + + +class ShearTest(DataSineLoad): + """ + Dynamic Shear Bounding Test + """ + + def __init__(self, + fname: str, + debug: bool = False, + gap_width: float = 1.0, + roundtemperature: bool = True): + + #set parameter + self.gap_width = gap_width + self.debug = debug + self.file = fname + self.roundtemperature = roundtemperature + + # process file + self._run() + + def plot_fited_data(self, opath=None, pkname=None, r2min=0.99): + + ylabel_dict = { + 'F': 'Kraft in N', + 's_vert_sum': 'norm. mittlerer Scherweg\n $S_{mittel}$ in mm', + 's_piston': 'norm. Kolbenweg\n in mm', + 's_vert_1': 'Scherweg\n $S_1$ in mm', + 's_vert_2': 'Scherweg\n $S_2$ in mm' + } + + columns_analyse = [ + 'F', + 's_vert_sum', + 's_vert_1', + 's_vert_2', + 's_piston', + ] + + if not (opath is None) & (pkname is None): + showplot = False + + opath = os.path.join(opath, pkname, 'raw_data') + if not os.path.exists(opath): + os.makedirs(opath) + + else: + showplot = True + + for i, fit in self.fit.iterrows(): + + if not any([fit['r2_F'] < r2min, fit['r2_s_vert_sum'] < r2min]): + continue + + data = self.data[int(fit['idx_data'])] + + if data is None: + continue + + freq = data['f'].unique()[0] + sigma = data['sigma_normal'].unique()[0] + s = data['extension'].unique()[0] + T = data['T'].unique()[0] + + fig, axs = plt.subplots(len(columns_analyse), + 1, + figsize=(8, len(columns_analyse) * 2), + sharex=True) + + for idxcol, col in enumerate(columns_analyse): + x, y = data.index, data[col] + + #add fit + f = self.fit.iloc[i] + parfit = {} + for k in ['amp', 'freq', 'phase', 'offset', 'slope']: + parfit[k] = f[f'fit_{k}_{col}'] + + yreg = fit_cos_eval(x, parfit) + + if col in ['s_piston', 's_vert_sum']: + y = y - np.mean(y) + yreg = yreg - np.mean(yreg) + + plt.sca(axs[idxcol]) + plt.plot(x, y, label='Messdaten') + + r2 = np.round(f[f'r2_{col}'], 3) + plt.plot(x, + yreg, + alpha=0.7, + label=f'Regression ($R^2 = {r2}$)') + + if not ('F' in col): + s = f['extension'] + parline = dict(lw=0.4, + ls='--', + color='lightgrey', + alpha=0.4, + label='Bereich des zul. Scherweges') + plt.axhspan(-s, s, **parline) + + if idxcol == len(columns_analyse) - 1: + plt.xlabel('Zeit in s') + + plt.ylabel(ylabel_dict[col]) + plt.legend() + + plt.tight_layout() + + if showplot: + plt.show() + break + + else: + ofile = f'{T}deg_{sigma}MPa_{freq}Hz_{s}mm'.replace('.', 'x') + ofile = os.path.join(opath, ofile + '.pdf') + + plt.savefig(ofile) + plt.close() + + +class ShearTestExtension(ShearTest): + + def runfit(self): + self._fit_data() + + def save(self, material1, material2, bounding, meta: dict): + + for i, fit in self.fit.iterrows(): + + data = self.data[int(fit['idx_data'])] + + #check if data in db + n = DynamicShearTestExtension.objects( + f=fit['f'], + sigma_normal=fit['sigma_normal'], + T=fit['T'], + extension=fit['extension'], + material1=material1, + material2=material2, + bounding=bounding, + filehash=self.filehash, + ).count() + if n > 0: continue + + #save data + rdata = DataSheartest( + time=data.index.values, + F=data['F'].values, + N=data['N'].values, + s_vert_1=data['s_vert_1'].values, + s_vert_2=data['s_vert_2'].values, + s_vert_sum=data['s_vert_sum'].values, + s_piston=data['s_piston'].values, + ).save() + + # save fit + + values = {} + for col in ['F', 's_vert_1', 's_vert_2', 's_vert_sum']: + values[f'fit_amp_{col}'] = fit[f'fit_amp_{col}'] + values[f'fit_freq_{col}'] = fit[f'fit_freq_{col}'] + values[f'fit_phase_{col}'] = fit[f'fit_phase_{col}'] + values[f'fit_offset_{col}'] = fit[f'fit_offset_{col}'] + values[f'fit_slope_{col}'] = fit[f'fit_slope_{col}'] + + values.update(meta) + + try: + r = DynamicShearTestExtension( + #metadata + f=fit['f'], + sigma_normal=fit['sigma_normal'], + T=fit['T'], + extension=fit['extension'], + filehash=self.filehash, + material1=material1, + material2=material2, + bounding=bounding, + #results + data=rdata, + stiffness=fit['G'], + # + **values).save() + except: + rdata.delete() + + def _set_parameter(self): + + self.split_data_based_on_parameter = [ + 'T', 'sigma_normal', 'f', 'extension' + ] + + self.col_as_int = ['N'] + self.col_as_float = ['T', 'F', 'f', 's_vert_sum'] + + self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_vert_sum'] + # Header names after standardization; check if exists + self.val_header_names = ['speciment_diameter'] + + self.columns_analyse = [ + 'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston' + ] + + self.number_of_load_cycles_for_analysis = 5 + + def _calc_missiong_values(self): + + cols = self.data.columns + + for c in ['vert']: + if not f's_{c}_sum' in cols: + self.data[f's_{c}_sum'] = self.data[[f's_{c}_1', f's_{c}_2' + ]].sum(axis=1).div(2.0) + + def _fit_data(self): + + self.fit = [] + + for idx_data, data in enumerate(self.data): + + if data is None: continue + + data.index = data.index - data.index[0] + + res = {} + res['idx_data'] = int(idx_data) + + # Fitting + freq = float(np.round(data['f'].mean(), 4)) + if (self.debug): + sigma_normal = np.round(data['sigma_normal'].mean(), 3) + T = np.round(data['T'].mean(), 3) + + for idxcol, col in enumerate(self.columns_analyse): + + if not col in data.columns: continue + + x = data.index.values + y = data[col].values + + # Fourier Transformation + """ + dt = np.diff(x).mean() #mean sampling rate + n = len(x) + + res[f'psd_{col}'] = sfft.rfft(y) #compute the FFT + res[f'freq_{col}'] = sfft.rfftfreq(n, dt) + """ + + res_fit = fit_cos(x, y, freq=freq, constfreq=True) + + res[f'r2_{col}'] = res_fit['r2'] + + res[f'fit_amp_{col}'] = res_fit['amp'] + res[f'fit_freq_{col}'] = res_fit['freq'] + res[f'fit_phase_{col}'] = res_fit['phase'] + res[f'fit_offset_{col}'] = res_fit['offset'] + res[f'fit_slope_{col}'] = res_fit['slope'] + + ## Schersteifigkeit berechnen + deltaF = res['fit_amp_F'] + deltaS = res['fit_amp_s_vert_sum'] + + A = np.pi * self.meta['speciment_diameter']**2 / 4 + tau = deltaF / A + gamma = deltaS / self.gap_width + + res['G'] = tau / gamma + + #metadaten + for c in ['T', 'extension', 'sigma_normal', 'f']: + res[c] = data[c][0] + + self.fit.append(res) + + if (self.debug) & (len(self.fit) > 5): + break + + self.fit = pd.DataFrame.from_records(self.fit) + + def plot_results(self, opath=None, pkname=None, r2min=0.96): + if not (opath is None) & (pkname is None): + showplot = False + + opath = os.path.join(opath, pkname) + if not os.path.exists(opath): + os.makedirs(opath) + else: + showplot = True + + dfplot = self.fit.copy() + for col in ['extension', 'fit_amp_s_vert_sum']: + dfplot[col] = dfplot[col].mul(1000) + + fig, ax = plt.subplots() + + xticks = list(dfplot['extension'].unique()) + + df = dfplot + df = df[(df['r2_F'] >= r2min) & (df['r2_s_vert_sum'] >= r2min)] + + sns.scatterplot( + data=df, + x='fit_amp_s_vert_sum', + y='G', + hue='T', + ax=ax, + alpha=0.7, + #size=150, + size="G", + sizes=(50, 160), + edgecolor='k', + palette='muted', + zorder=10) + + df = dfplot + df = df[(df['r2_F'] < r2min) & (df['r2_s_vert_sum'] < r2min)] + + if not df.empty: + sns.scatterplot(data=df, + x='fit_amp_s_vert_sum', + y='G', + facecolor='grey', + alpha=0.5, + legend=False, + zorder=1, + ax=ax) + + ax.set_xlabel('gemessene Scherwegamplitude in $\mu m$') + ax.set_ylabel('Scherseteifigkeit in MPa/mm') + + ax.set_xticks(xticks) + ax.grid() + + if not showplot: + ofile = os.path.join(opath, 'shearstiffness.pdf') + + plt.savefig(ofile) + plt.show() + + def plot_stats(self, opath=None, pkname=None, r2min=0.96): + if not (opath is None) & (pkname is None): + showplot = False + + opath = os.path.join(opath, pkname) + if not os.path.exists(opath): + os.makedirs(opath) + else: + showplot = True + + dfplot = self.fit.copy() + for col in ['extension', 'fit_amp_s_vert_sum']: + dfplot[col] = dfplot[col].mul(1000) + + #r2 + + df = self.fit + + fig, axs = plt.subplots(1, 2, sharey=True, sharex=True) + + parscatter = dict(palette='muted', alpha=0.7, edgecolor='k', lw=0.3) + + # r2 + ax = axs[0] + sns.scatterplot(data=df, + x='fit_amp_s_vert_sum', + y='r2_F', + hue='T', + ax=ax, + **parscatter) + ax.set_ylabel('Bestimmtheitsmaß $R^2$') + ax.set_title('Kraft') + + ax = axs[1] + sns.scatterplot(data=df, + x='fit_amp_s_vert_sum', + y='r2_s_vert_sum', + hue='T', + legend=False, + ax=ax, + **parscatter) + ax.set_ylabel('$R^2$ (S_{mittel})') + ax.set_title('mittlerer Scherweg') + + for ax in axs.flatten(): + ax.grid() + ax.set_xlabel('gemessene Scherwegamplitude in $\mu m$') + + plt.tight_layout() + + if not showplot: + ofile = os.path.join(opath, 'stats_r2.pdf') + plt.savefig(ofile) + plt.show() + + +class ShearTestExtensionLaborHart(ShearTestExtension): + + def _define_units(self): + + self.unit_F = 1 / 1000.0 #N + self.unit_t = 1 / 1000. #s + + def _set_units(self): + + #for col in ['F']: + # self.data[col] = self.data[col].mul(self.unit_F) + + for col in ['time']: + self.data[col] = self.data[col].mul(self.unit_t) + + return True + + def _read_data(self): + """ + read data from Labor Hart + """ + + # parameter + encoding = 'latin-1' + skiprows = 14 + hasunits = True + splitsign = ':;' + + # metadata from file + meta = {} + + with open(self.file, 'r', encoding=encoding) as f: + count = 0 + + for line in f: + count += 1 + + #remove whitespace + linesplit = line.strip() + linesplit = linesplit.split(splitsign) + + if len(linesplit) == 2: + + meta[linesplit[0]] = linesplit[1] + + if count >= skiprows: + break + + # data + data = pd.read_csv(self.file, + encoding=encoding, + skiprows=skiprows, + decimal=',', + sep=';') + + ## add header to df + with open(self.file, 'r', encoding=encoding) as f: + count = 0 + + for line in f: + count += 1 + + if count >= skiprows: + break + + head = line.split(';') + data.columns = head + + #clean data + data = data.dropna(axis=1) + + #define in class + self.meta = meta + self.data = data + return True + + def _standardize_meta(self): + + keys = list(self.meta.keys()) + for key in keys: + + if any(map(key.__contains__, ['Probenbezeichnung'])): + self.meta['speciment'] = self.meta.pop(key) + + elif any(map(key.__contains__, ['Datum/Uhrzeit'])): + self.meta['datetime'] = self.meta.pop(key) + try: + self.meta['datetime'] = pd.to_datetime( + self.meta['datetime']) + except: + pass + + elif any(map(key.__contains__, ['Probenhöhe'])): + self.meta['speciment_height'] = float( + self.meta.pop(key).replace(',', '.')) + elif any(map(key.__contains__, ['Probendurchmesser'])): + self.meta['speciment_diameter'] = float( + self.meta.pop(key).replace(',', '.')) + elif any(map(key.__contains__, ['Solltemperatur'])): + self.meta['temperature'] = float( + self.meta.pop(key).replace(',', '.')) + elif any(map(key.__contains__, ['Prüfbedingungen'])): + self.meta['test_version'] = self.meta.pop(key) + elif any(map(key.__contains__, ['Name des VersAblf'])): + self.meta['test'] = self.meta.pop(key) + elif any(map(key.__contains__, ['Prüfer'])): + self.meta['examiner'] = self.meta.pop(key) + + return True + + def _standardize_data(self): + + colnames = list(self.data.columns) + + for i, col in enumerate(colnames): + if col == 'TIME': + colnames[i] = 'time' + + #set values + elif col == 'Sollwert Frequenz': + colnames[i] = 'f' + elif col == 'SollTemperatur': + colnames[i] = 'T' + elif col == 'Max Scherweg': + colnames[i] = 'extension' + elif col == 'Sollwert Normalspannung': + colnames[i] = 'sigma_normal' + elif col == 'Impulsnummer': + colnames[i] = 'N' + + # measurements + + elif col == 'Load': + colnames[i] = 'F' + elif col == 'Position': + colnames[i] = 's_piston' + + elif col == 'VERTIKAL Links': + colnames[i] = 's_vert_1' + elif col == 'VERTIKAL Rechts': + colnames[i] = 's_vert_2' + + elif col == 'HORIZONTAL links': + colnames[i] = 's_hor_1' + + elif col == 'HOIZONTAL Rechts': + colnames[i] = 's_hor_2' + + self.data.columns = colnames + + +class ShearTestExtensionTUDresdenGeosys(ShearTestExtension): + + def _define_units(self): + + self.unit_S = 1 / 1000.0 #N + + def _set_units(self): + + for col in [ + 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston', 'extension' + ]: + self.data[col] = self.data[col].mul(self.unit_S) + + return True + + def _read_data(self): + """ + read data from Labor Hart + """ + + # parameter + encoding = 'latin-1' + skiprows = 14 + hasunits = True + splitsign = ':;' + + head, data = read_geosys(self.file, '015') + + #define in class + self.meta = head + self.data = data + return True + + def _standardize_meta(self): + + keys = list(self.meta.keys()) + for key in keys: + + if key == 'd': + self.meta['speciment_diameter'] = self.meta.pop(key) + + return True + + def _standardize_data(self): + + colnames = list(self.data.columns) + + for i, col in enumerate(colnames): + + #set values + if col == 'soll temperature': + colnames[i] = 'T' + elif col == 'soll extension': + colnames[i] = 'extension' + elif col == 'soll sigma': + colnames[i] = 'sigma_normal' + elif col == 'soll frequency': + colnames[i] = 'f' + + elif col == 'Number of vertical cycles': + colnames[i] = 'N' + + # measurements + elif col == 'vertical load from hydraulic pressure': + colnames[i] = 'F' + elif col == 'vertical position from hydraulic pressure': + colnames[i] = 's_piston' + + elif col == 'Vertical position from LVDT 1': + colnames[i] = 's_vert_1' + elif col == 'Vertical position from LVDT 2': + colnames[i] = 's_vert_2' + + self.data.columns = colnames diff --git a/pytestpavement/models/__init__.py b/pytestpavement/models/__init__.py index 4b83126..2ea917c 100644 --- a/pytestpavement/models/__init__.py +++ b/pytestpavement/models/__init__.py @@ -1,10 +1,3 @@ from .citt import * from .material import * from .sheartest import * - -__all__ = [ - # Spaltzug - "CITTSiffness", - #Dynamischer Schertest - "DynamicShearTestExtension", -] diff --git a/pytestpavement/models/data.py b/pytestpavement/models/data.py new file mode 100644 index 0000000..f422ae0 --- /dev/null +++ b/pytestpavement/models/data.py @@ -0,0 +1,33 @@ +import datetime +from xml.dom.minidom import Document + +from mongoengine import * + + +class RawData(Document): + + date = DateTimeField(default=datetime.datetime.now, + wtf_options={"render_kw": { + "step": "60" + }}) + + meta = { + 'allow_inheritance': True, + 'index_opts': {}, + 'index_background': True, + 'index_cls': False, + 'auto_create_index': True, + 'collection': 'rawdata', + } + + +class DataSheartest(RawData): + + # data + time = ListField(FloatField()) + F = ListField(FloatField()) + N = ListField(IntField()) + s_vert_1 = ListField(FloatField()) + s_vert_2 = ListField(FloatField()) + s_vert_sum = ListField(FloatField()) + s_piston = ListField(FloatField()) diff --git a/pytestpavement/models/material.py b/pytestpavement/models/material.py index e9a4837..8a2506e 100644 --- a/pytestpavement/models/material.py +++ b/pytestpavement/models/material.py @@ -38,7 +38,7 @@ class Bitumen(Material): young_modulus = FloatField() -class Expoxy(Material): +class Epoxy(Material): name = StringField() material = StringField() diff --git a/pytestpavement/models/sheartest.py b/pytestpavement/models/sheartest.py index 3c0837a..eab5a5e 100644 --- a/pytestpavement/models/sheartest.py +++ b/pytestpavement/models/sheartest.py @@ -2,6 +2,7 @@ import datetime from mongoengine import * +from .data import DataSheartest from .material import Material @@ -22,10 +23,12 @@ class DynamicShearTest(Document): project = StringField(required=True) workpackage = StringField() - - material1 = ReferenceField(Material, required=True) - material2 = ReferenceField(Material, required=True) - bounding = ReferenceField(Material, required=True) + + material1 = LazyReferenceField(Material, required=True) + material2 = LazyReferenceField(Material, required=True) + bounding = LazyReferenceField(Material, required=True) + + gap_width = FloatField(default=1.0) meta = { 'allow_inheritance': True, @@ -40,45 +43,42 @@ class DynamicShearTest(Document): class DynamicShearTestExtension(DynamicShearTest): #metadata - f_set = FloatField() - sigma_hor_set = FloatField() - T_set = FloatField() - s_set = FloatField() + f = FloatField() + sigma_normal = FloatField() + T = FloatField() + extension = FloatField() #results + data = LazyReferenceField(DataSheartest, + required=True, + reverse_delete_rule=CASCADE) + stiffness = FloatField() #fit parameter ## F - fit_a_F = FloatField() - fit_b_F = FloatField() - fit_d_F = FloatField() - fit_e_F = FloatField() - fit_f_F = FloatField() - r2_F = FloatField() + fit_amp_F = FloatField() + fit_freq_F = FloatField() + fit_phase_F = FloatField() + fit_offset_F = FloatField() + fit_slope_F = FloatField() ## S1 - fit_a_s_hor_1 = FloatField() - fit_b_s_hor_1 = FloatField() - fit_d_s_hor_1 = FloatField() - fit_e_s_hor_1 = FloatField() - fit_f_s_hor_1 = FloatField() - r2_s_hor_1 = FloatField() + fit_amp_s_vert_1 = FloatField() + fit_freq_s_vert_1 = FloatField() + fit_phase_s_vert_1 = FloatField() + fit_offset_s_vert_1 = FloatField() + fit_slope_s_vert_1 = FloatField() + r2_s_vert_1 = FloatField() ## S2 - fit_a_s_hor_2 = FloatField() - fit_b_s_hor_2 = FloatField() - fit_d_s_hor_2 = FloatField() - fit_e_s_hor_2 = FloatField() - fit_f_s_hor_2 = FloatField() - r2_s_hor_2 = FloatField() + fit_amp_s_vert_2 = FloatField() + fit_freq_s_vert_2 = FloatField() + fit_phase_s_vert_2 = FloatField() + fit_offset_s_vert_2 = FloatField() + fit_slope_s_vert_2 = FloatField() + r2_s_vert_2 = FloatField() ## S-Sum - fit_a_s_hor_sum = FloatField() - fit_b_s_hor_sum = FloatField() - fit_d_s_hor_sum = FloatField() - fit_e_s_hor_sum = FloatField() - fit_f_s_hor_sum = FloatField() - r2_s_hor_sum = FloatField() - # data - time = ListField(FloatField()) - F = ListField(FloatField()) - N = ListField(IntField()) - s_hor_1 = ListField(FloatField()) - s_hor_2 = ListField(FloatField()) + fit_amp_s_vert_sum = FloatField() + fit_freq_s_vert_sum = FloatField() + fit_phase_s_vert_sum = FloatField() + fit_offset_s_vert_sum = FloatField() + fit_slope_s_vert_sum = FloatField() + r2_s_vert_sum = FloatField() diff --git a/pytestpavement/versuche/__init__.py b/pytestpavement/versuche/__init__.py deleted file mode 100644 index b5b524a..0000000 --- a/pytestpavement/versuche/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# versuche - -from .schichtenverbund import * - -__all__ = [ - "fit_single_data", - "TestSchichtenverbundV2GeoSys", - "TestSchichtenverbundV2GeoSysExtractedEMPA", -] diff --git a/pytestpavement/versuche/fit.py b/pytestpavement/versuche/fit.py deleted file mode 100644 index b9fe5ab..0000000 --- a/pytestpavement/versuche/fit.py +++ /dev/null @@ -1,5 +0,0 @@ -import numpy as np - - -def model_cos(t, a, b, d, e, f): - return a * np.cos(2 * np.pi * f * t + b) + e * t + d diff --git a/pytestpavement/versuche/schichtenverbund.py b/pytestpavement/versuche/schichtenverbund.py deleted file mode 100644 index 1ea9d7b..0000000 --- a/pytestpavement/versuche/schichtenverbund.py +++ /dev/null @@ -1,369 +0,0 @@ -import os -import sys -from multiprocessing import Pool, cpu_count - -import matplotlib.pyplot as plt -import numpy as np -import pandas as pd -from aenum import enum -from fsutil import exists -from pytestpavement.analysis import fit_cos, fit_cos_eval -from pytestpavement.io import read_geosys - - -def fit_single_data(g): - """ - iterate over data and fit - - """ - - try: - - i, d = g - - d = d.loc[i] - - #d = d.reset_index() - - Ns = d['N'].unique() - - e = d[(d['N'] > Ns[-7]) & (d['N'] <= Ns[-2])].copy() - - if e.empty: - return - - e.index = e.index - e.index[0] - - e = e.reset_index() - - res_par = {} - - res_par['T_set'] = i[0] - res_par['sigma_set'] = i[1] - res_par['f_set'] = float(i[2]) - res_par['ext_set'] = i[3] - - r2 = [] - for col in ['F', 's1', 's2']: - - x = e['time'].values - y = e[col].values - - res_step = fit_cos(x, y, freq=res_par['f_set']) - - r2.append(res_step['r2']) - - for key in res_step.keys(): - res_par[key + f'_{col}'] = res_step[key] - - except: - return - - return res_par - - -class TestSchichtenverbundV2GeoSys(): - """ - read and process test of type Schichtenverbund - - Configuration created for TU Dresden - - ... - - Attributes - ---------------- - filename : str - filename to read - tablenum : str - table number of geosys file - - Methodes - ------------- - - - Returns - ------------- - - """ - - def __init__(self, - filename: str, - diameter: float = 100.0, - spalt: float = 1.0, - tablenum: str = '013', - debug: bool = False, - plot_fit: bool = False, - plot_fit_error: bool = True): - - self.file = filename - - self.diameter = diameter - self.spalt = spalt - - self._tablenum = tablenum - - self._plot = plot_fit - self._plot_on_error = plot_fit_error - - self._debug = debug - - self.data = None - - self._check_file_exists() - self._run() - - def _run(self): - - if self._debug: - print('debug mode') - - self._read() - self._normalize_data() - self._set_units() - - self._check_data() - self._transform_data() - - self._fit_data() - - self._calc_Es() - - def __str__(self): - return f"filename: {self.file}, table number: {self._tablenum}" - - def _check_file_exists(self): - - assert os.path.exists(self.file) - - def _read(self): - - meta, data = read_geosys('./data/raw/TU Dresden/PK4.txt', - self._tablenum, - debug=self._debug) - - self.diameter = meta['d'] - - data = data.reset_index() - - self.data = data - - def _normalize_data(self): - - col = list(self.data.columns) - - for i, d in enumerate(col): - - if d == 't': - col[i] = 'time' - elif d == 'f': - col[i] = 'f_set' - elif d == 's_vert_1': - col[i] = 's1' - elif d == 's_vert_2': - col[i] = 's2' - - self.data.columns = col - - if self._debug: - print(self.data.columns) - - def _set_units(self): - - return - - def _check_data(self): - - must_have_values = [ - 'T_set', - 'sigma_set', - 'f_set', - 'ext_set', - 'time', - 'F', - 's1', - 's2', - 'N', - ] - - check = [item in self.data.columns for item in must_have_values] - - if not all(check): - print('Error in Parameters:') - for i, c in enumerate(check): - if c == False: - p = must_have_values[i] - print(f'\t - {p}') - - print(self.data.head()) - - assert all(check) - - pass - - # - - def _transform_data(self): - - self.data = self.data.set_index( - ['T_set', 'sigma_set', 'f_set', 'ext_set', 'time']).sort_index() - - def _fit_data(self): - - if not self._debug: - - with Pool(cpu_count()) as pool: - ret_list = pool.map( - fit_single_data, - [(i, d) for i, d in self.data.groupby(level=[0, 1, 2, 3])]) - - else: - - ret_list = [] - - for i, d in self.data.groupby(level=[0, 1, 2, 3]): - - ret_list.append(fit_single_data((i, d))) - - self.res = pd.DataFrame.from_dict( - [r for r in ret_list if isinstance(r, dict)]) - - self.res = self.res.set_index( - ['T_set', 'sigma_set', 'f_set', 'ext_set']).sort_index() - - #self.res.sort_index(axis=0, inplace=True) - #self.res.sort_index(axis=1, inplace=True) - - def _plot_single_data(self, i): - - ylabels = { - 'F': 'Force in N', - 's1': 'Displacement $s_1$ in $\mu m$', - 's2': 'Displacement $s_2$ in $\mu m$' - } - - par = self.res.loc[i].to_dict() - - df = self.data.loc[i] - Ns = df['N'].unique() - - e = df[(df['N'] > Ns[-7]) & (df['N'] <= Ns[-2])].copy() - - e.index = e.index - e.index[0] - - if e.empty: - return - - fig, axs = plt.subplots(3, 1, sharex=True) - fig.set_figheight(1.5 * fig.get_figheight()) - - for i, col in enumerate(['F', 's1', 's2']): - - ax = axs[i] - x, y = e.index, e[col] - - ax.plot(x, y, c='k', label='data') - - par_sel = [key for key in par.keys() if col in key] - - par_sel = dict((k.split('_')[0], par[k]) for k in par_sel) - - ys = fit_cos_eval(x, par_sel) - - r2_sel = np.round(par_sel['r2'], 3) - ax.plot(x, ys, c='C1', label=f'fit (R² = {r2_sel}') - - ax.legend(loc=0) - - ax.set_ylabel(ylabels[col]) - - ax.set_xlabel('Time in s') - plt.tight_layout() - plt.show() - - def plot_fitted_data(self, num: int = 7): - """ - plot fit - """ - - counter = 0 - - for i, r in self.res.groupby(level=[0, 1, 2, 3]): - - self._plot_single_data(i) - - counter += 1 - - if (num != None) & (counter >= num): - break - - def plot_fitted_data_error(self, rmin: float = 0.9): - """ - plot fit - """ - - sel_res = self.res[(self.res['r2_F'] <= rmin)] - - if sel_res.empty: - print('no errors') - return - - for i, r in sel_res.groupby(level=[0, 1, 2, 3]): - - self._plot_single_data(i) - - def _calc_Es(self): - - area = np.pi * self.diameter**2 / 4 - - ampF = self.res['amp_F'] - ampS = self.res[['amp_s1', 'amp_s2']].mean(axis=1) - - tau = ampF.div(area) - gamma = ampS.div(1000.0).div(self.spalt) - - self.res['Es'] = tau / gamma - - -class TestSchichtenverbundV2GeoSysExtractedEMPA(TestSchichtenverbundV2GeoSys): - - def _read(self): - if self._debug: - nrows = 15000 - else: - nrows = None - - self.data = pd.read_csv(self.file, sep='\t', decimal='.', nrows=nrows) - - self.data.drop(index=[0], inplace=True) - self.data.dropna(axis=0, inplace=True) - - for col in self.data.columns: - self.data[col] = pd.to_numeric(self.data[col]) - - def _normalize_data(self): - - col = list(self.data.columns) - - for i, d in enumerate(col): - - if 'soll temperature' in d: - col[i] = 'T_set' - elif 'soll sigma' in d: - col[i] = 'sigma_set' - elif 'soll frequency' in d: - col[i] = 'f_set' - elif 'soll extension' in d: - col[i] = 'ext_set' - - elif 'vertical load from hydraulic pressure' in d: - col[i] = 'F' - - elif 'Vertical position from LVDT 1' in d: - col[i] = 's1' - elif 'Vertical position from LVDT 2' in d: - col[i] = 's2' - - elif 'Zyklenzähler' in d: - col[i] = 'N' - - self.data.columns = col