pytest/pytestpavement/labtests/sheartest.py
2022-09-27 20:18:52 +02:00

638 lines
18 KiB
Python

import os
import lmfit as lm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.fft as sfft
import seaborn as sns
from pytestpavement.analysis.regression import fit_cos, fit_cos_eval
from pytestpavement.io.geosys import read_geosys
from pytestpavement.labtests.base import DataSineLoad
from pytestpavement.models.data import DataSheartest
from pytestpavement.models.sheartest import DynamicShearTestExtension
class ShearTest(DataSineLoad):
"""
Dynamic Shear Bounding Test
"""
def __init__(self,
fname: str,
debug: bool = False,
gap_width: float = 1.0,
roundtemperature: bool = True):
#set parameter
self.gap_width = gap_width
self.debug = debug
self.file = fname
self.roundtemperature = roundtemperature
# process file
self._run()
def plot_fited_data(self, opath=None, pkname=None, r2min=0.99):
ylabel_dict = {
'F': 'Kraft in N',
's_vert_sum': 'norm. mittlerer Scherweg\n $S_{mittel}$ in mm',
's_piston': 'norm. Kolbenweg\n in mm',
's_vert_1': 'Scherweg\n $S_1$ in mm',
's_vert_2': 'Scherweg\n $S_2$ in mm'
}
columns_analyse = [
'F',
's_vert_sum',
's_vert_1',
's_vert_2',
's_piston',
]
if not (opath is None) & (pkname is None):
showplot = False
opath = os.path.join(opath, pkname, 'raw_data')
if not os.path.exists(opath):
os.makedirs(opath)
else:
showplot = True
for i, fit in self.fit.iterrows():
if not any([fit['r2_F'] < r2min, fit['r2_s_vert_sum'] < r2min]):
continue
data = self.data[int(fit['idx_data'])]
if data is None:
continue
freq = data['f'].unique()[0]
sigma = data['sigma_normal'].unique()[0]
s = data['extension'].unique()[0]
T = data['T'].unique()[0]
fig, axs = plt.subplots(len(columns_analyse),
1,
figsize=(8, len(columns_analyse) * 2),
sharex=True)
for idxcol, col in enumerate(columns_analyse):
x, y = data.index, data[col]
#add fit
f = self.fit.iloc[i]
parfit = {}
for k in ['amp', 'freq', 'phase', 'offset', 'slope']:
parfit[k] = f[f'fit_{k}_{col}']
yreg = fit_cos_eval(x, parfit)
if col in ['s_piston', 's_vert_sum']:
y = y - np.mean(y)
yreg = yreg - np.mean(yreg)
plt.sca(axs[idxcol])
plt.plot(x, y, label='Messdaten')
r2 = np.round(f[f'r2_{col}'], 3)
plt.plot(x,
yreg,
alpha=0.7,
label=f'Regression ($R^2 = {r2}$)')
if not ('F' in col):
s = f['extension']
parline = dict(lw=0.4,
ls='--',
color='lightgrey',
alpha=0.4,
label='Bereich des zul. Scherweges')
plt.axhspan(-s, s, **parline)
if idxcol == len(columns_analyse) - 1:
plt.xlabel('Zeit in s')
plt.ylabel(ylabel_dict[col])
plt.legend()
plt.tight_layout()
if showplot:
plt.show()
break
else:
ofile = f'{T}deg_{sigma}MPa_{freq}Hz_{s}mm'.replace('.', 'x')
ofile = os.path.join(opath, ofile + '.pdf')
plt.savefig(ofile)
plt.close()
class ShearTestExtension(ShearTest):
def runfit(self):
self._fit_data()
def save(self, material1, material2, bounding, meta: dict):
for i, fit in self.fit.iterrows():
data = self.data[int(fit['idx_data'])]
#check if data in db
n = DynamicShearTestExtension.objects(
f=fit['f'],
sigma_normal=fit['sigma_normal'],
T=fit['T'],
extension=fit['extension'],
material1=material1,
material2=material2,
bounding=bounding,
filehash=self.filehash,
).count()
if n > 0: continue
#save data
rdata = DataSheartest(
time=data.index.values,
F=data['F'].values,
N=data['N'].values,
s_vert_1=data['s_vert_1'].values,
s_vert_2=data['s_vert_2'].values,
s_vert_sum=data['s_vert_sum'].values,
s_piston=data['s_piston'].values,
).save()
# save fit
values = {}
for col in ['F', 's_vert_1', 's_vert_2', 's_vert_sum']:
values[f'fit_amp_{col}'] = fit[f'fit_amp_{col}']
values[f'fit_freq_{col}'] = fit[f'fit_freq_{col}']
values[f'fit_phase_{col}'] = fit[f'fit_phase_{col}']
values[f'fit_offset_{col}'] = fit[f'fit_offset_{col}']
values[f'fit_slope_{col}'] = fit[f'fit_slope_{col}']
values.update(meta)
try:
r = DynamicShearTestExtension(
#metadata
f=fit['f'],
sigma_normal=fit['sigma_normal'],
T=fit['T'],
extension=fit['extension'],
filehash=self.filehash,
material1=material1,
material2=material2,
bounding=bounding,
#results
data=rdata,
stiffness=fit['G'],
#
**values).save()
except:
rdata.delete()
def _set_parameter(self):
self.split_data_based_on_parameter = [
'T', 'sigma_normal', 'f', 'extension'
]
self.col_as_int = ['N']
self.col_as_float = ['T', 'F', 'f', 's_vert_sum']
self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_vert_sum']
# Header names after standardization; check if exists
self.val_header_names = ['speciment_diameter']
self.columns_analyse = [
'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston'
]
self.number_of_load_cycles_for_analysis = 5
def _calc_missiong_values(self):
cols = self.data.columns
for c in ['vert']:
if not f's_{c}_sum' in cols:
self.data[f's_{c}_sum'] = self.data[[f's_{c}_1', f's_{c}_2'
]].sum(axis=1).div(2.0)
def _fit_data(self):
self.fit = []
for idx_data, data in enumerate(self.data):
if data is None: continue
data.index = data.index - data.index[0]
res = {}
res['idx_data'] = int(idx_data)
# Fitting
freq = float(np.round(data['f'].mean(), 4))
if (self.debug):
sigma_normal = np.round(data['sigma_normal'].mean(), 3)
T = np.round(data['T'].mean(), 3)
for idxcol, col in enumerate(self.columns_analyse):
if not col in data.columns: continue
x = data.index.values
y = data[col].values
# Fourier Transformation
"""
dt = np.diff(x).mean() #mean sampling rate
n = len(x)
res[f'psd_{col}'] = sfft.rfft(y) #compute the FFT
res[f'freq_{col}'] = sfft.rfftfreq(n, dt)
"""
res_fit = fit_cos(x, y, freq=freq, constfreq=True)
res[f'r2_{col}'] = res_fit['r2']
res[f'fit_amp_{col}'] = res_fit['amp']
res[f'fit_freq_{col}'] = res_fit['freq']
res[f'fit_phase_{col}'] = res_fit['phase']
res[f'fit_offset_{col}'] = res_fit['offset']
res[f'fit_slope_{col}'] = res_fit['slope']
## Schersteifigkeit berechnen
deltaF = res['fit_amp_F']
deltaS = res['fit_amp_s_vert_sum']
A = np.pi * self.meta['speciment_diameter']**2 / 4
tau = deltaF / A
gamma = deltaS / self.gap_width
res['G'] = tau / gamma
#metadaten
for c in ['T', 'extension', 'sigma_normal', 'f']:
res[c] = data[c][0]
self.fit.append(res)
if (self.debug) & (len(self.fit) > 5):
break
self.fit = pd.DataFrame.from_records(self.fit)
def plot_results(self, opath=None, pkname=None, r2min=0.96):
if not (opath is None) & (pkname is None):
showplot = False
opath = os.path.join(opath, pkname)
if not os.path.exists(opath):
os.makedirs(opath)
else:
showplot = True
dfplot = self.fit.copy()
for col in ['extension', 'fit_amp_s_vert_sum']:
dfplot[col] = dfplot[col].mul(1000)
fig, ax = plt.subplots()
xticks = list(dfplot['extension'].unique())
df = dfplot
df = df[(df['r2_F'] >= r2min) & (df['r2_s_vert_sum'] >= r2min)]
sns.scatterplot(
data=df,
x='fit_amp_s_vert_sum',
y='G',
hue='T',
ax=ax,
alpha=0.7,
#size=150,
size="G",
sizes=(50, 160),
edgecolor='k',
palette='muted',
zorder=10)
df = dfplot
df = df[(df['r2_F'] < r2min) & (df['r2_s_vert_sum'] < r2min)]
if not df.empty:
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='G',
facecolor='grey',
alpha=0.5,
legend=False,
zorder=1,
ax=ax)
ax.set_xlabel('gemessene Scherwegamplitude in $\mu m$')
ax.set_ylabel('Scherseteifigkeit in MPa/mm')
ax.set_xticks(xticks)
ax.grid()
if not showplot:
ofile = os.path.join(opath, 'shearstiffness.pdf')
plt.savefig(ofile)
plt.show()
def plot_stats(self, opath=None, pkname=None, r2min=0.96):
if not (opath is None) & (pkname is None):
showplot = False
opath = os.path.join(opath, pkname)
if not os.path.exists(opath):
os.makedirs(opath)
else:
showplot = True
dfplot = self.fit.copy()
for col in ['extension', 'fit_amp_s_vert_sum']:
dfplot[col] = dfplot[col].mul(1000)
#r2
df = self.fit
fig, axs = plt.subplots(1, 2, sharey=True, sharex=True)
parscatter = dict(palette='muted', alpha=0.7, edgecolor='k', lw=0.3)
# r2
ax = axs[0]
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='r2_F',
hue='T',
ax=ax,
**parscatter)
ax.set_ylabel('Bestimmtheitsmaß $R^2$')
ax.set_title('Kraft')
ax = axs[1]
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='r2_s_vert_sum',
hue='T',
legend=False,
ax=ax,
**parscatter)
ax.set_ylabel('$R^2$ (S_{mittel})')
ax.set_title('mittlerer Scherweg')
for ax in axs.flatten():
ax.grid()
ax.set_xlabel('gemessene Scherwegamplitude in $\mu m$')
plt.tight_layout()
if not showplot:
ofile = os.path.join(opath, 'stats_r2.pdf')
plt.savefig(ofile)
plt.show()
class ShearTestExtensionLaborHart(ShearTestExtension):
def _define_units(self):
self.unit_F = 1 / 1000.0 #N
self.unit_t = 1 / 1000. #s
def _set_units(self):
#for col in ['F']:
# self.data[col] = self.data[col].mul(self.unit_F)
for col in ['time']:
self.data[col] = self.data[col].mul(self.unit_t)
return True
def _read_data(self):
"""
read data from Labor Hart
"""
# parameter
encoding = 'latin-1'
skiprows = 14
hasunits = True
splitsign = ':;'
# metadata from file
meta = {}
with open(self.file, 'r', encoding=encoding) as f:
count = 0
for line in f:
count += 1
#remove whitespace
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
if len(linesplit) == 2:
meta[linesplit[0]] = linesplit[1]
if count >= skiprows:
break
# data
data = pd.read_csv(self.file,
encoding=encoding,
skiprows=skiprows,
decimal=',',
sep=';')
## add header to df
with open(self.file, 'r', encoding=encoding) as f:
count = 0
for line in f:
count += 1
if count >= skiprows:
break
head = line.split(';')
data.columns = head
#clean data
data = data.dropna(axis=1)
#define in class
self.meta = meta
self.data = data
return True
def _standardize_meta(self):
keys = list(self.meta.keys())
for key in keys:
if any(map(key.__contains__, ['Probenbezeichnung'])):
self.meta['speciment'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Datum/Uhrzeit'])):
self.meta['datetime'] = self.meta.pop(key)
try:
self.meta['datetime'] = pd.to_datetime(
self.meta['datetime'])
except:
pass
elif any(map(key.__contains__, ['Probenhöhe'])):
self.meta['speciment_height'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Probendurchmesser'])):
self.meta['speciment_diameter'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Solltemperatur'])):
self.meta['temperature'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Prüfbedingungen'])):
self.meta['test_version'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Name des VersAblf'])):
self.meta['test'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Prüfer'])):
self.meta['examiner'] = self.meta.pop(key)
return True
def _standardize_data(self):
colnames = list(self.data.columns)
for i, col in enumerate(colnames):
if col == 'TIME':
colnames[i] = 'time'
#set values
elif col == 'Sollwert Frequenz':
colnames[i] = 'f'
elif col == 'SollTemperatur':
colnames[i] = 'T'
elif col == 'Max Scherweg':
colnames[i] = 'extension'
elif col == 'Sollwert Normalspannung':
colnames[i] = 'sigma_normal'
elif col == 'Impulsnummer':
colnames[i] = 'N'
# measurements
elif col == 'Load':
colnames[i] = 'F'
elif col == 'Position':
colnames[i] = 's_piston'
elif col == 'VERTIKAL Links':
colnames[i] = 's_vert_1'
elif col == 'VERTIKAL Rechts':
colnames[i] = 's_vert_2'
elif col == 'HORIZONTAL links':
colnames[i] = 's_hor_1'
elif col == 'HOIZONTAL Rechts':
colnames[i] = 's_hor_2'
self.data.columns = colnames
class ShearTestExtensionTUDresdenGeosys(ShearTestExtension):
def _define_units(self):
self.unit_S = 1 / 1000.0 #N
def _set_units(self):
for col in [
's_vert_sum', 's_vert_1', 's_vert_2', 's_piston', 'extension'
]:
self.data[col] = self.data[col].mul(self.unit_S)
return True
def _read_data(self):
"""
read data from Labor Hart
"""
# parameter
encoding = 'latin-1'
skiprows = 14
hasunits = True
splitsign = ':;'
head, data = read_geosys(self.file, '015')
#define in class
self.meta = head
self.data = data
return True
def _standardize_meta(self):
keys = list(self.meta.keys())
for key in keys:
if key == 'd':
self.meta['speciment_diameter'] = self.meta.pop(key)
return True
def _standardize_data(self):
colnames = list(self.data.columns)
for i, col in enumerate(colnames):
#set values
if col == 'soll temperature':
colnames[i] = 'T'
elif col == 'soll extension':
colnames[i] = 'extension'
elif col == 'soll sigma':
colnames[i] = 'sigma_normal'
elif col == 'soll frequency':
colnames[i] = 'f'
elif col == 'Number of vertical cycles':
colnames[i] = 'N'
# measurements
elif col == 'vertical load from hydraulic pressure':
colnames[i] = 'F'
elif col == 'vertical position from hydraulic pressure':
colnames[i] = 's_piston'
elif col == 'Vertical position from LVDT 1':
colnames[i] = 's_vert_1'
elif col == 'Vertical position from LVDT 2':
colnames[i] = 's_vert_2'
self.data.columns = colnames