import pandas as pd
from typing import Callable, List, Dict, Tuple
import statistics
import numpy as np
import matplotlib.pyplot as plt
from skimage import io, color
from skimage.filters import threshold_multiotsu, threshold_yen
import warnings
from tqdm import tqdm
[docs]
def multi_threshold_with_nan_by_column(matrix, num_thresholds, algorithm='otsu'):
# Apply thresholds and segmentation column-wise
"""
The matrix quantization algorithm.
:param np.ndarray matrix: Input matrix.
:param int num_thresholds: number of quantized levels.
:param str algorithm: quantization algorithm type 'otsu|linspace|yen' (default: 'otsu').
:returns: The quantized array and the thresholds used.
:rtype: Tuple[np.ndarray, c]
:example:
.. code-block:: python
# Example usage
data_matrix = np.array([[1.2, 2.1, 3.6, np.nan],
[4.5, 5.7, np.nan, 6.3],
[4.1, 8.4, 3.0, 10.2],
[7.7, np.nan, 9, 10],
[2.9, 12.5, 8.2, 1.0],
[1.1, 2.2, 9.0, np.nan]])
num_thresholds = 3 # Adjust the number of thresholds as needed
segmented_matrix, thres = multi_threshold_with_nan_by_column(data_matrix, num_thresholds, mode='otsu')
"""
segmented_matrix = np.empty_like(matrix, dtype=float)
segmented_matrix.fill(0)
thresholds = []
for col_idx in tqdm(range(matrix.shape[1])):
col_digitize = np.empty_like(matrix[:,col_idx], dtype=int)
col_digitize.fill(0)
col = matrix[:, col_idx]
valid_values = col[~np.isnan(col)]
# if all NaN in columns, left it as it is
if len(valid_values) == 0:
segmented_matrix[:,col_idx] = col
continue # Skip if all values in the column are NaN
if algorithm == 'otsu':
cnt = len(set(valid_values))
# if there are less distint valid_values than bins ... otsu raise an error.
if cnt < num_thresholds:
warnings.warn('Too fews distint values in column which is less then binarization values... using linspace mode!')
thresh = np.linspace(np.nanmin(valid_values), np.nanmax(valid_values), num_thresholds + 1)[1:-1]
else:
thresh = threshold_multiotsu(valid_values, num_thresholds)
elif algorithm == 'linspace':
thresh = np.linspace(np.nanmin(valid_values), np.nanmax(valid_values), num_thresholds + 1)[1:-1]
elif algorithm == 'yen':
if num_thresholds > 2:
warnings.warn('Yen thresholding only support one threshow (binary separation)... using linspace mode!')
thresh = np.linspace(np.nanmin(valid_values), np.nanmax(valid_values), num_thresholds + 1)[1:-1]
else:
thresh = np.array([threshold_yen(valid_values)])
else:
raise Exception("Thresholding method not supported")
thresholds += [thresh]
# Define a placeholder value (can be any value not present in the array)
placeholder_value = -1
# Replace NaN values with the placeholder
col_no_nan = np.where(np.isnan(col), placeholder_value, col)
# Perform digitization on the modified array
for i, threshold in enumerate(thresh):
col_digitize[(col_no_nan[:] > threshold)] = i + 1
# Revert the placeholder values back to NaN
col_digitize_with_nan = np.where(col_no_nan == placeholder_value, np.nan, col_digitize)
segmented_matrix[:,col_idx] = col_digitize_with_nan
return segmented_matrix, thresholds
[docs]
def rows_with_all_nan(df):
"""
Computes the mode of an array along each row. In case of ex-aequo modes, return the value computed by reducefoo (default: max).
:param pd.DatFrame df: the input DataFrame.
:returns: the array of indices in DataFrame with all NaNs.
:rtype: np.ndarray
"""
idx = df.index[df.isnull().all(1)]
nans = df.loc[idx]
return idx.values
[docs]
def modemax_nan(a: np.ndarray, reducefoo: Callable[[List[int]], int] = max) -> np.ndarray:
"""
Computes the mode of an array along each row. In case of ex-aequo modes, return the value computed by reducefoo (default: max).
:param np.ndarray a: Input 2D array.
:param Callable[[List[int]], int] reducefoo: Reduction function (default: max).
:returns: Mode values.
:rtype: np.ndarray
:example:
.. code-block:: python
# Example usage
input_array = np.array([[np.nan, 2, 3, 3, 5, 5],
[2, np.nan, 4, 4, 6, 6],
[4, 5, 6, 6, 8, 8]])
mode_values = modemax_nan(input_array, min)
"""
res = []
m = a.max()
for x in range(a.shape[0]):
modes = statistics.multimode([x for x in a[x,:] if ~np.isnan(x)])
if modes == []:
res += [np.nan]
else:
res += [reducefoo(modes)] # if max(modes) is not np.nan else sorted(set(modes))[-2]]
return np.array(res)
[docs]
def labelling_core(df: pd.DataFrame, columns: List[str] = [], n_classes: int=2,
verbose: bool = False, labelnames: Dict[int, str] = {0: 'E', 1: 'NE'},
mode='flat-multi', algorithm='otsu', rowname: str = 'gene', colname: str = 'label',
reducefoo: Callable[[List[int]], int]=max) -> Tuple[pd.DataFrame, np.ndarray]:
"""
Core function for HELP labelling algorithm.
:param pd.DataFrame df: Input DataFrame.
:param List[str] columns: List of column names used for labelling computation.
:param int n_classes: Number of classes for 'flat-multi' labelling mode,
In 'two-by-two' mode the param is ignored: two binary labelling are computed
(default: 2).
:param bool verbose: Verbosity level for printing information (default: False).
:param Dict[int, str] labelnames: Dictionary mapping class labels to names (default: {0: 'E', 1: 'NE'}).
:param str mode: quantization modes: 'flat-multi' - the quantization applies to all matrix (the mode on all rows give the labels).
'two-by-two' - a binary quantization is done on all matrix (the mode on all rows give the binary labels),
then a binary quantization is applied only on 1-label rows (the mode on these rows give the binary labels 1 and 2)
(default: 'flat-multi').
:param str algorithm: quantization algorithm type otsu|linspace (default: 'otsu').
:param str rowname: Name of the DataFrame index (default: 'gene').
:param str colname: Name of the label column (default: 'label').
:param Callable[[List[int]], int] reducefoo: function used for solving ex-aequo in mode.
:returns: Output labelled DataFrame and quantized array.
:rtype: Tuple[pd.DataFrame, np.ndarray]
:example
.. code-block:: python
# Example usage
from HELPpy.models.labelling import labelling_core
input_df = pd.DataFrame(...)
columns_list = ['feature1', 'feature2', 'feature3']
output_df, quantized_array = labelling_core(input_df, columns=[], labelnames={0: 'E', 1: 'NE'}, n_classes, algorithm='otsu', mode='flat-multi')
"""
# Extract data from the dataframe
if columns == []:
T = df.to_numpy()
else:
T = df[columns].to_numpy()
if mode == 'two-by-two':
if verbose: print("[two-by-two]: 1. Two-class labelling:")
# Perform quantization
Q, Thr = multi_threshold_with_nan_by_column(T, 2, algorithm=algorithm)
Labels2 = modemax_nan(Q, reducefoo=reducefoo)
if verbose: print(Labels2.shape)
dfout = pd.DataFrame(index=df.index)
dfout.index.name = rowname
dfout[colname] = Labels2
if verbose: print("[two-by-two]: 2. Two-class labelling on 1-label rows:")
NE_genes = dfout[dfout[colname]==1].index
if columns == []:
TNE = df.loc[NE_genes].to_numpy()
else:
TNE = df[columns].loc[NE_genes].to_numpy()
NumberOfClasses = 2
QNE, ThrNE = multi_threshold_with_nan_by_column(TNE, 2, algorithm=algorithm)
Labels = modemax_nan(QNE, reducefoo=reducefoo)
if verbose: print(Labels.shape)
dfout2 = pd.DataFrame(index=NE_genes)
dfout2.index.name = rowname
dfout2[colname] = Labels
dfout2 = dfout2.replace({0: 1, 1: 2})
dfout.loc[dfout.index.isin(dfout2.index), [colname]] = dfout2[[colname]]
dfout = dfout.replace(labelnames)
elif mode == 'flat-multi':
if verbose: print("[flat-multi]: 1. multi-class labelling:")
# Perform quantization
Q, Thr = multi_threshold_with_nan_by_column(T, n_classes, algorithm=algorithm)
Labels = modemax_nan(Q, reducefoo=reducefoo)
dfout = pd.DataFrame(index=df.index)
dfout.index.name = rowname
dfout[colname] = Labels
else:
raise Exception("Labelling mode not supported!")
return dfout
[docs]
def labelling(df: pd.DataFrame, columns: List[List[str]] = [], n_classes: int=2,
verbose: bool = False, labelnames: Dict[int, str] = {1 : 'NE', 0: 'E'},
mode='flat-multi', rowname: str = 'gene', colname: str = 'label', algorithm='otsu', reducefoo: Callable[[List[int]], int]=max) -> pd.DataFrame:
"""
Main function for HELP labelling algorithm. Genes are labelled based on a selection of columns in the CRISPR DataFrame (columns=[line1, ..., lineN]).
By default (columns=[]) and the labelling algorithm uses all columns of CRISPR DataFrame. If the columns=[[list1_of_lines], ..., [listN_of_lines]] argument is a list of list,
it represents a partition of CRISPR lines: in this case the labelling is computed in each partition adn then the mode of mode is
applied to compute the final gene labels.
=[[...], ..., [...]]
:param pd.DataFrame df: Input DataFrame.
:param List[List[str]] columns: List of column names in DataFrame used for labelling (default: []).
:param bool three_class: Flag for three-class labeling (default: False).
:param bool verbose: Verbosity level for printing information (default: False).
:param Dict[int, str] labelnames: Dictionary mapping class labels to names (default: {}).
:param str mode: quantization modes: 'flat-multi' - the quantization applies to all matrix (the mode on all rows give the labels).
'two-by-two' - a binary quantization is done on all matrix (the mode on all rows give the binary labels),
then a binary quantization is applied only on 1-label rows (the mode on these rows give the binary labels 1 and 2)
(default: 'flat-multi').
:param str rowname: Name of the DataFrame index (default: 'gene').
:param str colname: Name of the label column (default: 'label').
:param str algorithm: quantization algorithm type otsu|linspace (default: 'otsu').
:param Callable[[List[int]], int] reducefoo: function used for solving ex-aequo in mode.
:returns: Output DataFrame with labels.
:rtype: pd.DataFrame
:example:
.. code-block:: python
# Example usage
from HELPpy.models.labelling import labelling
input_df = pd.DataFrame(...)
output_df = labelling(input_df, columns=[], n_classes=2, labelnames={0: 'E', 1: 'NE'}, algorithm='otsu', mode='flat-multi')
"""
if mode=='two-by-two': n_classes = 3
assert len(labelnames) == n_classes, "Label dictionary not same size of no. of classes!"
if all(isinstance(sub, list) for sub in columns) and len(columns) > 0: # use mode of mode across tissues
# Mode per groups
if verbose: print(f'performing mode of mode on {n_classes}-class labelling ({mode}).')
L_tot = np.empty(shape=(len(df), 0))
for lines in columns:
# check if there are rows with all Nans
nanrows = rows_with_all_nan(df[lines])
if len(nanrows) and verbose:
warnings.warn("There are rows with all NaNs, please remove them using the function 'rows_with_all_nan()' and re-apply the labelling. Otherwise you will have NaN labels in your output.")
labels = labelling_core(df, columns=lines, verbose=verbose, mode=mode,
labelnames=labelnames, rowname=rowname, colname=colname,
n_classes=n_classes, algorithm=algorithm, reducefoo=reducefoo)
labels = labels.replace(dict(map(reversed, labelnames.items()))).infer_objects()
L_tot = np.hstack((L_tot, labels.values))
# Execute mode on each tissue and sort'em
modeOfmode = modemax_nan(L_tot, reducefoo=reducefoo)
dfout = pd.DataFrame(index=df[sum(columns, [])].index)
dfout.index.name = rowname
dfout[colname] = modeOfmode
dfout = dfout.replace(labelnames)
elif any(isinstance(sub, list) for sub in columns) and len(columns) > 0:
raise Exception("Wrong columns partition format.")
else:
if verbose: print(f'performing flat mode on {n_classes}-class labelling ({mode}).')
nanrows = rows_with_all_nan(df[columns])
if len(nanrows) > 0:
warnings.warn("There are rows with all NaNs, please remove them using the function 'rows_with_all_nan()' and re-apply the labelling. Otherwise you will ha NaN labels in your output.")
dfout = labelling_core(df, columns=columns, verbose=verbose, mode=mode,
labelnames=labelnames, rowname=rowname, colname=colname,
n_classes=n_classes,algorithm=algorithm, reducefoo=reducefoo)
dfout.index.name = rowname
dfout = dfout.replace(labelnames)
if verbose: print(dfout.value_counts())
return dfout