import numpy as np
import pandas as pd
from scipy.integrate import cumulative_trapezoid
from scipy.signal import savgol_filter
from .utils import lowpass_butter, find_peaks
from .move import rotate_matrix
[docs]
def auto_process(
data,
wheelsize=0.31,
rimsize=0.27,
sfreq=200,
co_f=15,
ord_f=2,
co_s=6,
ord_s=2,
force=True,
speed=True,
variable="torque",
cutoff=0.0,
wl=201,
ord_a=2,
minpeak=5.0,
):
"""
Top level processing function that performs all processing steps for mw/ergo data.
Contains all signal processing steps in fixed order. It is advised to use this function for all (pre-)processing.
If needed to take a look at a specific function to see how it works.
Parameters
----------
data : pd.DataFrame, dict
raw ergometer or measurement wheel data
wheelsize : float
wheel radius [m]
rimsize : float
handrim radius [m]
sfreq : int
sample frequency [Hz]
co_f : int
cutoff frequency force filter [Hz]
ord_f : int
order force filter [..]
co_s : int
cutoff frequency force filter [Hz]
ord_s : int
order speed filter [..]
force : bool
force filter toggle, default is True
speed : bool
speed filter toggle, default is True
variable : str
variable name used for peak (push) detection
cutoff : float
noise level for peak (push) detection
wl : int
window length angle filter
ord_a : int
order angle filter [..]
minpeak : float
min peak height for peak (push) detection
Returns
-------
data : pd.DataFrame, dict
pushes : pd.DataFrame, dict
See Also
--------
filter_mw, process_mw, push_by_push_mw, filter_ergo, process_ergo, push_by_push_ergo
"""
if "right" in data:
data = filter_ergo(data, co_f, ord_f, co_s, ord_s, force, speed)
data = process_ergo(data, wheelsize, rimsize)
pushes = push_by_push_ergo(data, variable, cutoff, minpeak)
else:
data = filter_mw(data, sfreq, co_f, ord_f, wl, ord_a, force, speed)
data = process_mw(data, wheelsize, rimsize, sfreq)
pushes = push_by_push_mw(data, variable, cutoff, minpeak)
return data, pushes
[docs]
def filter_mw(data, sfreq=200.0, co_f=15.0, ord_f=2, wl=201, ord_a=2, force=True, speed=True):
"""
Filters measurement wheel data.
Filters raw measurement wheel data. Should be used before further processing.
Parameters
----------
data : pd.DataFrame
raw measurement wheel data
sfreq : float
sample frequency [Hz]
co_f : float
cutoff frequency force filter [Hz]
ord_f : int
order force filter [..]
wl : int
window length angle filter
ord_a : int
order angle filter [..]
force : bool
force filter toggle, default is True
speed : bool
speed filter toggle, default is True
Returns
-------
data : pd.DataFrame
Same data but filtered.
See Also
--------
.utils.lowpass_butter
"""
if force:
frel = ["fx", "fy", "fz", "mx", "my", "torque"]
for var in frel:
data[var] = lowpass_butter(data[var], cutoff=co_f, order=ord_f, sfreq=sfreq)
if speed:
data["angle"] = savgol_filter(data["angle"], window_length=wl, polyorder=ord_a)
return data
[docs]
def filter_ergo(data, co_f=15.0, ord_f=2, co_s=6.0, ord_s=2, force=True, speed=True):
"""
Filters ergometer data.
Filters raw ergometer data. Should be used before further processing.
Parameters
----------
data : dict
raw measurement wheel data
co_f : float
cutoff frequency force filter [Hz]
ord_f : int
order force filter [..]
co_s : float
cutoff frequency speed filter [Hz]
ord_s : int
order speed filter [..]
force : bool
force filter toggle, default is True
speed : bool
speed filter toggle, default is True
Returns
-------
data : dict
Same data but filtered.
See Also
--------
.utils.lowpass_butter
"""
sfreq = 100
for side in data:
if force:
data[side]["force"] = lowpass_butter(data[side]["force"], cutoff=co_f, order=ord_f, sfreq=sfreq)
if speed:
data[side]["speed"] = lowpass_butter(data[side]["speed"], cutoff=co_s, order=ord_s, sfreq=sfreq)
return data
[docs]
def process_mw(data, wheelsize=0.31, rimsize=0.275, sfreq=200):
"""
Basic processing for measurement wheel data.
Basic processing for measurement wheel data (e.g. speed to distance). Should be performed after filtering.
Added columns:
+------------+----------------------+-----------+
| Column | Data | Unit |
+============+======================+===========+
| aspeed | angular velocity | rad/s |
+------------+----------------------+-----------+
| speed | velocity | m/s |
+------------+----------------------+-----------+
| dist | cumulative distance | m |
+------------+----------------------+-----------+
| acc | acceleration | m/s^2 |
+------------+----------------------+-----------+
| ftot | total combined force | N |
+------------+----------------------+-----------+
| uforce | effective force | N |
+------------+----------------------+-----------+
| force | force on wheel | N |
+------------+----------------------+-----------+
| power | power | W |
+------------+----------------------+-----------+
| work | instantaneous work | J |
+------------+----------------------+-----------+
Parameters
----------
data : pd.DataFrame
raw measurement wheel data
wheelsize : float
wheel radius [m]
rimsize : float
handrim radius [m]
sfreq : int
sample frequency [Hz]
Returns
-------
data : pd.DataFrame
See Also
--------
.com.load_opti, .com.load_sw
"""
data["aspeed"] = np.gradient(data["angle"]) * sfreq
data["speed"] = data["aspeed"] * wheelsize
data["dist"] = cumulative_trapezoid(data["speed"], initial=0.0) / sfreq
data["acc"] = np.gradient(data["speed"]) * sfreq
data["ftot"] = (data["fx"] ** 2 + data["fy"] ** 2 + data["fz"] ** 2) ** 0.5
data["uforce"] = data["torque"] / rimsize
data["feff"] = (data["uforce"] / data["ftot"]) * 100
data["force"] = data["torque"] / wheelsize
data["power"] = data["torque"] * data["aspeed"]
data["work"] = data["power"] / sfreq
return data
[docs]
def process_ergo(data, wheelsize=0.31, rimsize=0.275, unit="ms"):
"""
Basic processing for ergometer data.
Basic processing for ergometer data (e.g. speed to distance). Should be performed after filtering.
Returned columns:
+------------+----------------------+-----------+
| Column | Data | Unit |
+============+======================+===========+
| time | time | s |
+------------+----------------------+-----------+
| force | force (on wheel) | N |
+------------+----------------------+-----------+
| speed | speed | m/s |
+------------+----------------------+-----------+
| acc | acceleration | m/s^2 |
+------------+----------------------+-----------+
| aspeed | angular velocity | rad/s |
+------------+----------------------+-----------+
| angle | angle | rad |
+------------+----------------------+-----------+
| dist | cumulative distance | m |
+------------+----------------------+-----------+
| power | power | W |
+------------+----------------------+-----------+
| torque | torque around wheel | Nm |
+------------+----------------------+-----------+
| uforce | effective force | N |
+------------+----------------------+-----------+
| work | instantaneous work | J |
+------------+----------------------+-----------+
.. note:: the force column contains force on the wheels, uforce (user force) is force on the handrim
Parameters
----------
data : dict
raw ergometer data
wheelsize : float
wheel radius [m]
rimsize : float
handrim radius [m]
unit : str
unit of measured 'speed' column; ms, kmh or mph
Returns
-------
data : dict
See Also
--------
.com.load_esseda
"""
sfreq = 100 # ergometer is always 100Hz
for side in data:
if unit == "kmh":
data[side]["speed"] /= 3.6
elif unit == "mph":
data[side]["speed"] /= 2.23694
elif unit == "ms":
data[side]["speed"] = data[side]["speed"]
else:
raise Exception("Please specify either 'ms', 'kmh' or 'mph', data not processed!")
data[side]["acc"] = np.gradient(data[side]["speed"]) * sfreq
data[side]["aspeed"] = data[side]["speed"] / wheelsize
data[side]["angle"] = cumulative_trapezoid(data[side]["aspeed"], initial=0.0) / sfreq
data[side]["dist"] = cumulative_trapezoid(data[side]["speed"], initial=0.0) / sfreq
data[side]["power"] = data[side]["speed"] * data[side]["force"]
data[side]["torque"] = data[side]["force"] * wheelsize
data[side]["uforce"] = data[side]["force"] * (wheelsize / rimsize)
data[side]["work"] = data[side]["power"] / sfreq
return data
[docs]
def push_by_push_mw(data, variable="torque", cutoff=0.0, minpeak=5.0, mindist=5, verbose=True):
"""
Push-by-push analysis for measurement wheel data.
Push detection and push-by-push analysis for measurement wheel data. Returns a pandas DataFrame with:
+--------------------+----------------------+-----------+
| Column | Data | Unit |
+====================+======================+===========+
| start/stop/peak | respective indices | |
+--------------------+----------------------+-----------+
| tstart/tstop/tpeak | respective samples | s |
+--------------------+----------------------+-----------+
| cangle | contact angle | rad |
+--------------------+----------------------+-----------+
| cangle_deg | contact angle | degrees |
+--------------------+----------------------+-----------+
| mean/maxpower | power per push | W |
+--------------------+----------------------+-----------+
| mean/maxtorque | torque per push | Nm |
+--------------------+----------------------+-----------+
| mean/maxforce | force per push | N |
+--------------------+----------------------+-----------+
| mean/maxuforce | (rim) force per push | N |
+--------------------+----------------------+-----------+
| mean/maxfeff | feffective per push | % |
+--------------------+----------------------+-----------+
| mean/maxftot | ftotal per push | N |
+--------------------+----------------------+-----------+
| work | work per push | J |
+--------------------+----------------------+-----------+
| cwork | work per cycle | J |
+--------------------+----------------------+-----------+
| negwork | negative work/cycle | J |
+--------------------+----------------------+-----------+
| slope | slope onset to peak | Nm/s |
+--------------------+----------------------+-----------+
| smoothness | mean/peak force | |
+--------------------+----------------------+-----------+
| ptime | push time | s |
+--------------------+----------------------+-----------+
| ctime | cycle time | s |
+--------------------+----------------------+-----------+
| reltime | relative push/cycle | % |
+--------------------+----------------------+-----------+
Parameters
----------
data : pd.DataFrame
measurement wheel DataFrame
variable : str
variable name used for peak (push) detection
cutoff : float
noise level for peak (push) detection
minpeak : float
min peak height for peak (push) detection
mindist : int
minimum sample distance between peak candidates, can be used to speed up algorithm
verbose : Boolean
can be used to print out the number of pushes, default = True
Returns
-------
pbp : pd.DataFrame
push-by-push DataFrame
"""
peaks = find_peaks(data[variable], cutoff, minpeak, mindist)
keys = [
"start",
"stop",
"peak",
"tstart",
"tstop",
"tpeak",
"cangle",
"cangle_deg",
"ptime",
"meanpower",
"maxpower",
"meantorque",
"maxtorque",
"meanuforce",
"maxuforce",
"meanforce",
"maxforce",
"work",
"meanfeff",
"maxfeff",
"meanftot",
"maxftot",
"slope",
"smoothness",
"ctime",
"reltime",
"cwork",
"negwork",
]
pbp = pd.DataFrame(data=np.full((len(peaks["start"]), len(keys)), np.nan), columns=keys) # preallocate dataframe
pbp["start"] = peaks["start"]
pbp["peak"] = peaks["peak"]
pbp["stop"] = peaks["stop"]
pbp["tstart"] = data["time"][pbp["start"]].values # get .values so indices align
pbp["tstop"] = data["time"][pbp["stop"]].values
pbp["tpeak"] = data["time"][pbp["peak"]].values
pbp["ptime"] = pbp["tstop"] - pbp["tstart"].values
pbp["ctime"] = pbp["tstart"].iloc[1:].reset_index(drop=True) - pbp["tstart"].iloc[:-1].reset_index(drop=True)
pbp["reltime"] = (pbp["ptime"] / pbp["ctime"]) * 100
pbp["cangle"] = data["angle"][pbp["stop"]].values - data["angle"][pbp["start"]].values
pbp["cangle_deg"] = np.rad2deg(pbp["cangle"])
bins = pbp[["start", "stop"]].values
bins[:, 1] += 1
push_bins = np.digitize(data.index, bins.ravel()) # slice dataframe from push start:stop
push_group = data.groupby(push_bins)
grouped = push_group.agg(["mean", "max"])[1::2].reset_index(drop=True)
grouped.columns = [f"{col[1]}{col[0]}" for col in grouped.columns] # collapse multiindex to match cols with pbp
mean_max_variables = [var for var in keys if "mean" in var or "max" in var]
pbp[mean_max_variables] = grouped[mean_max_variables]
pbp["slope"] = pbp["maxtorque"] / (pbp["tpeak"] - pbp["tstart"])
pbp["smoothness"] = pbp["meanforce"] / pbp["maxforce"]
pbp["work"] = push_group["work"].sum()[1::2].reset_index(drop=True)
cycle_bins = np.digitize(data.index, pbp["start"].values)
pbp["cwork"] = data[["work"]].groupby(cycle_bins).sum()[1:].reset_index(drop=True)
negative_work = data["work"].copy()
negative_work[negative_work >= 0] = 0
pbp["negwork"] = negative_work.groupby(cycle_bins).sum()[1:].reset_index(drop=True)
pbp.loc[len(pbp) - 1, ["cwork", "negwork"]] = np.nan
if verbose:
print("\n" + "=" * 80 + f"\nFound {len(pbp)} pushes!\n" + "=" * 80 + "\n")
return pbp
[docs]
def push_by_push_ergo(data, variable="power", cutoff=0.0, minpeak=50.0, mindist=5, verbose=True):
"""
Push-by-push analysis for wheelchair ergometer data.
Push detection and push-by-push analysis for ergometer data. Returns a pandas DataFrame with:
+--------------------+-----------------------+-----------+
| Column | Data | Unit |
+====================+=======================+===========+
| start/stop/peak | respective indices | |
+--------------------+-----------------------+-----------+
| tstart/tstop/tpeak | respective samples | s |
+--------------------+-----------------------+-----------+
| cangle | contact angle | rad |
+--------------------+-----------------------+-----------+
| cangle_deg | contact angle | degrees |
+--------------------+-----------------------+-----------+
| mean/maxpower | power per push | W |
+--------------------+-----------------------+-----------+
| mean/maxtorque | torque per push | Nm |
+--------------------+-----------------------+-----------+
| mean/maxforce | force per push | N |
+--------------------+-----------------------+-----------+
| mean/maxuforce | (rim) force per push | N |
+--------------------+-----------------------+-----------+
| mean/maxspeed | velocity per push | ms |
+--------------------+-----------------------+-----------+
| work | work per push | J |
+--------------------+-----------------------+-----------+
| cwork | work per cycle | J |
+--------------------+-----------------------+-----------+
| negwork | negative work/cycle | J |
+--------------------+-----------------------+-----------+
| slope | slope onset to peak | Nm/s |
+--------------------+-----------------------+-----------+
| smoothness | mean/peak force | N |
+--------------------+-----------------------+-----------+
| ptime | push time | s |
+--------------------+-----------------------+-----------+
| ctime | cycle time | s |
+--------------------+-----------------------+-----------+
| reltime | relative push/cycle | % |
+--------------------+-----------------------+-----------+
| pnegpos | neg power start push | index |
+--------------------+-----------------------+-----------+
| negpos | neg power start push | W |
+--------------------+-----------------------+-----------+
| pnegpoe | neg power end push | index |
+--------------------+-----------------------+-----------+
| negpoe | neg power end push | W |
+--------------------+-----------------------+-----------+
Parameters
----------
data : dict
wheelchair ergometer dictionary with left, right and mean DataFrame
variable : str
variable name used for peak (push) detection, default = power
cutoff : float
noise level for peak (push) detection, default = 0
minpeak : float
min peak height for peak (push) detection, default = 50.0
mindist : int
minimum sample distance between peak candidates, can be used to speed up algorithm
verbose : Boolean
can be used to print out the number of pushes for left, right and mean, default = True
Returns
-------
pbp : dict
dictionary with left, right and mean push-by-push DataFrame
"""
pbp_sides = {"left": [], "right": [], "mean": []}
keys = [
"start",
"stop",
"peak",
"tstart",
"tstop",
"tpeak",
"cangle",
"cangle_deg",
"ptime",
"meanpower",
"maxpower",
"meantorque",
"maxtorque",
"meanuforce",
"maxuforce",
"meanforce",
"maxforce",
"meanspeed",
"maxspeed",
"work",
"slope",
"smoothness",
"ctime",
"reltime",
"cwork",
"negwork",
"pnegpos",
"negpos",
"pnegpoe",
"negpoe",
]
for side in data:
if (side == "left") | (side == "right"):
peaks = find_peaks(data[side][variable], cutoff, minpeak, mindist)
else:
peaks = find_peaks(data[side][variable], cutoff, (minpeak * 2), mindist)
pbp = pd.DataFrame(data=np.full((len(peaks["start"]), len(keys)), np.nan), columns=keys) # preallocate
pbp["start"] = peaks["start"]
pbp["peak"] = peaks["peak"]
pbp["stop"] = peaks["stop"]
pbp["tstart"] = data[side]["time"][pbp["start"]].values # get .values so indices align
pbp["tstop"] = data[side]["time"][pbp["stop"]].values
pbp["tpeak"] = data[side]["time"][pbp["peak"]].values
pbp["ptime"] = pbp["tstop"] - pbp["tstart"].values
pbp["ctime"] = pbp["tstart"].iloc[1:].reset_index(drop=True) - pbp["tstart"].iloc[:-1].reset_index(drop=True)
pbp["reltime"] = (pbp["ptime"] / pbp["ctime"]) * 100
pbp["cangle"] = data[side]["angle"][pbp["stop"]].values - data[side]["angle"][pbp["start"]].values
pbp["cangle_deg"] = np.rad2deg(pbp["cangle"])
bins = pbp[["start", "stop"]].values
bins[:, 1] += 1
push_bins = np.digitize(data[side].index, bins.ravel()) # slice dataframe from push start:stop
push_group = data[side].groupby(push_bins)
grouped = push_group.agg(["mean", "max"])[1::2].reset_index(drop=True)
grouped.columns = [f"{col[1]}{col[0]}" for col in grouped.columns] # collapse multiindex to match cols with pbp
mean_max_variables = [var for var in keys if "mean" in var or "max" in var]
pbp[mean_max_variables] = grouped[mean_max_variables]
pbp["slope"] = pbp["maxtorque"] / (pbp["tpeak"] - pbp["tstart"])
pbp["smoothness"] = pbp["meanforce"] / pbp["maxforce"]
pbp["work"] = push_group["work"].sum()[1::2].reset_index(drop=True)
cycle_bins = np.digitize(data[side].index, pbp["start"].values)
pbp["cwork"] = data[side][["work"]].groupby(cycle_bins).sum()[1:].reset_index(drop=True)
negative_work = data[side]["work"].copy()
negative_work[negative_work >= 0] = 0
pbp["negwork"] = negative_work.groupby(cycle_bins).sum()[1:].reset_index(drop=True)
pbp.loc[len(pbp) - 1, ["cwork", "negwork"]] = np.nan
neg_before_all = []
neg_after_all = []
for sample, sample2 in zip(pbp['start'], pbp['stop']):
neg_before = data['mean'].loc[sample - 10:sample]
neg_before_min = neg_before[neg_before['power'] == min(neg_before['power'])]
neg_before_all.append(neg_before_min)
neg_after = data['mean'].loc[sample2:sample2 + 10]
neg_after_min = neg_after[neg_after['power'] == min(neg_after['power'])]
neg_after_all.append(neg_after_min)
for push in range(0, len(pbp)):
pbp.loc[push, "pnegpos"] = neg_before_all[push].index.item()
pbp.loc[push, "negpos"] = neg_before_all[push]['power'].item()
pbp.loc[push, "pnegpoe"] = neg_after_all[push].index.item()
pbp.loc[push, "negpoe"] = neg_after_all[push]['power'].item()
pbp_sides[side] = pd.DataFrame(pbp)
if verbose:
print(
"\n" + "=" * 80 + f"\nFound left: {len(pbp_sides['left'])} , "
f"right: {len(pbp_sides['right'])} and "
f"mean: {len(pbp_sides['mean'])} pushes!\n" + "=" * 80 + "\n"
)
return pbp_sides
[docs]
def camber_correct(data, ang):
"""Correct for camber angle in measurement wheel data
Parameters
----------
data : pd.DataFrame()
measurement wheel data with forces/torques in 3D
ang : int
camber angle to correct
Returns
-------
data: pd.DataFrame()
measurement wheel data with forces/torques in 3D
corrected for camber angle
"""
force = data[["fx", "fy", "fz"]].T
torques = data[["mx", "my", "torque"]].T
rotmat = rotate_matrix(ang / (180 * np.pi), axis="x")
data[["fx", "fy", "fz"]] = np.dot(rotmat, force).T
data[["mx", "my", "torque"]] = np.dot(rotmat, torques).T
return data