# pytest/pytestpavement/versuche/base.py

import hashlib
import os
from io import BytesIO

import lmfit as lm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.fft as sfft
import scipy.special as sf

# fit_sin_anstieg is used below and assumed to live in .fit next to model_cos
from .fit import fit_sin_anstieg, model_cos
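
# The helpers imported from .fit are not shown in this file. From their use
# below we assume: model_cos(t, a, b, d, e, f) evaluates a cosine with offset
# and linear drift, roughly d + e*t + a*cos(2*pi*f*t + b) (lmfit derives the
# parameter names from this signature, with t as the independent variable);
# fit_sin_anstieg(x, y) returns coarse initial estimates as a dict with the
# keys 'offset', 'amp' and 'phase'.
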
class DataSineLoad:
"""
Base class for lab tests with sine load
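
    The constructor runs the full pipeline: read the raw file, standardize
    metadata and column names, split the record into individual tests,
    select the last load cycles and fit a cosine model to each channel.

    Hypothetical usage (the stress level in MPa is read from the file name)::

        test = DataSineLoad('data/raw/0,2 MPa_probe1.csv')
        test.fit[['f', 'sigma', 'E']]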
"""
def __init__(self, fname: str, debug: bool = False):
self.meta = {'d': 150, 'speciment_height': 60}
self.file = fname
self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_hor_sum']
        # metadata keys that must exist after standardization (see _validate_data)
self.val_header_names = ['speciment_height']
self._set_parameter()
self._file_to_bytesio()
self._calc_hash()
self._read_data()
self._standardize_data()
self._standardize_meta()
if not debug:
            self._calc_missing_values()
self._set_units()
self._validate_data()
self._postprocess_data()
self._split_data()
self._select_data()
self._fit_data()
def _set_parameter(self):
self.split_data_based_on_parameter = ['f']
self.col_as_int = ['N']
self.col_as_float = ['T', 'F', 's_piston', 's_hor_1', 'f', 's_hor_sum']
self.number_of_load_cycles_for_analysis = 5
        # unit conversion factors applied in _set_units()
        self.unit_s = 1  # displacement: device units -> mm
        self.unit_F = 1  # force: device units -> N
        self.unit_t = 1 / 1000.  # time: ms -> s
def _file_to_bytesio(self):
""" read data and save in memory """
with open(self.file, 'rb') as fh:
self.buf = BytesIO(fh.read())
    def _calc_hash(self):
        """ calculate the SHA-1 hash of the file content """
        algo = hashlib.sha1()
        # hash in 64 KiB chunks
        buffer_size = 65536
        while True:
            data = self.buf.read(buffer_size)
            if not data:
                break
            algo.update(data)
        self.hex = algo.hexdigest()
        self.buf.seek(0)
def _read_data(self, encoding='latin-1', skiprows=14, hasunits=True):
"""
read data from Labor Hart, Spaltzugversuche Steifigkeit
"""
# metadata from file
meta = {}
splitsign = ':;'
with open(self.file, 'r', encoding=encoding) as f:
count = 0
for line in f:
count += 1
#remove whitespace
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
if len(linesplit) == 2:
meta[linesplit[0]] = linesplit[1]
if count >= skiprows:
break
# data
data = pd.read_csv(self.file,
encoding=encoding,
skiprows=skiprows,
decimal=',',
sep=';')
        ## add header to df (the header is the last of the skipped rows)
        with open(self.file, 'r', encoding=encoding) as f:
            count = 0
            for line in f:
                count += 1
                if count >= skiprows:
                    break
        head = line.strip().split(';')
        data.columns = head
        # the stress level in MPa is encoded at the start of the file name,
        # e.g. '0,2 MPa_<...>'
        sigma = float(
            os.path.split(self.file)[-1].split('MPa')[0].strip().replace(
                ',', '.'))
data['sigma'] = sigma
#clean data
data = data.dropna(axis=1)
#define in class
self.meta = meta
self.data = data
return True
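    # Assumed layout of the raw file (Labor Hart export, illustrative only):
    #
    #   Probenbezeichnung:;P-01
    #   Datum/Uhrzeit:;27.09.2022 08:18
    #   Probenhöhe:;60,0
    #   ...                       (one 'key:;value' metadata pair per line)
    #   TIME;Temperatur;Load;Position;FREQUENZ;Impulsnummer;SENSOR 4;...
    #   0,00;20,1;...             (semicolon-separated, decimal comma)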
    def _validate_data(self):
        """ check that all required metadata keys and column names exist """
        cols = self.data.columns
        for val in self.val_header_names:
            if val not in self.meta:
                raise ValueError(f'{val} not in header of data')
        for col in self.val_col_names:
            if col not in cols:
                raise ValueError(f'{col} not standardized')
def _standardize_meta(self):
keys = list(self.meta.keys())
for key in keys:
if any(map(key.__contains__, ['Probenbezeichnung'])):
self.meta['speciment'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Datum/Uhrzeit'])):
self.meta['datetime'] = self.meta.pop(key)
try:
self.meta['datetime'] = pd.to_datetime(
self.meta['datetime'])
                except (ValueError, TypeError):
                    # keep the raw string if it cannot be parsed
                    pass
elif any(map(key.__contains__, ['Probenhöhe'])):
self.meta['speciment_height'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Probendurchmesser'])):
self.meta['speciment_diameter'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Solltemperatur'])):
self.meta['temperature'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Prüfbedingungen'])):
self.meta['test_version'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Name des VersAblf'])):
self.meta['test'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Prüfer'])):
self.meta['examiner'] = self.meta.pop(key)
return True
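    # Example: a metadata line 'Probenhöhe:;60,0' from the raw file ends up
    # as meta['speciment_height'] == 60.0 (decimal comma converted; the key
    # keeps the spelling used throughout this class).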
def _standardize_data(self):
colnames = list(self.data.columns)
for i, col in enumerate(colnames):
if any(map(col.__contains__, ['TIME'])):
colnames[i] = 'time'
elif any(map(col.__contains__, ['Temperatur'])):
colnames[i] = 'T'
elif any(map(col.__contains__, ['Load'])):
colnames[i] = 'F'
elif any(map(col.__contains__, ['Position'])):
colnames[i] = 's_piston'
elif any(map(col.__contains__, ['FREQUENZ'])):
colnames[i] = 'f'
            # leading 'I' dropped on purpose; this must match before the more
            # general 'Impulsnummer' check below
            elif any(map(col.__contains__, ['mpulsnummer_fortlaufend'])):
                colnames[i] = 'Ncum'
elif any(map(col.__contains__, ['Impulsnummer'])):
colnames[i] = 'N'
elif any(map(col.__contains__, ['SENSOR 4'])):
colnames[i] = 's_hor_1'
elif any(map(col.__contains__, ['SENSOR Extension'])):
colnames[i] = 's_hor_2'
self.data.columns = colnames
    def _set_units(self):
        # displacement channels may be absent depending on the sensor setup
        for col in ['s_hor_sum', 's_hor_1', 's_hor_2']:
            if col in self.data.columns:
                self.data[col] = self.data[col].mul(self.unit_s)
        self.data['F'] = self.data['F'].mul(self.unit_F)
        self.data['time'] = self.data['time'].mul(self.unit_t)
        return True
    def _calc_missing_values(self):
        cols = self.data.columns
        if 's_hor_sum' not in cols:
            self.data['s_hor_sum'] = self.data[['s_hor_1',
                                                's_hor_2']].sum(axis=1)
def _postprocess_data(self):
#set dtypes:
for col in self.col_as_int:
self.data[col] = self.data[col].astype('int')
        for col in self.col_as_float:
            try:
                self.data[col] = self.data[col].astype('float')
            except (KeyError, ValueError, TypeError):
                # column missing or not convertible; leave it as is
                pass
#set index
self.data = self.data.set_index('time')
return True
    def _split_data(self):
        """ split the record into individual tests at gaps in the time index """
        data_gp = self.data.groupby(self.split_data_based_on_parameter)
        data_list = []
        for idx, d in data_gp:
            idx_diff = np.diff(d.index)
            dt_mean = idx_diff.mean()
            # a time step larger than twice the mean step counts as a gap
            gaps = idx_diff > (2 * dt_mean)
            if not any(gaps):
                data_list.append(d)
            else:
                # keep only the data up to (just before) the first gap
                idx_first_gap = np.where(gaps)[0][0] - 1
                data_list.append(d.iloc[0:idx_first_gap])
        if len(data_list) == 1:
            # a single test: store the frame directly, not as a list
            self.num_tests = 1
            self.data = data_list[0]
        else:
            self.num_tests = len(data_list)
            self.data = data_list
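    # Worked example for _split_data (hypothetical numbers): a time index
    # [0.0, 0.1, 0.2, 0.3, 5.0, 5.1] gives steps [0.1, 0.1, 0.1, 4.7, 0.1]
    # with mean ~1.0 s, so only the 4.7 s step exceeds twice the mean; the
    # rows kept are d.iloc[0:2], i.e. everything up to one sample before
    # the gap.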
    def _select_data(self):
        """ select the last N complete load cycles from the original data """
        def sel_df(df, num=5):
            N = df['N'].unique()
            if len(N) > num:
                # take num cycles, skipping the (possibly incomplete) last one
                return df[(df['N'] >= N[-num - 1]) & (df['N'] <= N[-2])]
            # fewer load cycles than requested; _fit_data skips None entries
            return None
        if not isinstance(self.data, list):
            df_sel = [
                sel_df(self.data, num=self.number_of_load_cycles_for_analysis)
            ]
        else:
            df_sel = []
            for d in self.data:
                d_sel = sel_df(d, num=self.number_of_load_cycles_for_analysis)
                df_sel.append(d_sel)
        # replace data
        self.data = df_sel
        return True
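    # Worked example for _select_data: with load cycles N = 1..100 and
    # num = 5, the selection spans N[-6] = 95 through N[-2] = 99, i.e. the
    # last five cycles before the final (possibly truncated) one.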
def _fit_data(self):
self.fit = []
for idx_data, data in enumerate(self.data):
            if data is None:
                continue
            # shift the time axis so every test starts at t = 0
            data.index = data.index - data.index[0]
res = {}
columns_analyse = [
'F',
's_hor_sum',
's_hor_1',
's_hor_2',
's_piston',
]
ylabel_dict = {
'F': 'Kraft in N',
's_hor_sum': 'Verformung (Summe) in mm',
's_piston': 'Verformung Kolbenweg in mm',
's_hor_1': 'Verformung ($S_1$) in mm',
's_hor_2': 'Verformung ($S_2$) in mm'
}
fig, axs = plt.subplots(len(columns_analyse),
1,
figsize=(8, len(columns_analyse) * 2),
sharex=True)
for idxcol, col in enumerate(columns_analyse):
if not col in data.columns: continue
x = data.index.values
y = data[col].values
                # Fourier transform (stored under 'psd_' keys, although the
                # values are the complex rfft spectrum, not a PSD)
                dt = np.diff(x).mean()  # mean sampling interval
                n = len(x)
                res[f'psd_{col}'] = sfft.rfft(y)
                res[f'freq_{col}'] = sfft.rfftfreq(n, dt)
                # Fitting: coarse estimates first, then the lmfit cosine model
                freq = np.round(data['f'].mean(), 3)
                sigma = np.round(data['sigma'].mean(), 3)
                res_step1 = fit_sin_anstieg(x, y)
mod = lm.models.Model(model_cos)
mod.set_param_hint(
'd',
value=res_step1['offset'],
#min=res_step1['offset'] - 0.5*abs(res_step1['offset']),
#max=res_step1['offset'] + 0.5*abs(res_step1['offset'])
)
mod.set_param_hint(
'a',
value=res_step1['amp'],
#min=res_step1['amp'] - 0.5*abs(res_step1['amp']),
#max=res_step1['amp'] + 0.5*abs(res_step1['amp'])
)
mod.set_param_hint('b', value=res_step1['phase'])
mod.set_param_hint('e', value=0) #, min = -0.5, max = 0.5)
mod.set_param_hint('f', value=freq, vary=True)
parms_fit = [
mod.param_hints['a']['value'],
mod.param_hints['b']['value'],
mod.param_hints['d']['value'],
mod.param_hints['e']['value'],
mod.param_hints['f']['value']
]
abweichung = []
chis = []
chis_red = []
results = []
r2 = []
methods = ['leastsq', 'powell']
dof = len(y) - len(parms_fit)
                for method in methods:
                    result = mod.fit(y, t=x, method=method, verbose=False)
                    # coefficient of determination, clipped at zero
                    r2temp = 1 - result.residual.var() / np.var(y)
                    if r2temp < 0.:
                        r2temp = 0
                    r2.append(r2temp)
                    chi = result.chisqr
                    chis_red.append(result.redchi)
                    # goodness-of-fit probability from the chi-squared
                    # survival function
                    abweichung.append(sf.gammaincc(dof / 2., chi / 2.))
                    chis.append(chi)
                    results.append(result)
ret = {}
best = np.nanargmax(r2)
ret['model'] = mod
ret['x'] = x
ret['result'] = results[best]
res[f'r2_{col}'] = r2[best]
res[f'fit_a_{col}'] = results[best].best_values['a']
res[f'fit_b_{col}'] = results[best].best_values['b']
res[f'fit_d_{col}'] = results[best].best_values['d']
res[f'fit_e_{col}'] = results[best].best_values['e']
res[f'fit_f_{col}'] = results[best].best_values['f']
res['f'] = freq
res['sigma'] = sigma
res['filename'] = self.file
yreg = model_cos(x, res[f'fit_a_{col}'], res[f'fit_b_{col}'],
res[f'fit_d_{col}'], res[f'fit_e_{col}'],
res[f'fit_f_{col}'])
plt.sca(axs[idxcol])
plt.plot(x, y, label='Messdaten')
                r2_best = res[f'r2_{col}']
                lstr = f'Fit (r² = {r2_best:0.4f})'
plt.plot(x, yreg, label=lstr)
plt.xlim([x[0], x[-1]])
plt.xlabel('Zeit in s')
plt.ylabel(ylabel_dict[col])
plt.legend()
plt.tight_layout()
            # mirror the raw data folder structure under /plots/
            ofile = self.file
            ofile = ofile.replace('/raw/', '/plots/')[:-4]
            ofile = ofile + f'_{idx_data:03.0f}.pdf'
ofolder = os.path.split(ofile)[0]
if not os.path.exists(ofolder):
os.makedirs(ofolder)
plt.savefig(ofile)
plt.close()
            ## Stiffness
            # indirect tensile stiffness modulus from the fitted force and
            # horizontal deformation amplitudes (IT-CY type formula,
            # cf. EN 12697-26); 0.274 is the geometry factor used here
            deltaF = res['fit_a_F']
            nu = 0.298  # assumed Poisson's ratio of the specimen
            h = float(self.meta['speciment_height'])
            deltaU = res['fit_a_s_hor_sum']
            res['E'] = (deltaF * (0.274 + nu)) / (h * deltaU)
self.fit.append(res)
self.fit = pd.DataFrame.from_records(self.fit)
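

# Minimal usage sketch (hypothetical file path; the stress level is read
# from the 'MPa' token in the file name):
if __name__ == '__main__':
    test = DataSineLoad('data/raw/0,2 MPa_probe1.csv')
    print(f'{test.num_tests} test(s) found in {test.file}')
    print(test.fit[['f', 'sigma', 'E']])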