import os import tempfile import lmfit as lm import matplotlib.pyplot as plt import numpy as np import pandas as pd import py7zr import scipy.fft as sfft import seaborn as sns from pytestpavement.analysis.regression import fit_cos, fit_cos_eval from pytestpavement.io.geosys import read_geosys from pytestpavement.io.minio import MinioClient from pytestpavement.labtests.base import DataSineLoad from pytestpavement.models.data import DataSheartest from pytestpavement.models.sheartest import DynamicShearTestExtension class ShearTest(DataSineLoad): """ Dynamic Shear Bounding Test """ def __init__(self, fname: str, debug: bool = False, gap_width: float = 1.0, roundtemperature: bool = True, archive_file=False, s3_params: dict = {}): #set parameter self.gap_width = gap_width self.debug = debug self.file = fname self.roundtemperature = roundtemperature self.archive_file = archive_file self.s3_params = s3_params # process file self._run() def plot_fited_data(self, opath=None, pkname=None, r2min=0.99): ylabel_dict = { 'F': 'Kraft in N', 's_vert_sum': 'norm. mittlerer Scherweg\n $S_{mittel}$ in mm', 's_piston': 'norm. Kolbenweg\n in mm', 's_vert_1': 'Scherweg\n $S_1$ in mm', 's_vert_2': 'Scherweg\n $S_2$ in mm' } columns_analyse = [ 'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston', ] if not (opath is None) & (pkname is None): showplot = False opath = os.path.join(opath, pkname, 'raw_data') if not os.path.exists(opath): os.makedirs(opath) else: showplot = True for i, fit in self.fit.iterrows(): if not any([fit['r2_F'] < r2min, fit['r2_s_vert_sum'] < r2min]): continue data = self.data[int(fit['idx_data'])] if data is None: continue freq = data['f'].unique()[0] sigma = data['sigma_normal'].unique()[0] s = data['extension'].unique()[0] T = data['T'].unique()[0] fig, axs = plt.subplots(len(columns_analyse), 1, figsize=(8, len(columns_analyse) * 2), sharex=True) for idxcol, col in enumerate(columns_analyse): x, y = data.index, data[col] #add fit f = self.fit.iloc[i] parfit = {} for k in ['amp', 'freq', 'phase', 'offset', 'slope']: parfit[k] = f[f'fit_{k}_{col}'] yreg = fit_cos_eval(x, parfit) if col in ['s_piston', 's_vert_sum']: y = y - np.mean(y) yreg = yreg - np.mean(yreg) plt.sca(axs[idxcol]) plt.plot(x, y, label='Messdaten') r2 = np.round(f[f'r2_{col}'], 3) plt.plot(x, yreg, alpha=0.7, label=f'Regression ($R^2 = {r2}$)') if not ('F' in col): s = f['extension'] parline = dict(lw=0.4, ls='--', color='lightgrey', alpha=0.4, label='Bereich des zul. Scherweges') plt.axhspan(-s, s, **parline) if idxcol == len(columns_analyse) - 1: plt.xlabel('Zeit in s') plt.ylabel(ylabel_dict[col]) plt.legend() plt.tight_layout() if showplot: plt.show() break else: ofile = f'{T}deg_{sigma}MPa_{freq}Hz_{s}mm'.replace('.', 'x') ofile = os.path.join(opath, ofile + '.pdf') plt.savefig(ofile) plt.close() class ShearTestExtension(ShearTest): def runfit(self): self._fit_data() def file_in_db(self): n = DynamicShearTestExtension.objects(filehash=self.filehash).count() if n > 0: return True else: return False def save(self, material1, material2, bounding, meta: dict): for i, fit in self.fit.iterrows(): data = self.data[int(fit['idx_data'])] #check if data in db n = DynamicShearTestExtension.objects( f=fit['f'], sigma_normal=fit['sigma_normal'], T=fit['T'], extension=fit['extension'], material1=material1, material2=material2, bounding=bounding, filehash=self.filehash, ).count() if n > 0: continue # save fit values = {} for col in ['F', 's_vert_1', 's_vert_2', 's_vert_sum']: values[f'fit_amp_{col}'] = fit[f'fit_amp_{col}'] values[f'fit_freq_{col}'] = fit[f'fit_freq_{col}'] values[f'fit_phase_{col}'] = fit[f'fit_phase_{col}'] values[f'fit_offset_{col}'] = fit[f'fit_offset_{col}'] values[f'fit_slope_{col}'] = fit[f'fit_slope_{col}'] values[f'r2_{col}'] = fit[f'r2_{col}'] values.update(meta) try: r = DynamicShearTestExtension( #metadata f=fit['f'], sigma_normal=fit['sigma_normal'], T=fit['T'], extension=fit['extension'], filehash=self.filehash, material1=material1, material2=material2, bounding=bounding, #results stiffness=fit['G'], # **values).save() #save raw data rdata = DataSheartest( result_id=r.id, time=data.index.values, F=data['F'].values, N=data['N'].values, s_vert_1=data['s_vert_1'].values, s_vert_2=data['s_vert_2'].values, s_vert_sum=data['s_vert_sum'].values, s_piston=data['s_piston'].values, ).save() except: print('error saving data') raise rdata.delete() if self.archive_file: mclient = MinioClient(self.s3_params['S3_URL'], self.s3_params['S3_ACCESS_KEY'], self.s3_params['S3_SECRET_KEY']) bucket = str(meta['org_id']) mclient.create_bucket(bucket) extension = os.path.splitext(self.file)[-1] ofilename = self.filehash + extension outpath = 'sheartest' metadata_s3 = { 'project_id': str(meta['project_id']),· 'user_id': str(meta['user_id']), 'filename': os.path.split(self.file)[-1], 'speciment': meta['speciment_name'] } #compress data to tmpfolder with tempfile.TemporaryDirectory() as tmpdirname: ofilename_compressed = ofilename + '.7z' compressed_file = os.path.join(tmpdirname, ofilename_compressed) with py7zr.SevenZipFile(compressed_file, 'w') as archive: archive.writeall(self.file) mclient.upload_file(bucket, compressed_file, ofilename_compressed, outpath=outpath, content_type="application/raw", metadata=metadata_s3) def _set_parameter(self): self.split_data_based_on_parameter = [ 'T', 'sigma_normal', 'f', 'extension' ] self.col_as_int = ['N'] self.col_as_float = ['T', 'F', 'f', 's_vert_sum'] self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_vert_sum'] # Header names after standardization; check if exists self.val_header_names = ['speciment_diameter'] self.columns_analyse = [ 'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston' ] self.number_of_load_cycles_for_analysis = 5 def _calc_missiong_values(self): cols = self.data.columns for c in ['vert']: if not f's_{c}_sum' in cols: self.data[f's_{c}_sum'] = self.data[[f's_{c}_1', f's_{c}_2' ]].sum(axis=1).div(2.0) def _fit_data(self): self.fit = [] for idx_data, data in enumerate(self.data): if data is None: continue data.index = data.index - data.index[0] res = {} res['idx_data'] = int(idx_data) # Fitting freq = float(np.round(data['f'].mean(), 4)) if (self.debug): sigma_normal = np.round(data['sigma_normal'].mean(), 3) T = np.round(data['T'].mean(), 3) for idxcol, col in enumerate(self.columns_analyse): if not col in data.columns: continue x = data.index.values y = data[col].values # Fourier Transformation """ dt = np.diff(x).mean() #mean sampling rate n = len(x) res[f'psd_{col}'] = sfft.rfft(y) #compute the FFT res[f'freq_{col}'] = sfft.rfftfreq(n, dt) """ res_fit = fit_cos(x, y, freq=freq, constfreq=True) res[f'r2_{col}'] = res_fit['r2'] res[f'fit_amp_{col}'] = res_fit['amp'] res[f'fit_freq_{col}'] = res_fit['freq'] res[f'fit_phase_{col}'] = res_fit['phase'] res[f'fit_offset_{col}'] = res_fit['offset'] res[f'fit_slope_{col}'] = res_fit['slope'] ## Schersteifigkeit berechnen deltaF = res['fit_amp_F'] deltaS = res['fit_amp_s_vert_sum'] A = np.pi * self.meta['speciment_diameter']**2 / 4 tau = deltaF / A gamma = deltaS / self.gap_width res['G'] = tau / gamma #metadaten for c in ['T', 'extension', 'sigma_normal', 'f']: res[c] = data[c][0] self.fit.append(res) if (self.debug) & (len(self.fit) > 5): break self.fit = pd.DataFrame.from_records(self.fit) def plot_results(self, opath=None, pkname=None, r2min=0.96): if not (opath is None) & (pkname is None): showplot = False opath = os.path.join(opath, pkname) if not os.path.exists(opath): os.makedirs(opath) else: showplot = True dfplot = self.fit.copy() for col in ['extension', 'fit_amp_s_vert_sum']: dfplot[col] = dfplot[col].mul(1000) fig, ax = plt.subplots() xticks = list(dfplot['extension'].unique()) df = dfplot df = df[(df['r2_F'] >= r2min) & (df['r2_s_vert_sum'] >= r2min)] sns.scatterplot( data=df, x='fit_amp_s_vert_sum', y='G', hue='T', ax=ax, alpha=0.7, #size=150, size="G", sizes=(50, 160), edgecolor='k', palette='muted', zorder=10) df = dfplot df = df[(df['r2_F'] < r2min) & (df['r2_s_vert_sum'] < r2min)] if not df.empty: sns.scatterplot(data=df, x='fit_amp_s_vert_sum', y='G', facecolor='grey', alpha=0.5, legend=False, zorder=1, ax=ax) ax.set_xlabel('gemessene Scherwegamplitude in $\mu m$') ax.set_ylabel('Scherseteifigkeit in MPa/mm') ax.set_xticks(xticks) ax.grid() if not showplot: ofile = os.path.join(opath, 'shearstiffness.pdf') plt.savefig(ofile) plt.show() def plot_stats(self, opath=None, pkname=None, r2min=0.96): if not (opath is None) & (pkname is None): showplot = False opath = os.path.join(opath, pkname) if not os.path.exists(opath): os.makedirs(opath) else: showplot = True dfplot = self.fit.copy() for col in ['extension', 'fit_amp_s_vert_sum']: dfplot[col] = dfplot[col].mul(1000) #r2 df = self.fit fig, axs = plt.subplots(1, 2, sharey=True, sharex=True) parscatter = dict(palette='muted', alpha=0.7, edgecolor='k', lw=0.3) # r2 ax = axs[0] sns.scatterplot(data=df, x='fit_amp_s_vert_sum', y='r2_F', hue='T', ax=ax, **parscatter) ax.set_ylabel('Bestimmtheitsmaß $R^2$') ax.set_title('Kraft') ax = axs[1] sns.scatterplot(data=df, x='fit_amp_s_vert_sum', y='r2_s_vert_sum', hue='T', legend=False, ax=ax, **parscatter) ax.set_ylabel('$R^2$ (S_{mittel})') ax.set_title('mittlerer Scherweg') for ax in axs.flatten(): ax.grid() ax.set_xlabel('gemessene Scherwegamplitude in $\mu m$') plt.tight_layout() if not showplot: ofile = os.path.join(opath, 'stats_r2.pdf') plt.savefig(ofile) plt.show() class ShearTestExtensionLaborHart(ShearTestExtension): def _define_units(self): self.unit_F = 1 / 1000.0 #N self.unit_t = 1 / 1000. #s def _set_units(self): #for col in ['F']: # self.data[col] = self.data[col].mul(self.unit_F) for col in ['time']: self.data[col] = self.data[col].mul(self.unit_t) return True def _read_data(self): """ read data from Labor Hart """ # parameter encoding = 'latin-1' skiprows = 14 hasunits = True splitsign = ':;' # metadata from file meta = {} with open(self.file, 'r', encoding=encoding) as f: count = 0 for line in f: count += 1 #remove whitespace linesplit = line.strip() linesplit = linesplit.split(splitsign) if len(linesplit) == 2: meta[linesplit[0]] = linesplit[1] if count >= skiprows: break # data data = pd.read_csv(self.file, encoding=encoding, skiprows=skiprows, decimal=',', sep=';') ## add header to df with open(self.file, 'r', encoding=encoding) as f: count = 0 for line in f: count += 1 if count >= skiprows: break head = line.split(';') data.columns = head #clean data data = data.dropna(axis=1) #define in class self.meta = meta self.data = data return True def _standardize_meta(self): keys = list(self.meta.keys()) for key in keys: if any(map(key.__contains__, ['Probenbezeichnung'])): self.meta['speciment'] = self.meta.pop(key) elif any(map(key.__contains__, ['Datum/Uhrzeit'])): self.meta['datetime'] = self.meta.pop(key) try: self.meta['datetime'] = pd.to_datetime( self.meta['datetime']) except: pass elif any(map(key.__contains__, ['Probenhöhe'])): self.meta['speciment_height'] = float( self.meta.pop(key).replace(',', '.')) elif any(map(key.__contains__, ['Probendurchmesser'])): self.meta['speciment_diameter'] = float( self.meta.pop(key).replace(',', '.')) elif any(map(key.__contains__, ['Solltemperatur'])): self.meta['temperature'] = float( self.meta.pop(key).replace(',', '.')) elif any(map(key.__contains__, ['Prüfbedingungen'])): self.meta['test_version'] = self.meta.pop(key) elif any(map(key.__contains__, ['Name des VersAblf'])): self.meta['test'] = self.meta.pop(key) elif any(map(key.__contains__, ['Prüfer'])): self.meta['examiner'] = self.meta.pop(key) return True def _standardize_data(self): colnames = list(self.data.columns) for i, col in enumerate(colnames): if col == 'TIME': colnames[i] = 'time' #set values elif col == 'Sollwert Frequenz': colnames[i] = 'f' elif col == 'SollTemperatur': colnames[i] = 'T' elif col == 'Max Scherweg': colnames[i] = 'extension' elif col == 'Sollwert Normalspannung': colnames[i] = 'sigma_normal' elif col == 'Impulsnummer': colnames[i] = 'N' # measurements elif col == 'Load': colnames[i] = 'F' elif col == 'Position': colnames[i] = 's_piston' elif col == 'VERTIKAL Links': colnames[i] = 's_vert_1' elif col == 'VERTIKAL Rechts': colnames[i] = 's_vert_2' elif col == 'HORIZONTAL links': colnames[i] = 's_hor_1' elif col == 'HOIZONTAL Rechts': colnames[i] = 's_hor_2' self.data.columns = colnames class ShearTestExtensionTUDresdenGeosys(ShearTestExtension): def _define_units(self): self.unit_S = 1 / 1000.0 #N def _set_units(self): for col in [ 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston', 'extension' ]: self.data[col] = self.data[col].mul(self.unit_S) #convert internal units to global f = np.mean([0.9 / 355, 0.6 / 234.0, 0.3 / 116.0]) self.data['sigma_normal'] = self.data['sigma_normal'].mul(f).apply( lambda x: np.round(x, 1)) return True def _read_data(self): """ read data from Labor Hart """ # parameter encoding = 'latin-1' skiprows = 14 hasunits = True splitsign = ':;' head, data = read_geosys(self.file, '015') #define in class self.meta = head self.data = data return True def _standardize_meta(self): keys = list(self.meta.keys()) for key in keys: if key == 'd': self.meta['speciment_diameter'] = self.meta.pop(key) return True def _standardize_data(self): colnames = list(self.data.columns) for i, col in enumerate(colnames): #set values if col == 'soll temperature': colnames[i] = 'T' elif col == 'soll extension': colnames[i] = 'extension' elif col == 'soll sigma': colnames[i] = 'sigma_normal' elif col == 'soll frequency': colnames[i] = 'f' elif col == 'Number of vertical cycles': colnames[i] = 'N' # measurements elif col == 'vertical load from hydraulic pressure': colnames[i] = 'F' elif col == 'vertical position from hydraulic pressure': colnames[i] = 's_piston' elif col == 'Vertical position from LVDT 1': colnames[i] = 's_vert_1' elif col == 'Vertical position from LVDT 2': colnames[i] = 's_vert_2' self.data.columns = colnames