Source code for featuremap.core_transition_states

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Nov  1 13:52:14 2023

"""


import anndata as ad
from anndata import AnnData
# from quasildr.structdr import Scms
import numpy as np
import time
import matplotlib.pyplot as plt
import scanpy as sc
import pandas as pd
import networkx as nx

import seaborn as sns

from featuremap.featuremap_ import nearest_neighbors

from scipy.sparse.csgraph import shortest_path
from scipy.sparse import csr_matrix

from sklearn.neighbors import NearestNeighbors
from scipy.stats import norm as normal

[docs] def kernel_density_estimate(data, X, bw=0.5, min_radius=5, output_onlylogp=False, ): """ Density estimation for data points specified by X with kernel density estimation. Parameters ---------- data : array of shape (n_samples, n_features) 2D array including data points. Input to density estimation. X : array 2D array including multiple data points. Input to density estimation. output_onlylogp : bool If true, returns logp, else returns p, g, h, msu. Returns ------- p : array 1D array. Unnormalized probability density. The probability density is not normalized due to numerical stability. Exact log probability """ nbrs = NearestNeighbors(n_neighbors=min_radius + 1).fit(data) adaptive_bw = np.maximum(nbrs.kneighbors(data)[0][:, -1], bw) # the number of data points and the dimensionality n, d = data.shape from scipy.spatial.distance import cdist # compare euclidean distances between each pair of data and X D = cdist(data, X) # and evaluate the kernel at each distance # prevent numerical overflow due to large exponentials logc = -d * np.log(np.min(adaptive_bw)) - d / 2 * np.log(2 * np.pi) C = (adaptive_bw[:, np.newaxis] / np.min(adaptive_bw)) ** (-d) * \ np.exp(-1 / 2. * (D / adaptive_bw[:, np.newaxis]) ** 2) if output_onlylogp: # return the kernel density estimate return np.log(np.mean(C, axis=0).T) + logc else: return np.mean(C, axis=0).T
[docs] def plot_density( adata: AnnData, emb = 'featmap', ): """ Plot the density of the embedding space. Parameters ---------- adata : AnnData Annotated data matrix. emb : str The embedding space to plot the density. """ data = adata.obsm[f'X_{emb}'].copy() # Exclude one leiden cluster; min_x = min(data[:, 0]) max_x = max(data[:, 0]) min_y = min(data[:, 1]) max_y = max(data[:, 1]) # part = 200 if data.shape[0] < 5000: num_grid_point = data.shape[0] * 0.5 else: num_grid_point = 2000 x_range = max_x - min_x y_range = max_y - min_y # x_range = 1 - 0.618 # y_range = 0.618 part_y = np.sqrt(num_grid_point / x_range * y_range) part_x = x_range / y_range * part_y # part_y = 60 # part_x = 60 # Assign num of grid points mort to vertical direction ?? xv, yv = np.meshgrid(np.linspace(min_x, max_x, round(part_x)), np.linspace(min_y, max_y, round(part_y)), sparse=False, indexing='ij') # xv, yv = np.meshgrid(np.linspace(-10, 10, part), np.linspace(-10, 15, part), # sparse=False, indexing='ij') grid_contour = np.column_stack([np.concatenate(xv), np.concatenate(yv)]) # T1 = time.time() # p1, g1, h1, msu,_ = s._kernel_density_estimate_anisotropic(grid_contour, rotational_matrix, r_emb) p1 = kernel_density_estimate(data=data, X=grid_contour, output_onlylogp=False, ) # T2 = time.time() # print('Finish kernel_density_estimate_anisotropic in ' + str(T2-T1)) # ifilter_1 = np.where(p1 >= (np.max(p1)*0.05))[0] # sampling # fig, ax = plt.subplots(figsize=(15, 15)) plt.contourf(xv, yv, p1.reshape(round(part_x), round(part_y)), levels=20, cmap='Blues') plt.xticks([]) plt.yticks([]) plt.show() plt.clf()
[docs] def compute_density( adata:AnnData, emb='featmap', cluster_key='leiden', density_key='density', quantile_core = 0.5, quantile_trans = 0.2, ): """ Identify the core state and transition state in the embedding space. Parameters ---------- adata : AnnData Annotated data matrix. emb : str The embedding space to plot the density. cluster_key : str The key of clusters in adata.obs. top_quantile : float The top quantile of the density. """ import scanpy as sc # adata.obs['clusters'] = adata.obs['clusters_fine'] # if there is no clusters in obs, then use leiden if cluster_key not in adata.obs.keys(): cluster_key = 'leiden' # Clusters by leiden import scanpy as sc sc.pp.neighbors(adata, n_neighbors=30,) sc.tl.leiden(adata, resolution=0.5) partition_label = adata.obs[cluster_key].copy() partition_label.value_counts() # print(partition_label.value_counts()) data = adata.obsm[f'X_{emb}'].copy() p= kernel_density_estimate(data, data) # normalize the density by min-max scaling p = (p - np.min(p)) / (np.max(p) - np.min(p)) adata.obs['density'] = p # Density in each cluster adata.obs[density_key] = np.nan leiden_clusters = adata.obs[cluster_key].copy() leiden_clusters.value_counts() for cluster in leiden_clusters.cat.categories.values: cluster_in_cluster_label = (leiden_clusters == cluster) data_cluster = data[cluster_in_cluster_label, :] p_1 = kernel_density_estimate(data_cluster, data_cluster) # adata.obs['density_separate_cluster'][cluster_in_cluster_label] = p_1 # normalize the density by min-max scaling p_1 = (p_1 - np.min(p_1)) / (np.max(p_1) - np.min(p_1)) adata.obs.loc[cluster_in_cluster_label, density_key] = p_1 density = adata.obs[density_key][cluster_in_cluster_label] # Select top ratio(%) in each cluster as core state leiden_clusters = adata.obs[cluster_key].copy() adata.obs['density_core'] = np.nan adata.obs['density_largest'] = np.nan adata.obs['density_transition'] = np.nan for cluster in leiden_clusters.cat.categories.values: cluster_in_cluster_label = (leiden_clusters == cluster) density = adata.obs[density_key][cluster_in_cluster_label].copy() # density = adata.obs['density'][cluster_in_cluster_label] cluster_index = leiden_clusters.index[leiden_clusters == cluster] density_sort = density[cluster_index].sort_values(ascending=False) if int(len(cluster_index) * (1-quantile_core)) < 50: density_sort_topper_index = density_sort.index[:50] density_sort_topper_value = density_sort[:50] else: density_sort_topper_index = density_sort.index[:int(len(cluster_index) * (1-quantile_core))] density_sort_topper_value = density_sort[:int(len(cluster_index) * (1-quantile_core))] adata.obs.loc[density_sort_topper_index, 'density_core_id'] = cluster adata.obs.loc[density_sort_topper_index, 'density_core'] = density_sort_topper_value if int(len(cluster_index) * quantile_trans) < 50: density_sort_bottom_index = density_sort.index[-50:] density_sort_bottom_value = density_sort[-50:] else: density_sort_bottom_index = density_sort.index[-int(len(cluster_index) * (quantile_trans)):] density_sort_bottom_value = density_sort[-int(len(cluster_index) * (quantile_trans)):] adata.obs.loc[density_sort_bottom_index, 'density_transition_id'] = cluster adata.obs.loc[density_sort_bottom_index, 'density_transition'] = density_sort_bottom_value # non-corestate # density_sort_rest_index = density_sort.index[int(len(cluster_index) * 0.2):] # adata.obs['corestates'][density_sort_rest_index] = f'{cluster} Rest' density_sort_largest_index = density_sort.index[:1] # adata.obs['corestates_largest'][density_sort_largest_index] = cluster adata.obs.loc[density_sort_largest_index, 'density_largest'] = cluster # adata.obs['corestates'] = pd.Series(adata.obs['corestates'].copy(), dtype='category').values # # Expand the core state by NNs # from featuremap.featuremap_ import nearest_neighbors # n_neighbors = 15 # knn_indices, _, _ = nearest_neighbors(adata.obsm[f'X_{emb}'].copy(), n_neighbors=n_neighbors, # metric="euclidean", metric_kwds={}, angular=False, random_state=42) # # corestates_nn_points coresponding to clusters # # initialize an empty dataframe # df_temp = pd.DataFrame(index=adata.obs_names, columns=['density_filter']) # df_temp['density_filter'] = np.nan # for cluster in leiden_clusters.cat.categories.values: # corestates_points = np.where(adata.obs['density_filter'] == cluster)[0] # corestates_nn_points = np.unique(knn_indices[corestates_points].reshape(-1)) # df_temp.loc[adata.obs_names[corestates_nn_points], 'density_filter'] = '1' # # adata.obs.loc[adata.obs_names[corestates_nn_points], 'corestates_nn_points'] = cluster # adata.obs['density_filter'] = df_temp['density_filter'] # adata.obs['density_filter'] = pd.Categorical(adata.obs['density_filter'], categories=adata.obs[cluster_key].cat.categories, ordered=True) sc.pl.embedding(adata, emb, color=['density_core'],) sc.pl.embedding(adata, emb, color=['density_transition'],)
# # corestates_nn_points: binary # adata.obs['density_filter_binary'] = 0 # corestates_points = np.where(adata.obs['density_filter'].notna())[0] # corestates_points = np.unique(corestates_points.reshape(-1)) # corestates_binary = np.isin(np.array(range(adata.shape[0])), corestates_points) * 1 # adata.obs['density_filter_points'] = corestates_binary # adata.obs['core_trans_states'] = '0' # corestate_points = np.where(adata.obs['density_filter_points']==1)[0] # adata.obs.loc[adata.obs_names[corestate_points],'core_trans_states'] = '1' # sc.pl.embedding(adata, emb, color=['core_trans_states'])
[docs] def compute_density_0( adata:AnnData, emb='featmap', cluster_key='leiden', density_key='density', quantile_core = 0.5, quantile_trans = 0.2, ): """ Identify the core state and transition state in the embedding space. Parameters ---------- adata : AnnData Annotated data matrix. emb : str The embedding space to plot the density. cluster_key : str The key of clusters in adata.obs. top_quantile : float The top quantile of the density. """ import scanpy as sc # adata.obs['clusters'] = adata.obs['clusters_fine'] # if there is no clusters in obs, then use leiden if cluster_key not in adata.obs.keys(): cluster_key = 'leiden' # Clusters by leiden import scanpy as sc sc.pp.neighbors(adata, n_neighbors=30,) sc.tl.leiden(adata, resolution=0.5) partition_label = adata.obs[cluster_key].copy() partition_label.value_counts() # print(partition_label.value_counts()) data = adata.obsm[f'X_{emb}'].copy() p= kernel_density_estimate(data, data) # normalize the density by min-max scaling p = (p - np.min(p)) / (np.max(p) - np.min(p)) adata.obs['density'] = p # Density in each cluster adata.obs[density_key] = np.nan leiden_clusters = adata.obs[cluster_key].copy() leiden_clusters.value_counts() for cluster in leiden_clusters.cat.categories.values: cluster_in_cluster_label = (leiden_clusters == cluster) data_cluster = data[cluster_in_cluster_label, :] p_1 = kernel_density_estimate(data_cluster, data_cluster) # adata.obs['density_separate_cluster'][cluster_in_cluster_label] = p_1 # normalize the density by min-max scaling p_1 = (p_1 - np.min(p_1)) / (np.max(p_1) - np.min(p_1)) adata.obs.loc[cluster_in_cluster_label, density_key] = p_1 sc.pl.embedding(adata, emb, color=[density_key,]) density_filter = adata.obs[density_key].copy() # get the 0.8 quantile of curvature quantile_08 = np.quantile(density_filter, quantile_core) # set curvature to Nan for the nodes with curvature less than quantile_08 density_filter[density_filter<quantile_08] = np.nan adata.obs['density_core'] = density_filter density_filter = adata.obs[density_key].copy() # get the 0.8 quantile of curvature quantile_08 = np.quantile(density_filter, quantile_trans) # set curvature to Nan for the nodes with curvature less than quantile_08 density_filter[density_filter>quantile_08] = np.nan adata.obs['density_transition'] = density_filter sc.pl.embedding(adata, emb, color=['density_core'],) sc.pl.embedding(adata, emb, color=['density_transition'],)
######################################################## # Collect trasition state and core state given clusters ##############################################################
[docs] def nodes_of_transition_states(adata, start_state, end_state, clusters): """ Collect the nodes of transition states given the start and end state. Parameters ---------- adata : AnnData Annotated data matrix. start_state : str The start state of the transition. end_state : str The end state of the transition. clusters : list The list of clusters in the data. Returns ------- path_nodes : np.array The nodes of the path from start to end state. path_points_nn : np.array The points of the path from start to end state. end_bridge_points : np.array The points of the end bridge. core_points : np.array The points of the core states. transition_points : np.array The points of the transition states. """ node_name_start = adata.obs['corestates_largest'][adata.obs['corestates_largest'] == (start_state)].index[0] start = np.where(adata.obs_names == node_name_start)[0][0] node_name_end = adata.obs['corestates_largest'][adata.obs['corestates_largest'] == (end_state)].index[0] end = np.where(adata.obs_names == node_name_end)[0][0] # Spanning tree on embedding space ridge_points = np.where(np.array(adata.obs['trajectory_points'])==1)[0] corestate_points = np.where(pd.isna((adata.obs['corestates_largest'])) == False)[0] # Points for tree tree_points = np.union1d(ridge_points, corestate_points) mst_subg = mst_subgraph(adata, tree_points, emb='X_featmap') mst_subg.clusters().summary() start_id = mst_subg.vs.find(name=start).index end_id = mst_subg.vs.find(name=end).index path_given_start_end = mst_subg.get_shortest_paths(v=start_id, to=end_id) path_nodes_name = np.array([mst_subg.vs[i]['name'] for i in path_given_start_end]) # Extend the path to both ends in trajectory nodes_start_state = np.where(np.array(adata.obs['clusters'] == str(start_state)) == True)[0] nodes_start_ridge = ridge_points[np.where(np.in1d(ridge_points, nodes_start_state))[0]] nodes_end_state = np.where(np.array(adata.obs['clusters'] == str(end_state)) == True)[0] nodes_end_ridge = ridge_points[np.where(np.in1d(ridge_points, nodes_end_state))[0]] node_corestate_start = adata.obs['corestates'][adata.obs['corestates_largest'] == start_state].index corestate_start = np.where(np.in1d(adata.obs_names, node_corestate_start))[0] node_corestate_end = adata.obs['corestates'][adata.obs['corestates_largest'] == end_state].index corestate_end = np.where(np.in1d(adata.obs_names, node_corestate_end))[0] from functools import reduce path_nodes = reduce(np.union1d, (path_nodes_name, corestate_start, corestate_end, nodes_start_ridge, nodes_end_ridge)) path_binary = np.isin(np.array(range(adata.shape[0])), path_nodes) adata.obs['path_binary'] = (path_binary * 1).astype(int) sc.pl.embedding(adata, 'featmap', legend_loc='on data', s=10, color=['path_binary'],cmap='bwr') # sc.pl.embedding(adata_var, 'umap_v', legend_loc='on data', s=10, color=['path_binary']) from featuremap.featuremap_ import nearest_neighbors knn_indices, knn_dists, _ = nearest_neighbors(adata.obsm['X_featmap'].copy(), n_neighbors=60, metric="euclidean", metric_kwds={}, angular=False, random_state=42) path_nodes_nn = np.unique(knn_indices[path_nodes].reshape(-1)) core_nodes = np.array([]).astype(int) for cluster in clusters: core_nodes = np.append(core_nodes, np.where(adata.obs['corestates'] == str(cluster))[0]) knn_indices, knn_dists, _ = nearest_neighbors(adata.obsm['X_featmap'].copy(), n_neighbors=60, metric="euclidean", metric_kwds={}, angular=False, random_state=42) core_points = np.unique(knn_indices[core_nodes].reshape(-1)) path_points_nn = np.union1d(path_nodes_nn, core_points) path_points_binary = np.isin(np.array(range(adata.shape[0])), path_points_nn) * 1 adata.obs['path_points_nn'] = path_points_binary sc.pl.embedding(adata, 'featmap', legend_loc='on data', s=10, color=['path_points_nn'],cmap='bwr') end_bridge_nodes = reduce(np.union1d, (path_nodes_name, corestate_start, corestate_end)) end_bridge_nodes = np.unique(knn_indices[end_bridge_nodes].reshape(-1)) transition_points = end_bridge_nodes end_bridge_points = np.union1d(end_bridge_nodes, core_points) # end_bridge_points_binary = np.isin(np.array(range(adata.shape[0])), end_bridge_points) * 1 # adata.obs['end_bridge_points'] = end_bridge_points_binary # sc.pl.embedding(adata, 'featmap', legend_loc='on data', s=10, color=['end_bridge_points'],cmap=cmp('bwr')) adata.obs['core_trans_temp'] = np.nan adata.obs['core_trans_temp'][end_bridge_points] = '0' adata.obs['core_trans_temp'][core_points] = '1' sc.pl.embedding(adata, 'featmap', color=['core_trans_temp']) return path_nodes, path_points_nn, end_bridge_points, core_points, transition_points
# def ridge_estimation( # adata:AnnData # ): # data = adata.obsm['X_featmap'].copy() # Exclude one leiden cluster; # # data = adata_var.obsm['X_umap_v'] # pos_collection = [] # # for sample_time in range(20): # s = Scms(data, 0.5, min_radius=5) # p, _, h, msu = s._kernel_density_estimate(data) # ifilter_2 = np.where(p >= (np.max(p)*0.05))[0] # sampling # # shifted = np.append(grid_contour[ifilter_1, :],data[ifilter_2, :], axis=0) # shifted = data[ifilter_2,:] # inverse_sample_index = s.inverse_density_sampling(shifted, n_samples=200, n_jobs=1, batch_size=16) # shifted = shifted[inverse_sample_index] # n_iterations = 200 # allshiftedx_grid = np.zeros((shifted.shape[0],n_iterations)) # allshiftedy_grid = np.zeros((shifted.shape[0],n_iterations)) # for j in range(n_iterations): # allshiftedx_grid[:,j] = shifted[:,0] # allshiftedy_grid[:,j] = shifted[:,1] # shifted += 1*s.scms_update(shifted,method='GradientLogP',stepsize=0.02, relaxation=0.5)[0] # pos = np.column_stack([allshiftedx_grid[:,-1], allshiftedy_grid[:,-1]]) # pos_collection.append(pos) # pos = np.array(pos_collection).reshape(-1,2) # p_pos, _, _, _ = s._kernel_density_estimate(pos) # pos_filter_idx = np.where(p_pos >= (np.max(p_pos)*0.1))[0] # sampling # pos_filter = pos[pos_filter_idx] # # Plot the ridge # s = Scms(data, 0.5, min_radius=5) # min_x = min(data[:, 0]) # max_x = max(data[:, 0]) # min_y = min(data[:, 1]) # max_y = max(data[:, 1]) # # part = 200 # num_grid_point = data.shape[0] * 0.5 # x_range = max_x - min_x # y_range = max_y - min_y # # x_range = 1 - 0.618 # # y_range = 0.618 # part_y = np.sqrt(num_grid_point / x_range * y_range) # part_x = x_range / y_range * part_y # # Assign num of grid points mort to vertical direction ?? # xv, yv = np.meshgrid(np.linspace(min_x, max_x, round(part_x)), np.linspace(min_y, max_y, round(part_y)), # sparse=False, indexing='ij') # grid_contour = np.column_stack([np.concatenate(xv), np.concatenate(yv)]) # p1, g1, h1, msu = s._kernel_density_estimate(grid_contour, output_onlylogp=False, ) # plt.contourf(xv, yv, p1.reshape( # round(part_x), round(part_y)), levels=20, cmap='Blues') # plt.scatter(data[:,0],data[:,1], s=1, c='darkgrey', alpha=0.1) # plt.scatter(pos_filter[:,0],pos_filter[:,1],c="red", s=1) # plt.xticks([]) # plt.yticks([]) # plt.show() # plt.clf() # from scipy.sparse.csgraph import shortest_path, dijkstra
[docs] def mst_subgraph(adata, tree_points, emb='X_featmap'): """ Construct the minimum spanning tree over the tree points. Parameters ---------- adata tree_points : np.array Points included in the induced subgraph Returns ------- mst_subg : igraph minimum spanning_tree over tree_points (anchors). """ # # M = adata.obsp['emb_dists'].copy().toarray() # M = adata_var.obsm['knn_dists'].copy().toarray() # graph = csr_matrix(M) # knn graph # dist_matrix, predecessors = dijkstra( # csgraph=graph, directed=False, return_predecessors=True) # dist_mat = dist_matrix # g = sc._utils.get_igraph_from_adjacency(dist_mat) # Complete graph from pairwise distance # g.vs["name"] = range(M.shape[0]) # 'name' to store original point id # g_induced_subg = g.induced_subgraph(tree_points) # mst_subg = g_induced_subg.spanning_tree(weights=g_induced_subg.es["weight"]) n_neighbors = 60 knn_indices, knn_dists, _ = nearest_neighbors(adata.obsm[emb][tree_points].copy(), n_neighbors=n_neighbors, metric="euclidean", metric_kwds={}, angular=False, random_state=42) # Pairwise distance by knn indices and knn distances dist_mat = np.zeros([tree_points.shape[0], tree_points.shape[0]]) for i in range(tree_points.shape[0]): for j in range(n_neighbors): dist_mat[i, knn_indices[i,j]] += knn_dists[i,j] # knn graph by iGraph g = sc._utils.get_igraph_from_adjacency(dist_mat) # Complete graph from pairwise distance g.vs["name"] = tree_points # 'name' to store original point id # g_induced_subg = g.induced_subgraph(tree_points) mst_subg = g.spanning_tree(weights=g.es["weight"]) return mst_subg
[docs] def ridge_pseudotime(adata, root, plot='featmap'): """ Compute the pseudotime along the ridge path. Parameters ---------- adata : AnnData Annotated data matrix. root : str The root of the ridge path. plot : str The embedding space to plot the pseudotime. Returns ------- adata.obs['ridge_pseudotime'] : np.array The pseudotime along the ridge path. """ from scipy.special import expit from sklearn.preprocessing import scale # Construct mst subgraph ridge_points = np.where(np.array(adata.obs['trajectory_points'])==1)[0] corestate_points = np.where(pd.isna((adata.obs['corestates_largest'])) == False)[0] tree_points = np.union1d(ridge_points, corestate_points) mst_subg = mst_subgraph(adata, tree_points, emb='X_featmap') farthest_points = mst_subg.farthest_points() # (34, 174, 140) farthest_points = np.array(farthest_points[:2]) farthest_path = mst_subg.get_shortest_paths(v=farthest_points[0], to=farthest_points[1]) farthest_path_name = np.array([mst_subg.vs[i]['name'] for i in farthest_path]) farthest_path_binary = np.isin(np.array(range(adata.shape[0])), farthest_path_name) adata.obs['farthest_path'] = (farthest_path_binary * 1).astype(int) sc.pl.embedding(adata, plot, legend_loc='on data', s=100, color=['farthest_path','trajectory_points']) # sc.pl.embedding(adata, 'featmap', color=['leiden','corestates','farthest_path','trajectory_points']) # Set the starting point if root is None: start = farthest_points[0] else: # root_index = adata.obs['corestates_largest'][adata.obs['corestates_largest'] == root].index[0] # root_id = np.where(adata.obs_names == root_index)[0][0] start = np.where(mst_subg.vs['name'] == root)[0][0] # start = start dist_from_start = mst_subg.shortest_paths(start, weights="weight") nodes_in_tree = np.array([mst_subg.vs[i]['name'] for i in range(mst_subg.vcount())]) dist_from_start_dict = dict(zip(nodes_in_tree, dist_from_start[0])) # Pairwise shortest path of origninal knn graph # M = adata.obsp['emb_dists'].toarray() # M = adata.obsp['knn_dists'].toarray() from umap.umap_ import fuzzy_simplicial_set _, _, _, knn_dists = fuzzy_simplicial_set( adata.obsm['X_featmap'] , n_neighbors=60, random_state=42, metric="euclidean", metric_kwds={}, # knn_indices, # knn_dists, verbose=True, return_dists=True) M = knn_dists.toarray() graph = csr_matrix(M) dist_matrix, predecessors = shortest_path( csgraph=graph, directed=False, indices=tree_points,return_predecessors=True) # For each node, find its nearest node in the tree dist_matrix = dist_matrix.T nearest_in_tree = np.argmin(dist_matrix, axis=1) nearest_in_tree_dist = np.min(dist_matrix, axis=1) data_dist = {'node_in_tree': tree_points[nearest_in_tree], 'dist': nearest_in_tree_dist} nearest_node_in_tree = pd.DataFrame.from_dict(data_dist,orient='columns') # For each node, compute the dist to start by first identifying its nearest node in the tree, then to start point emb_pseudotime = np.array([nearest_node_in_tree.at[i,'dist'] + dist_from_start_dict[nearest_node_in_tree.at[i,'node_in_tree']] for i in range(dist_matrix.shape[0]) ]) emb_pseudotime[np.where(emb_pseudotime == np.inf)[0]] = 20 adata.obs['ridge_pseudotime'] = expit(scale(emb_pseudotime)) # adata.obs['emb_pseudotime'] = emb_pseudotime # root_idx = mst_s1ubg.vs[start]['name'] # adata.uns["iroot"] = root_idx # sc.tl.dpt(adata) # adata.obs['dpt_pseudotime'] = expit(scale(adata.obs['dpt_pseudotime'])+1) # expit(scale(emb_pseudotime)) sc.pl.embedding(adata, plot, legend_loc='on data', color=['ridge_pseudotime',]) # sc.pl.embedding(adata, 'umap', legend_loc='on data', color=['emb_pseudotime',]) return adata.obs['ridge_pseudotime']
[docs] def bifurcation_plot(adata, core_states, transition_states_1, transition_states_2): """ Plot the bifurcation states in the embedding space. Parameters ---------- adata : AnnData Annotated data matrix. core_states : list The list of core states. transition_states_1 : list The list of transition states 1. transition_states_2 : list The list of transition states 2. """ core_states_map = {str(i):'core' for i in core_states} transition_states_map_1 = {str(i):'transition_1' for i in transition_states_1} transition_states_map_2 = {str(i):'transition_2' for i in transition_states_2} # merge the core states and transition states core_trans_states_bifur = {**core_states_map, **transition_states_map_1, **transition_states_map_2} adata.obs['core_trans_states_bifur'] = adata.obs['leiden_v'].map(core_trans_states_bifur) adata.obs['core_trans_states_bifur'] = adata.obs['core_trans_states_bifur'].astype('category') adata.obs['core_trans_states_bifur'] = adata.obs['core_trans_states_bifur'].cat.set_categories([ 'transition_1','core', 'transition_2'], ordered=True) sc.pl.embedding(adata, 'featmap_v',legend_fontsize=10, s=10, color=['core_trans_states_bifur'])
[docs] def bifurcation_plot_1(adata, core_states, transition_states_1, transition_states_2, transition_states_3): """ Plot the bifurcation states in the embedding space. Parameters ---------- adata : AnnData Annotated data matrix. core_states : list The list of core states. transition_states_1 : list The list of transition states 1. transition_states_2 : list The list of transition states 2. """ core_states_map = {str(i):'core' for i in core_states} transition_states_map_1 = {str(i):'transition_1' for i in transition_states_1} transition_states_map_2 = {str(i):'transition_2' for i in transition_states_2} transition_states_map_3 = {str(i):'transition_3' for i in transition_states_3} # merge the core states and transition states core_trans_states_bifur = {**core_states_map, **transition_states_map_1, **transition_states_map_2, **transition_states_map_3} adata.obs['core_trans_states_bifur'] = adata.obs['leiden_v'].map(core_trans_states_bifur) adata.obs['core_trans_states_bifur'] = adata.obs['core_trans_states_bifur'].astype('category') adata.obs['core_trans_states_bifur'] = adata.obs['core_trans_states_bifur'].cat.set_categories([ 'transition_1','core', 'transition_2', 'transition_3'], ordered=True) sc.pl.embedding(adata, 'featmap_v',legend_fontsize=10, s=10, color=['core_trans_states_bifur'])
[docs] def path_plot(adata, core_states, transition_states): """ Plot the path states in the embedding space. Parameters ---------- adata : AnnData Annotated data matrix. core_states : list The list of core states. transition_states : list The list of transition states. """ core_states_map = {str(i):'core' for i in core_states} transition_states_map = {str(i):'transition' for i in transition_states} # merge the core states and transition states path_state = {**core_states_map, **transition_states_map} adata.obs['path_states'] = adata.obs['leiden_v'].map(path_state) adata.obs['path_states'] = adata.obs['path_states'].astype('category') adata.obs['path_states'] = adata.obs['path_states'].cat.set_categories(['transition', 'core'], ordered=True) sc.pl.embedding(adata, 'featmap_v',legend_fontsize=10, s=30, color=['path_states'], palette='tab10')
############################################ # Density vs pseudotime ############################################ #%%
[docs] def plot_density_pseudotime(filtered_data, pseudotime='feat_pseudotime', clusters='clusters', density='density'): """ Plot the density vs pseudotime. Parameters ---------- filtered_data : pd.DataFrame The dataframe including the data. pseudotime : str The pseudotime in the data. clusters : str The clusters in the data. density : str The density in the data. """ from pygam import LinearGAM import seaborn as sns import matplotlib.pyplot as plt X = filtered_data[pseudotime].values y = filtered_data[density].values gam = LinearGAM(n_splines=20).fit(X, y) fig, ax = plt.subplots() XX = gam.generate_X_grid(term=0, n=100) for response in gam.sample(X, y, quantity='y', n_draws=50, sample_at_X=XX): plt.scatter(XX, response, alpha=.01, color='k') plt.plot(XX, gam.predict(XX), 'r--') plt.plot(XX, gam.prediction_intervals(XX, width=.95), color='b', ls='--') ax.plot(XX, gam.predict(XX), 'b--', label='_nolegend_') sns.scatterplot(x=pseudotime, y=density, data=filtered_data, hue=clusters, ax=ax) ax.set_xlabel(pseudotime) # ax.set_ylabel('') ax.set_ylabel(density) ax.legend().remove() ax.set_xticks([]) ax.set_yticks([]) # ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left') # plt.savefig(f'./figures/pancreas/density_pseudotime_{pseudotime}_beta.png', bbox_inches='tight') plt.show()
[docs] def compute_betweenness_centrality(adata, emb_featuremap, quantile_trans=0.8, quantile_core=0.2): """ Compute the betweenness centrality of the cell network. Parameters ---------- adata : AnnData Annotated data matrix. emb_featuremap : object The featuremap object. top_quantile : float The top quantile of the betweenness centrality. """ graph = emb_featuremap._featuremap_kwds['graph_v'] # graph =emb_featuremap.graph_ # create a graph from the adjacency matrix G = nx.from_numpy_array(graph) # graph size of G n = G.number_of_nodes() # compute the betweenness centrality betweenness = nx.betweenness_centrality(G, k=int(n*0.2)) # adata.obs['betweenness'] = [betweenness[i] for i in range(len(betweenness))] # extract the degree of each node degree = G.degree() adata.obs['degree'] = [degree[i] for i in range(len(degree))] # normalize the betweenness centrality by node degree betweenness_centrality = [betweenness[i]/degree[i] for i in range(len(betweenness))] # set nan values to 0 # betweenness_centrality = np.nan_to_num(betweenness_centrality) # log normalization betweenness_centrality = np.array(betweenness_centrality) * 10e4 betweenness_centrality = np.log(betweenness_centrality + 1) # normalize the betweenness centrality by min-max scaling betweenness_centrality = (betweenness_centrality - np.min(betweenness_centrality)) / (np.max(betweenness_centrality) - np.min(betweenness_centrality)) adata.obs['betweenness_centrality'] = betweenness_centrality betweenness_centrality_filter = betweenness_centrality.copy() # get the 0.8 quantile of betweenness_normalized quantile = np.quantile(betweenness_centrality_filter, quantile_trans) # set betweenness_normalized to Nan for the nodes with betweenness_normalized less than quantile betweenness_centrality_filter[betweenness_centrality_filter<quantile] = np.nan adata.obs['betweenness_centrality_transition'] = betweenness_centrality_filter betweenness_centrality_filter = betweenness_centrality.copy() # get the 0.8 quantile of betweenness_normalized quantile = np.quantile(betweenness_centrality_filter, quantile_core) # set betweenness_normalized to Nan for the nodes with betweenness_normalized less than quantile betweenness_centrality_filter[betweenness_centrality_filter>quantile] = np.nan adata.obs['betweenness_centrality_core'] = betweenness_centrality_filter sc.pl.embedding(adata, basis='featmap_v', color='betweenness_centrality', cmap='viridis', size=10) sc.pl.embedding(adata, basis='featmap_v', color='betweenness_centrality_core', cmap='viridis', size=10) sc.pl.embedding(adata, basis='featmap_v', color='betweenness_centrality_transition', cmap='viridis', size=10)
[docs] def compute_curvature(adata, emb_featuremap, quantile_core=0.2, quantile_trans=0.8): """ Compute the curvature of the embedding space. k = ||d T / d s|| Parameters ---------- adata : AnnData Annotated data matrix. emb_featuremap : object The featuremap object. top_quantile : float The top quantile of the curvature. """ # data = adata.X.copy() data = adata.obsm['variation_pc'] knn_indices = emb_featuremap._featuremap_kwds['_knn_indices'] # each data point, get the average distance to its neighbors from sklearn.metrics import pairwise_distances k = knn_indices.shape[1] delta_dist = np.zeros(data.shape[0]) for i in range(data.shape[0]): indices = knn_indices[i] delta_dist[i] = np.mean(pairwise_distances(data[i].reshape(1, -1), data[indices], metric='euclidean')) # curvature = 1 / delta_dist curvature = delta_dist # normalize the curvature by min-max scaling curvature = (curvature - np.min(curvature)) / (np.max(curvature) - np.min(curvature)) adata.obs['curvature'] = curvature curvature_filter = curvature.copy() # get the 0.8 quantile of curvature quantile_core_val = np.quantile(curvature_filter, quantile_core) # set curvature to Nan for the nodes with curvature less than quantile_08 curvature_filter[curvature_filter>quantile_core_val] = np.nan adata.obs['curvature_core'] = curvature_filter curvature_filter = curvature.copy() # get the 0.8 quantile of curvature quantile_trans_val = np.quantile(curvature_filter, quantile_trans) # set curvature to Nan for the nodes with curvature less than quantile_08 curvature_filter[curvature_filter<quantile_trans_val] = np.nan adata.obs['curvature_transition'] = curvature_filter # print(adata.obs['curvature']) # print(f'curvature_filter: {curvature_filter}') sc.pl.embedding(adata, basis='featmap_v', color='curvature', cmap='viridis', size=10) sc.pl.embedding(adata, basis='featmap_v', color='curvature_core', cmap='viridis', size=10) sc.pl.embedding(adata, basis='featmap_v', color='curvature_transition', cmap='viridis', size=10)
[docs] def plot_core_transition_states(adata): """ Given the core and transition states defined by the density, curvature and betweenness centrality, union the core and transition states and plot the core and transition states in the embedding space. Parameters ---------- adata : AnnData Annotated data matrix. """ density_core = adata.obs['density_core'].values.astype(float) curvature_core = adata.obs['curvature_core'].values betweenness_centrality_core = adata.obs['betweenness_centrality_core'].values density_transition = adata.obs['density_transition'].values curvature_transition = adata.obs['curvature_transition'].values betweenness_centrality_transition = adata.obs['betweenness_centrality_transition'].values # set above to binary of not nan density_core = ~np.isnan(density_core) curvature_core = ~np.isnan(curvature_core) betweenness_centrality_core = ~np.isnan(betweenness_centrality_core) density_transition = ~np.isnan(density_transition) curvature_transition = ~np.isnan(curvature_transition) betweenness_centrality_transition = ~np.isnan(betweenness_centrality_transition) # core states core_states = density_core | curvature_core | betweenness_centrality_core # transition states transition_states = density_transition | curvature_transition | betweenness_centrality_transition # intersection of core and transition states core_transition_overlap = core_states & transition_states # Exclude the overlap core_states[core_transition_overlap] = False transition_states[core_transition_overlap] = False core_states = core_states.astype(int).astype(str) transition_states = transition_states.astype(int).astype(str) # Replace '0' with NaN core_states_nan = [np.nan if i == '0' else i for i in core_states] transition_states_nan = [np.nan if i == '0' else i for i in transition_states] # Extract the first two colours from the tab10 colormap tab10_colors = [plt.cm.tab10(1), plt.cm.tab10(0)] # Convert to a format suitable for seaborn or other libraries color_palette = [tuple(color[:3]) for color in tab10_colors] # Drop alpha if necessary adata.obs['core_states'] = core_states_nan sc.pl.embedding(adata, basis='X_featmap_v', color='core_states', palette=color_palette, s=10, frameon=False) # Extract the first two colours from the tab10 colormap tab10_colors = [plt.cm.tab10(0), plt.cm.tab10(1)] # Convert to a format suitable for seaborn or other libraries color_palette = [tuple(color[:3]) for color in tab10_colors] # Drop alpha if necessary adata.obs['transition_states'] = transition_states_nan sc.pl.embedding(adata, basis='X_featmap_v', color='transition_states', palette=color_palette, s=10, frameon=False)
[docs] def compute_cluster_state_labels(adata): """ Compute the cluster state labels based on the percentage of core_states and transition_states for each cluster. Parameters ---------- adata : AnnData Annotated data matrix. """ leiden_v_clusters = adata.obs['leiden_v'].cat.categories # For each cluster, compute the percentage of core_states and transition_states, respectively. # The cluster is labeled as core_states if the percentage of core_states is larger than 0.5, otherwise, it is labeled as transition_states. cluster_state_label = [] cluster_state_dict = {} # Save the percentage of core_states and transition_states for each cluster core_states_percentage_dict = {} transition_states_percentage_dict = {} for cluster in leiden_v_clusters: cluster_index = np.where(adata.obs['leiden_v'] == cluster)[0] core_states_percentage = np.sum(adata.obs['core_states'].values[cluster_index] == '1') / len(cluster_index) transition_states_percentage = np.sum(adata.obs['transition_states'].values[cluster_index] == '1') / len(cluster_index) core_states_percentage_dict[cluster] = core_states_percentage transition_states_percentage_dict[cluster] = transition_states_percentage if core_states_percentage > transition_states_percentage: cluster_state_label.append('core_states') cluster_state_dict[cluster] = 'core_states' else: cluster_state_label.append('transition_states') cluster_state_dict[cluster] = 'transition_states' # assign the cluster label to the original adata adata.obs['cluster_state_label'] = adata.obs['leiden_v'].map(dict(zip(leiden_v_clusters, cluster_state_label))) adata.obs['cluster_state_label'] = adata.obs['cluster_state_label'].astype('category') adata.obs['cluster_state_label'] = adata.obs['cluster_state_label'].cat.reorder_categories(['transition_states', 'core_states']) sc.pl.embedding(adata, basis='FeatureMAP_v', color='cluster_state_label', cmap='viridis', ) adata.uns['cluster_state_dict'] = cluster_state_dict # adata_var.obs['cluster_state_label'] = adata.obs['cluster_state_label'] # plot the percentage of core_states and transition_states for each cluster import pandas as pd core_states_percentage_df = pd.DataFrame.from_dict(core_states_percentage_dict, orient='index', columns=['core_states_percentage']) transition_states_percentage_df = pd.DataFrame.from_dict(transition_states_percentage_dict, orient='index', columns=['transition_states_percentage']) percentage_df = pd.concat([transition_states_percentage_df, core_states_percentage_df], axis=1) percentage_df = percentage_df.sort_values(by='transition_states_percentage', ascending=False) percentage_df.plot(kind='bar', stacked=True, figsize=(10, 8)) plt.xlabel('Clusters by Leiden') plt.ylabel('Percentage') plt.legend().set_visible(False) plt.show()
# return adata import networkx as nx import leidenalg as la import numpy as np import igraph as ig
[docs] def weighted_clustering_coefficient(G, node): neighbors = list(G.neighbors(node)) k_i = len(neighbors) # If the node has fewer than 2 neighbors, its clustering coefficient is 0 if k_i < 2: return 0.0 # Sum of the weights of edges connected to the node (s_i) s_i = sum([G[node][neighbor].get('weight', 1.0) for neighbor in neighbors]) # Initialize numerator for the clustering coefficient numerator = 0.0 # Iterate through pairs of neighbors for j in neighbors: for k in neighbors: if j != k and G.has_edge(j, k): # Edge weights between node and its neighbors j and k w_ij = G[node][j].get('weight', 1.0) w_ik = G[node][k].get('weight', 1.0) # Weight of the edge between j and k a_jk = G[j][k].get('weight', 1.0) if G.has_edge(j, k) else 0.0 # Apply the formula from Barrat et al. numerator += (w_ij + w_ik) / 2 * a_jk # Compute the denominator denominator = s_i * (k_i - 1) # If the denominator is 0, return 0 if denominator == 0: return 0.0 # Return the clustering coefficient return numerator / denominator
[docs] def clustering_coefficient(G, node): neighbors = list(G.neighbors(node)) k_i = len(neighbors) # If the node has fewer than 2 neighbors, its clustering coefficient is 0 if k_i < 2: return 0.0 # Initialize numerator for the clustering coefficient numerator = 0 # Iterate through pairs of neighbors for j in neighbors: for k in neighbors: if j != k and G.has_edge(j, k): numerator += 1 # Since each edge is counted twice, divide by 2 numerator /= 2 # Compute the denominator denominator = k_i * (k_i - 1) / 2 # If the denominator is 0, return 0 if denominator == 0: return 0.0 # Return the clustering coefficient return numerator / denominator
# Function to compute the weighted clustering coefficient for each cluster
[docs] def clustering_coefficient_by_cluster(G, clusters): cluster_coefficients = {} for cluster_id in np.unique(clusters): nodes_in_cluster = [node for node, cluster in enumerate(clusters) if cluster == cluster_id] if len(nodes_in_cluster) == 0: continue # Randomly sample 35% nodes from the cluster for efficiency sample_size = int(0.35 * len(nodes_in_cluster)) sample_nodes = np.random.choice(nodes_in_cluster, size=sample_size, replace=False) # Calculate the weighted clustering coefficient for each node in the cluster cluster_coeffs = [ clustering_coefficient(G, node) for node in sample_nodes ] # Average clustering coefficient for the cluster cluster_coefficients[cluster_id] = np.mean(cluster_coeffs) return cluster_coefficients
[docs] def silhouette_score_one_point(distances, labels, point_index): """ Compute the silhouette score for one node using a shortest path distance matrix. Parameters: - distances: 2D list or ndarray of shape (n_nodes, n_nodes), shortest path distances between nodes - labels: list or ndarray of shape (n_nodes,) - point_index: int, index of the node for which to compute the silhouette score Returns: - silhouette score for the given node """ point_label = labels[point_index] # Compute a(i): Mean intra-cluster distance same_cluster_nodes = [idx for idx, label in enumerate(labels) if label == point_label] n_same_cluster = len(same_cluster_nodes) if n_same_cluster <= 1: return 0 # If the node is the only one in its cluster, s(i) = 0 intra_distances = [distances[point_index][other_node] for other_node in same_cluster_nodes if other_node != point_index] a_i = sum(intra_distances) / (n_same_cluster - 1) # Compute b(i): Minimum mean distance to any other cluster other_cluster_labels = set(labels) - {point_label} b_i = float('inf') for other_label in other_cluster_labels: other_cluster_nodes = [idx for idx, label in enumerate(labels) if label == other_label] inter_distances = [distances[point_index][other_node] for other_node in other_cluster_nodes] mean_inter_distance = sum(inter_distances) / len(other_cluster_nodes) b_i = min(b_i, mean_inter_distance) # Compute silhouette score for the given node s_i = (b_i - a_i) / max(a_i, b_i) return s_i
import networkx as nx import pandas as pd import seaborn as sns import matplotlib.pyplot as plt from scipy.stats import f_oneway
[docs] def compute_and_plot_clustering_coefficients(adata, emb_featuremap, emb_featuremap_v, phate_graph_nx, tsne_graph, densmap_graph): G_expr = nx.from_numpy_array(emb_featuremap.graph_) G_var = nx.from_numpy_array(emb_featuremap_v._featuremap_kwds['graph_v']) G_phate = phate_graph_nx G_tsne = nx.from_numpy_array(tsne_graph) G_densmap = nx.from_numpy_array(densmap_graph) clusters = adata.obs['leiden_v'].values.tolist() sc.pl.embedding(adata, basis='X_featmap_v', color='leiden_v', legend_loc='on data') print('Computing the weighted clustering coefficients...') cluster_coefficients_var = clustering_coefficient_by_cluster(G_var, clusters) cluster_coefficients_expr = clustering_coefficient_by_cluster(G_expr, clusters) cluster_coefficients_phate = clustering_coefficient_by_cluster(G_phate, clusters) cluster_coefficients_tsne = clustering_coefficient_by_cluster(G_tsne, clusters) cluster_coefficients_densmap = clustering_coefficient_by_cluster(G_densmap, clusters) df_var = pd.DataFrame( {"Cluster": list(cluster_coefficients_var.keys()), "Coefficient": list(cluster_coefficients_var.values())} ) df_var['type'] = 'variation_graph' df_expr = pd.DataFrame( {"Cluster": list(cluster_coefficients_expr.keys()), "Coefficient": list(cluster_coefficients_expr.values())} ) df_expr['type'] = 'expression_graph' df_phate = pd.DataFrame( {"Cluster": list(cluster_coefficients_phate.keys()), "Coefficient": list(cluster_coefficients_phate.values())} ) df_phate['type'] = 'phate_graph' df_tsne = pd.DataFrame( {"Cluster": list(cluster_coefficients_tsne.keys()), "Coefficient": list(cluster_coefficients_tsne.values())} ) df_tsne['type'] = 'tsne_graph' df_densmap = pd.DataFrame( {"Cluster": list(cluster_coefficients_densmap.keys()), "Coefficient": list(cluster_coefficients_densmap.values())} ) df_densmap['type'] = 'densmap_graph' df = pd.concat([df_var, df_expr, df_phate, df_tsne, df_densmap], axis=0) plt.figure(figsize=(4, 3), dpi=100) plt.rcParams.update({'font.size': 12}) fontsize = 12 cluster_state_dict = adata.uns['cluster_state_dict'] transition_states = [cluster for cluster, state in cluster_state_dict.items() if state == 'transition_states'] core_states = [cluster for cluster, state in cluster_state_dict.items() if state == 'core_states'] coeff_core_var = [] coeff_core_expr = [] coeff_core_phate = [] coeff_core_tsne = [] coeff_core_densmap = [] for cluster in core_states: coeff_core_var.append(cluster_coefficients_var[cluster]) coeff_core_expr.append(cluster_coefficients_expr[cluster]) coeff_core_phate.append(cluster_coefficients_phate[cluster]) coeff_core_tsne.append(cluster_coefficients_tsne[cluster]) coeff_core_densmap.append(cluster_coefficients_densmap[cluster]) f_stat, p_val = f_oneway(coeff_core_tsne, coeff_core_expr, coeff_core_densmap, coeff_core_phate, coeff_core_var) print(f'ANOVA: f_stat: {f_stat}, p_val: {p_val}') plt.figure(figsize=(4, 2)) box = sns.boxplot(data=[coeff_core_tsne, coeff_core_expr, coeff_core_densmap, coeff_core_phate, coeff_core_var], color=plt.get_cmap('tab10')(1), fill=False, width=0.5, showfliers=False) hatch_patterns = ['/', '\\', 'x', '/', '/', '/'] for patch, hatch in zip(box.patches, hatch_patterns): patch.set_hatch(hatch) sns.stripplot(data=[coeff_core_tsne, coeff_core_expr, coeff_core_densmap, coeff_core_phate, coeff_core_var], color=plt.get_cmap('tab10')(1), jitter=True, dodge=True) plt.xticks(ticks=[0, 1, 2, 3, 4], labels=['t-SNE', 'UMAP', 'densMAP', 'PHATE', 'FeatureMAP'], fontsize=fontsize, rotation=15) plt.title("Clustering coefficients in core states", fontsize=fontsize) plt.ylabel("Clustering coefficient", fontsize=fontsize) plt.text(0.1, 0.9, f"ANOVA F-stat: {f_stat:.2e}", ha='left', va='center', transform=plt.gca().transAxes, fontsize=fontsize) plt.text(0.1, 0.8, f"ANOVA p-value: {p_val:.2e}", ha='left', va='center', transform=plt.gca().transAxes, fontsize=fontsize) plt.show() coeff_trans_var = [] coeff_trans_expr = [] coeff_trans_phate = [] coeff_trans_tsne = [] coeff_trans_densemap = [] for cluster in transition_states: coeff_trans_expr.append(cluster_coefficients_expr[cluster]) coeff_trans_phate.append(cluster_coefficients_phate[cluster]) coeff_trans_var.append(cluster_coefficients_var[cluster]) coeff_trans_tsne.append(cluster_coefficients_tsne[cluster]) coeff_trans_densemap.append(cluster_coefficients_densmap[cluster]) f_stat, p_val = f_oneway(coeff_trans_expr, coeff_trans_phate, coeff_trans_var, coeff_trans_tsne, coeff_trans_densemap) print(f'ANOVA: f_stat: {f_stat}, p_val: {p_val}') plt.figure(figsize=(4, 2)) box = sns.boxplot(data=[coeff_trans_tsne, coeff_trans_expr, coeff_trans_densemap, coeff_trans_phate, coeff_trans_var], color=plt.get_cmap('tab10')(0), fill=False, width=0.5, showfliers=False) hatch_patterns = ['/', '\\', 'x', '/', '\\', 'x'] for patch, hatch in zip(box.patches, hatch_patterns): patch.set_hatch(hatch) sns.stripplot(data=[coeff_trans_tsne, coeff_trans_expr, coeff_trans_densemap, coeff_trans_phate, coeff_trans_var], color=plt.get_cmap('tab10')(0), jitter=True, dodge=True) plt.xticks(ticks=[0, 1, 2, 3, 4], labels=['t-SNE', 'UMAP', 'densMAP', 'PHATE', 'FeatureMAP'], fontsize=fontsize, rotation=15) plt.title("Clustering coefficients in transition states", fontsize=fontsize) plt.ylabel("Clustering coefficient", fontsize=fontsize) plt.text(0.1, 0.95, f"ANOVA F-stat: {f_stat:.2e}", ha='left', va='center', transform=plt.gca().transAxes, fontsize=fontsize) plt.text(0.1, 0.85, f"ANOVA p-value: {p_val:.2e}", ha='left', va='center', transform=plt.gca().transAxes, fontsize=fontsize) plt.show()
import networkx as nx from sklearn.metrics import pairwise_distances from scipy.stats import ttest_ind, f_oneway import seaborn as sns import matplotlib.pyplot as plt
[docs] def compute_and_plot_silhouette_scores(adata, emb_featuremap, emb_featuremap_v, phate_graph_nx, tsne_graph, densmap_graph): # Plot embedding sc.pl.embedding(adata, basis='X_featmap_v', color='leiden_v', cmap='Blues_r', s=10, legend_loc='on data', title='Leiden_v clustering') # Graph distance matrices G_expr = nx.from_numpy_array(emb_featuremap.graph_) G_var = nx.from_numpy_array(emb_featuremap_v._featuremap_kwds['graph_v']) G_phate = phate_graph_nx G_tsne = nx.from_numpy_array(tsne_graph) G_densmap = nx.from_numpy_array(densmap_graph) # Get the weighted adjacency matrix adjacency_expr = nx.to_numpy_array(G_expr) adjacency_var = nx.to_numpy_array(G_var) adjacency_phate = nx.to_numpy_array(G_phate) adjacency_tsne = nx.to_numpy_array(G_tsne) adjacency_densmap = nx.to_numpy_array(G_densmap) # Compute the distance matrices dist_mat_expr = pairwise_distances(adjacency_expr, metric='euclidean') dist_mat_var = pairwise_distances(adjacency_var, metric='euclidean') dist_mat_phate = pairwise_distances(adjacency_phate, metric='euclidean') dist_mat_tsne = pairwise_distances(adjacency_tsne, metric='euclidean') dist_mat_densmap = pairwise_distances(adjacency_densmap, metric='euclidean') labels = adata.obs['leiden_v'].values.tolist() labels = np.array(labels) clusters = np.unique(labels) ss_clusters_expr = {} ss_clusters_var = {} ss_clusters_phate = {} ss_clusters_tsne = {} ss_clusters_densmap = {} for cluster in clusters: print(f'Cluster {cluster}') cluster_indices = np.where(labels == cluster)[0] ss_cluster_expr = [] ss_cluster_var = [] ss_cluster_phate = [] ss_cluster_tsne = [] ss_cluster_densmap = [] for idx in cluster_indices: ss_cluster_expr.append(silhouette_score_one_point(dist_mat_expr, labels, idx)) ss_cluster_var.append(silhouette_score_one_point(dist_mat_var, labels, idx)) ss_cluster_phate.append(silhouette_score_one_point(dist_mat_phate, labels, idx)) ss_cluster_tsne.append(silhouette_score_one_point(dist_mat_tsne, labels, idx)) ss_cluster_densmap.append(silhouette_score_one_point(dist_mat_densmap, labels, idx)) ss_clusters_expr[cluster] = sum(ss_cluster_expr) / len(ss_cluster_expr) ss_clusters_var[cluster] = sum(ss_cluster_var) / len(ss_cluster_var) ss_clusters_phate[cluster] = sum(ss_cluster_phate) / len(ss_cluster_phate) ss_clusters_tsne[cluster] = sum(ss_cluster_tsne) / len(ss_cluster_tsne) ss_clusters_densmap[cluster] = sum(ss_cluster_densmap) / len(ss_cluster_densmap) ss_all = [] # Silhouette score in UMAP graph cluster_state_dict = adata.uns['cluster_state_dict'] transition_states = [cluster for cluster, state in cluster_state_dict.items() if state == 'transition_states'] core_states = [cluster for cluster, state in cluster_state_dict.items() if state == 'core_states'] ss_tran = [] ss_core = [] for cluster in transition_states: ss_tran.append(ss_clusters_expr[cluster]) for cluster in core_states: ss_core.append(ss_clusters_expr[cluster]) ss_all.append(ss_tran) ss_all.append(ss_core) # Silhouette score in phate graph ss_tran = [] ss_core = [] for cluster in transition_states: ss_tran.append(ss_clusters_phate[cluster]) for cluster in core_states: ss_core.append(ss_clusters_phate[cluster]) ss_all.append(ss_tran) ss_all.append(ss_core) # T test between ss_tran and ss_core t_stat, p_val = ttest_ind(ss_tran, ss_core) print(f'T-test: t_stat: {t_stat}, p_val: {p_val}') # Silhouette score in variation graph ss_tran = [] ss_core = [] for cluster in transition_states: ss_tran.append(ss_clusters_var[cluster]) for cluster in core_states: ss_core.append(ss_clusters_var[cluster]) ss_all.append(ss_tran) ss_all.append(ss_core) # Silhouette score in tsne graph ss_tran = [] ss_core = [] for cluster in transition_states: ss_tran.append(ss_clusters_tsne[cluster]) for cluster in core_states: ss_core.append(ss_clusters_tsne[cluster]) ss_all.append(ss_tran) ss_all.append(ss_core) # Silhouette score in densmap graph ss_tran = [] ss_core = [] for cluster in transition_states: ss_tran.append(ss_clusters_densmap[cluster]) for cluster in core_states: ss_core.append(ss_clusters_densmap[cluster]) ss_all.append(ss_tran) ss_all.append(ss_core) # Silhouette score in core states ss_core_var = [] ss_core_expr = [] ss_core_phate = [] ss_core_tsne = [] ss_core_densmap = [] for cluster in core_states: ss_core_expr.append(ss_clusters_expr[cluster]) ss_core_phate.append(ss_clusters_phate[cluster]) ss_core_var.append(ss_clusters_var[cluster]) ss_core_tsne.append(ss_clusters_tsne[cluster]) ss_core_densmap.append(ss_clusters_densmap[cluster]) # Anova test between ss_core_tsne, ss_core_expr, ss_core_densmap, ss_core_phate, ss_core_var f_stat, p_val = f_oneway(ss_core_tsne, ss_core_expr, ss_core_densmap, ss_core_phate, ss_core_var) print(f'ANOVA: f_stat: {f_stat}, p_val: {p_val}') # Plot the result plt.figure(figsize=(10, 6)) box = sns.boxplot(data=[ss_core_tsne, ss_core_expr, ss_core_densmap, ss_core_phate, ss_core_var], color=plt.get_cmap('tab10')(1), fill=False, width=0.5, showfliers=False) hatch_patterns = ['/', '\\', 'x'] for patch, hatch in zip(box.patches, hatch_patterns): patch.set_hatch(hatch) sns.stripplot(data=[ss_core_tsne, ss_core_expr, ss_core_densmap, ss_core_phate, ss_core_var], color=plt.get_cmap('tab10')(1), jitter=True, dodge=True) plt.xticks(ticks=[0, 1, 2], labels=['UMAP graph', 'PHATE graph', 'Variation graph']) plt.title("Silhouette score in core states") plt.ylabel("Silhouette score") plt.text(0.2, 0.8, f"ANOVA p-value: {p_val:.2e}", ha='center', va='center', transform=plt.gca().transAxes) plt.show() # Silhouette score in transition states ss_tran_expr = [] ss_tran_phate = [] ss_tran_var = [] ss_tran_tsne = [] ss_tran_densmap = [] for cluster in transition_states: ss_tran_expr.append(ss_clusters_expr[cluster]) ss_tran_phate.append(ss_clusters_phate[cluster]) ss_tran_var.append(ss_clusters_var[cluster]) ss_tran_tsne.append(ss_clusters_tsne[cluster]) ss_tran_densmap.append(ss_clusters_densmap[cluster]) # Anova test between ss_tran_tsne, ss_tran_expr, ss_tran_densmap, ss_tran_phate, ss_tran_var f_stat, p_val = f_oneway(ss_tran_tsne, ss_tran_expr, ss_tran_densmap, ss_tran_phate, ss_tran_var) print(f'ANOVA: f_stat: {f_stat}, p_val: {p_val}') # Plot the result plt.figure(figsize=(10, 6)) box = sns.boxplot(data=[ss_tran_tsne, ss_tran_expr, ss_tran_densmap, ss_tran_phate, ss_tran_var], color=plt.get_cmap('tab10')(0), fill=False, width=0.5, showfliers=False) for patch, hatch in zip(box.patches, hatch_patterns): patch.set_hatch(hatch) sns.stripplot(data=[ss_tran_tsne, ss_tran_expr, ss_tran_densmap, ss_tran_phate, ss_tran_var], color=plt.get_cmap('tab10')(0), jitter=True, dodge=True) plt.xticks(ticks=[0, 1, 2], labels=['UMAP graph', 'PHATE graph', 'Variation graph']) plt.title("Silhouette score in transition states") plt.ylabel("Silhouette score") plt.text(0.2, 0.8, f"ANOVA p-value: {p_val:.2e}", ha='center', va='center', transform=plt.gca().transAxes) plt.show()