Source code for featuremap.features

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Nov  1 13:52:14 2023

"""


import anndata as ad
from anndata import AnnData
# from quasildr.structdr import Scms
import numpy as np
import time
import matplotlib.pyplot as plt
import scanpy as sc
import pandas as pd

import seaborn as sns

from umap.umap_ import nearest_neighbors

from scipy.sparse.csgraph import shortest_path
from scipy.sparse import csr_matrix

from sklearn.neighbors import NearestNeighbors
from scipy.stats import norm as normal

from featuremap.core_transition_states import kernel_density_estimate

from featuremap.featuremap_ import _preprocess_data


# Create adata object for plotting
[docs] def create_adata(X, emb_featuremap, obs=None, var=None): """ Create an AnnData object for plotting Parameters ---------- X : np.ndarray Data matrix. emb_featuremap : object Featuremap object. obs : pd.DataFrame var : pd.DataFrame Returns ------- adata : AnnData Annotated data matrix. """ adata = ad.AnnData(X=X) if obs is not None: adata.obs = obs if var is not None: adata.var = var adata.obsm['X_featmap'] = emb_featuremap.embedding_ # find the shape of every item in emb_featuremap._featuremap_kwds for key, item in emb_featuremap._featuremap_kwds.items(): # check if the item is an array if isinstance(item, np.ndarray): if item.shape[0] == X.shape[0]: adata.obsm[key] = item elif item.shape[0] == X.shape[1]: adata.varm[key] = item else: print(f'{key} is not added to adata') return adata
[docs] def pseudotime_mst(adata, random_state, start_point_index): """ Given a staring point, compute the pseudotime by distance to all other points on the MST Parameters ---------- adata : AnnData Annotated data matrix. random_state : int Random seed. start_point_index : int Starting point index. Returns ------- adata.obs['feat_pseudotime'] = pseudotime_mst pseudotime_mst : np.ndarray. Pseudotime based on MST. """ from featuremap import featuremap_ from umap.umap_ import fuzzy_simplicial_set import numpy as np # Generate 2 embeddings for computing MST, and average the pseudotime pseudotime_mst = np.zeros(adata.shape[0]) for i in range(3): # pairwise distances of knn graph # check if 'X_featmap_v_3d' exists in obsm # if 'X_featmap_v_3d' not in adata.obsm.keys(): if 'X_svd' not in adata.obsm.keys(): emb_svd, _ = featuremap_._preprocess_data(adata.X) adata.obsm['X_svd'] = emb_svd emb_svd = adata.obsm['X_svd'] rnd = np.random.RandomState(i) adata.obsm['X_featmap_v_3d'] = featuremap_.FeatureMAP(n_components=3, output_variation=True,random_state=rnd).fit_transform(emb_svd) _, _,_,dists = fuzzy_simplicial_set(adata.obsm['X_featmap_v_3d'], n_neighbors=60, random_state=random_state, metric='euclidean', metric_kwds={}, verbose=False, return_dists=True) # Minimum Spanning Tree on kgraph from scipy.sparse.csgraph import minimum_spanning_tree # Compute the minimum spanning tree # including the graphs with disconnected components mst = minimum_spanning_tree(dists) # mst = mst + mst.T # transform to coo matrix mst = mst.tocoo() # Given a staring point, compute the distance to all other points on the mst import numpy as np import networkx as nx # Create a weighted graph from the MST G = nx.from_scipy_sparse_array(mst, edge_attribute='weight') # Compute the weighted shortest path from the starting point to all other points on the MST shortest_paths = nx.shortest_path_length(G, source=start_point_index, weight='weight') # Convert the shortest paths to an array distances = np.zeros(len(shortest_paths)) for i, d in shortest_paths.items(): distances[i] = d pseudotime_mst += distances if adata.shape[0] > 5000: from sklearn.preprocessing import MinMaxScaler pseudotime_mst = MinMaxScaler().fit_transform(pseudotime_mst.reshape(-1,1)).reshape(-1) adata.obs['feat_pseudotime'] = pseudotime_mst return pseudotime_mst pseudotime_mst /= 10 from sklearn.preprocessing import MinMaxScaler pseudotime_mst = MinMaxScaler().fit_transform(pseudotime_mst.reshape(-1,1)).reshape(-1) adata.obs['feat_pseudotime'] = pseudotime_mst
[docs] def quiver_autoscale(X_emb, V_emb): """ Autoscale the arrow plot Parameters ---------- X_emb : np.ndarray Embedding matrix. V_emb : np.ndarray Variation embedding matrix. Returns ------- Q.scale / scale_factor * 5 Scale factor for quiver plot. """ import matplotlib.pyplot as plt scale_factor = np.abs(X_emb).max() # just so that it handles very large values fig, ax = plt.subplots() Q = ax.quiver( X_emb[:, 0] / scale_factor, X_emb[:, 1] / scale_factor, V_emb[:, 0], V_emb[:, 1], angles="xy", scale_units="xy", scale=None, ) Q._init() fig.clf() plt.close(fig) return Q.scale / scale_factor * 5
[docs] def plot_gauge( adata:AnnData, embedding='X_featmap', vkey='gauge_v1_emb', density=1, smooth=0.5, n_neighbors=None, min_mass=1, autoscale=True, ): """ Plot the gauge embedding to visualize the eigengene (or frame for feature loadings) Parameters ---------- adata : AnnData An annotated data matrix. embedding : string Embedding background for feature plot. The default is 'X_featmap'. vkey : string Variation key. The default is 'gauge_v1_emb'. density : float Grid desity for plot. The default is 1. smooth : float For kde estimation. The default is 0.5. n_neighbors : int Number of neighbours for kde. The default is None. min_mass : float Minumum denstiy to show the grid plot. The default is 1. autoscale : bool Scale the arrow plot. The default is True. """ # Set grid as the support X_emb=adata.obsm[embedding] # Exclude one leiden cluster; # X_emb=adata.obsm[embedding] V_emb=adata.obsm[vkey] # Normalize the V_emb V_emb = V_emb / np.linalg.norm(V_emb, axis=1)[:, np.newaxis] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth grs = [] for dim_i in range(n_dim): m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) m = m - 0.01 * np.abs(M - m) M = M + 0.01 * np.abs(M - m) gr = np.linspace(m, M, int(50 * density)) grs.append(gr) meshes_tuple = np.meshgrid(*grs) X_grid = np.vstack([i.flat for i in meshes_tuple]).T # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid velocities if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= quiver_autoscale(X_grid, V_grid) plt.contourf(meshes_tuple[0], meshes_tuple[1], p1.reshape(int(50 * density),int(50 * density)), levels=20, cmap='Blues') emb = adata.obsm[embedding] embedding_df = pd.DataFrame(adata.obsm[embedding], index=adata.obs_names, columns=['dim_0', 'dim_1']) # Calculate the range for the ticks x_min, x_max = embedding_df['dim_0'].min(), embedding_df['dim_0'].max() y_min, y_max = embedding_df['dim_1'].min(), embedding_df['dim_1'].max() x_mid = (x_min + x_max) / 2 y_mid = (y_min + y_max) / 2 # Determine the common range common_min = min(x_min, y_min) common_max = max(x_max, y_max) x_range = common_max - common_min y_range = common_max - common_min # color = np.array(adata.obs['leiden']).astype(int) # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='Set2', alpha=0.1) plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1) plt.title(f'{vkey}') # plt.xlim(x_mid-x_range/2, x_mid+x_range/2) # plt.ylim(y_mid-y_range/2, y_mid+y_range/2) plt.xticks([]) plt.yticks([]) qv = plt.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color='black',alpha=1,scale=3) plt.show()
# legend = plt.legend([qv], [vkey]) # legend.set_bbox_to_anchor((1, 1)) # Set the shape of the legend # plt.show() # plt.clf()
[docs] def plot_gauge_both( adata:AnnData, embedding='X_featmap', vkey_1='gauge_v1_emb', vkey_2='gauge_v2_emb', density=1, smooth=0.5, n_neighbors=None, min_mass=1, autoscale=True, scale_quiver=1, ): """ Plot the gauge embedding to visualize the eigengene (or frame for feature loadings) Parameters ---------- adata : AnnData An annotated data matrix. embedding : string Embedding background for feature plot. The default is 'X_featmap'. density : float Grid desity for plot. The default is 1. smooth : float For kde estimation. The default is 0.5. n_neighbors : int Number of neighbours for kde. The default is None. min_mass : float Minumum denstiy to show the grid plot. The default is 1. autoscale : bool Scale the arrow plot. The default is True. """ print('Plotting gauge embedding') # Set grid as the support X_emb=adata.obsm[embedding] # Exclude one leiden cluster; # X_emb=adata.obsm[embedding] vkey=vkey_1 V_emb=adata.obsm[vkey] # Normalize the V_emb V_emb = V_emb / np.linalg.norm(V_emb, axis=1)[:, np.newaxis] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth grs = [] for dim_i in range(n_dim): m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) m = m - 0.01 * np.abs(M - m) M = M + 0.01 * np.abs(M - m) gr = np.linspace(m, M, int(50 * density)) grs.append(gr) meshes_tuple = np.meshgrid(*grs) X_grid = np.vstack([i.flat for i in meshes_tuple]).T # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid velocities if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= scale_quiver * quiver_autoscale(X_grid, V_grid) plt.contourf(meshes_tuple[0], meshes_tuple[1], p1.reshape(int(50 * density),int(50 * density)), levels=20, cmap='Blues') emb = adata.obsm[embedding] embedding_df = pd.DataFrame(adata.obsm[embedding], index=adata.obs_names, columns=['dim_0', 'dim_1']) # Calculate the range for the ticks x_min, x_max = embedding_df['dim_0'].min(), embedding_df['dim_0'].max() y_min, y_max = embedding_df['dim_1'].min(), embedding_df['dim_1'].max() x_mid = (x_min + x_max) / 2 y_mid = (y_min + y_max) / 2 # Determine the common range common_min = min(x_min, y_min) common_max = max(x_max, y_max) x_range = common_max - common_min y_range = common_max - common_min # color = np.array(adata.obs['leiden']).astype(int) # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='Set2', alpha=0.1) # plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1) plt.title(f'Local gauge plot') # plt.xlim(x_mid-x_range/2, x_mid+x_range/2) # plt.ylim(y_mid-y_range/2, y_mid+y_range/2) plt.xticks([]) plt.yticks([]) qv1 = plt.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color='red',alpha=1,scale=3) # X_emb=adata.obsm[embedding] vkey=vkey_2 V_emb=adata.obsm[vkey] # Normalize the V_emb V_emb = V_emb / np.linalg.norm(V_emb, axis=1)[:, np.newaxis] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth grs = [] for dim_i in range(n_dim): m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) m = m - 0.01 * np.abs(M - m) M = M + 0.01 * np.abs(M - m) gr = np.linspace(m, M, int(50 * density)) grs.append(gr) meshes_tuple = np.meshgrid(*grs) X_grid = np.vstack([i.flat for i in meshes_tuple]).T # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid velocities if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= scale_quiver * quiver_autoscale(X_grid, V_grid) # plt.contourf(meshes_tuple[0], meshes_tuple[1], p1.reshape(int(50 * density),int(50 * density)), # levels=20, cmap='Blues') emb = adata.obsm[embedding] embedding_df = pd.DataFrame(adata.obsm[embedding], index=adata.obs_names, columns=['dim_0', 'dim_1']) # Calculate the range for the ticks x_min, x_max = embedding_df['dim_0'].min(), embedding_df['dim_0'].max() y_min, y_max = embedding_df['dim_1'].min(), embedding_df['dim_1'].max() x_mid = (x_min + x_max) / 2 y_mid = (y_min + y_max) / 2 # Determine the common range common_min = min(x_min, y_min) common_max = max(x_max, y_max) x_range = common_max - common_min y_range = common_max - common_min # color = np.array(adata.obs['leiden']).astype(int) # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='Set2', alpha=0.1) # plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1) # plt.title(f'{vkey}') # plt.xlim(x_mid-x_range/2, x_mid+x_range/2) # plt.ylim(y_mid-y_range/2, y_mid+y_range/2) plt.xticks([]) plt.yticks([]) qv2 = plt.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color='blue',alpha=1,scale=3) # create a legend by arrows, red for v1, blue for v2 legend = plt.legend([qv1, qv2], ['v1', 'v2']) legend.set_bbox_to_anchor((1.2, 1)) # Set the shape of the legend plt.show()
# plt.clf() # def matrix_multiply(X, Y): # # X shape: (11951, 60, 100) # # Y shape: (100, 14577) # # The goal is to multiply each 60x100 matrix in X with Y, resulting in 11951 matrices of size 60x14577 # # Reshape X to a 2D array for matrix multiplication # X_reshaped = X.reshape(-1, Y.shape[0]) # Shape becomes (11951*60, 100) # # Perform matrix multiplication # result = np.dot(X_reshaped, Y) # Resulting shape is (11951*60, 14577) # # Reshape the result back to 3D # result_reshaped = result.reshape(X.shape[0], X.shape[1], Y.shape[1]) # Shape becomes (11951, 60, 14577) # return result_reshaped # from multiprocessing import Pool # # import itertools # def compute_norm_chunk(array, start, end): # # Slice the actual array # chunk = array[start:end] # return np.linalg.norm(chunk, axis=1) # def compute_norm_parallel(array, chunk_size): # # Split the first dimension into chunks # ranges = [(i, min(i + chunk_size, array.shape[0])) for i in range(0, array.shape[0], chunk_size)] # with Pool() as pool: # # Map the compute_norm_chunk function to each chunk # results = pool.starmap(compute_norm_chunk, [(array, r[0], r[1]) for r in ranges]) # # Concatenate the results # return np.concatenate(results)
[docs] def local_intrinsic_dim( adata: AnnData, threshold=0.9, plot: bool = False, ): """ Compute the intrinsic dimensionality locally based on local SVD. Parameters ---------- adata : AnnData An annotated data matrix. threshold : float Threshold for proportion of variance contributions. The default is 0.9. plot : bool If True, plot a histogram of the intrinsic dimensionality. Default False. Returns ------- intrinsic_dim : np.ndarray Intrinsic dimensionality. """ singular_values_collection = adata.obsm['Singular_value'].copy() # Compute intrinsic dimensionality locally def pc_accumulation(arr, threshold): arr_sum = np.sum(np.square(arr)) temp_sum = 0 for i in range(arr.shape[0]): temp_sum += arr[i] * arr[i] if temp_sum > arr_sum * threshold: return i intrinsic_dim = np.zeros(adata.shape[0]).astype(int) for i in range(adata.shape[0]): intrinsic_dim[i] = pc_accumulation(singular_values_collection[i], threshold) if plot: plt.hist(intrinsic_dim, bins=30) plt.title('Local_intrinsic_dim') plt.xlabel('Intrinsic dimensionality') plt.ylabel('Count') plt.show() plt.clf() adata.obs['intrinsic_dim'] = intrinsic_dim return intrinsic_dim
# @numba.njit()
[docs] def feature_variation( adata: AnnData, threshold=0.5, ): """ Compute the feature variation and feature loadings based on local SVD. Parameters ---------- adata : AnnData An annotated data matrix. """ import numpy as np vh_smoothed = adata.obsm['vh_smoothed'].copy() svd_vh = adata.varm['svd_vh'].copy().T # gauge_vh_original = adata.obsm['gauge_vh_original'].copy() # vh_smoothed = gauge_vh_original intrinsic_dim = local_intrinsic_dim(adata, threshold) # sc.pl.embedding(adata,'featmap',color=['instrinsic_dim'],cmap='plasma') # sc.pl.embedding(adata,'umap',color=['instrinsic_dim'],cmap='plasma') # Compute the gene norm in top k PCs (norm of the arrow in biplot) k = int(np.median(intrinsic_dim)) print(f'k is {k}') print("Start matrix multiplication") T1 = time.time() # pcVals_project_back = np.matmul(vh_smoothed, svd_vh[np.newaxis, :]) # pcVals_project_back = np.matmul(vh_smoothed, svd_vh) pcVals_project_back = np.matmul(vh_smoothed[:,:k,:], svd_vh) adata.obsm['pcVals_project_back'] = pcVals_project_back # pcVals_project_back = np.matmul(np.squeeze(vh_smoothed[:,:k,:]), svd_vh) # pcVals_project_back = matrix_multiply(vh_smoothed, svd_vh) T2 = time.time() print(f'Finish matrix multiplication in {T2-T1}') T1 = time.time() # gene_val_norm = np.linalg.norm(pcVals_project_back[:, :k, :], axis=1) gene_val_norm = np.linalg.norm(pcVals_project_back, axis=1) # gene_val_norm = np.abs(pcVals_project_back) adata.layers['variation_feature'] = gene_val_norm T2 = time.time() print(f'Finish norm calculation in {T2-T1}')
# T1 = time.time() # gene_norm_first_two = np.linalg.norm(pcVals_project_back[:, :2, :], axis=1) # pc_loadings_scale = pcVals_project_back[:, :2, :] /\ # gene_norm_first_two[:,np.newaxis,:] *\ # gene_val_norm[:,np.newaxis,:] # # pc_loadings_scale = pcVals_project_back[:, :2, :] /\ # # np.linalg.norm(pcVals_project_back[:, :2, :], axis=1)[:,np.newaxis,:] # adata.obsm['feature_loading_scale'] = pc_loadings_scale # T2 = time.time() # print(f'Finish feature loading in {T2-T1}') # # Feature loadings on each local gauge # gauge_vh_emb = adata.obsm['VH_embedding'] # feature_loading_emb = adata.obsm['feature_loading_scale'] # feature_loadings_embedding = np.matmul(feature_loading_emb.transpose(0,2,1), gauge_vh_emb.transpose(0,2,1)) # Project to gauge_embedding # adata.obsm['feature_loading_embedding'] = feature_loadings_embedding.transpose(0,2,1) # @numba.njit()
[docs] def feature_variation_one_feature( adata: AnnData, feature='', threshold=0.9, feature_key='variation_feature', ): """ Compute the feature variation and feature loadings based on local SVD. Parameters ---------- adata : AnnData An annotated data matrix. """ # check if feature_key exists in adata.layers if feature_key in adata.layers.keys(): adata.obsm[f'variation_feature_{feature}'] = adata.layers[feature_key][:, np.where(adata.var.index == feature)[0][0]].copy() else: vh_smoothed = adata.obsm['vh_smoothed'].copy() svd_vh = adata.varm['svd_vh'].copy().T feature_index = np.where(adata.var.index == feature)[0][0] svd_vh = svd_vh[:,feature_index].reshape(-1,1) intrinsic_dim = local_intrinsic_dim(adata, threshold) # Compute the gene norm in top k PCs (norm of the arrow in biplot) k = int(np.median(intrinsic_dim)) print(f'k is {k}') print("Start matrix multiplication") T1 = time.time() # pcVals_project_back = np.matmul(vh_smoothed, svd_vh[np.newaxis, :]) # pcVals_project_back = np.matmul(vh_smoothed, svd_vh) pcVals_project_back = np.matmul(vh_smoothed[:,:k,:], svd_vh) pcVals_project_back = np.squeeze(pcVals_project_back) print(pcVals_project_back.shape) # adata.obsm['pcVals_project_back'] = pcVals_project_back T2 = time.time() print(f'Finish matrix multiplication in {T2-T1}') T1 = time.time() gene_val_norm = np.linalg.norm(pcVals_project_back, axis=1) if k > 1 else np.abs(pcVals_project_back) # gene_val_norm = np.abs(pcVals_project_back) adata.obsm[f'variation_feature_{feature}'] = gene_val_norm T2 = time.time() print(f'Finish norm calculation in {T2-T1}')
# @numba.njit()
[docs] def feature_projection( adata: AnnData, feature='', vkey='VH_embedding', # parallel=False ): """ Compute the feature variation and feature loadings based on local SVD. Parameters ---------- adata : AnnData An annotated data matrix. feature : string Feature name to be plotted. """ vh = adata.obsm['vh_smoothed'].copy() # vh = adata.obsm['VH'].copy() # gauge_vh_original = adata.obsm['gauge_vh_original'].copy() # vh_smoothed = gauge_vh_original # gauge_u = adata.obsm['gauge_u'].copy() # singular_values_collection = adata.obsm['Singular_value'].copy() svd_vh = adata.varm['svd_vh'].copy().T # Compute the gene norm in top k PCs (norm of the arrow in biplot) print("Start matrix multiplication") T1 = time.time() feature_index = np.where(adata.var.index == feature)[0][0] pcVals_project_back_feature = np.matmul(np.array(vh)[:,:2,:], svd_vh[:,feature_index].reshape(-1,1)) T2 = time.time() print(f'Finish matrix multiplication in {T2-T1}') # feature_index = np.where(adata.var.index == feature)[0][0] # pcVals_project_back_feature = np.matmul(np.array(vh_smoothed)[:,:2,:], svd_vh[:,feature_index].reshape(-1,1)) print(f'pcVals_project_back_feature: {pcVals_project_back_feature.shape}') # Feature loadings on each local gauge gauge_vh_emb = adata.obsm[vkey] print(f'gauge_vh_emb: {gauge_vh_emb.shape}') # feature_loading_emb = adata.obsm['feature_loading_scale'] # feature_loadings_embedding = np.matmul(pcVals_project_back_feature.transpose(0,2,1), gauge_vh_emb) # Project to gauge_embedding feature_loadings_embedding = np.matmul(gauge_vh_emb, pcVals_project_back_feature) # Project to gauge_embedding adata.obsm[f'feature_{feature}_loading'] = np.squeeze(feature_loadings_embedding)
# @numba.njit()
[docs] def feature_gradient( adata: AnnData, feature='', # threshold=0.9, # parallel=False ): """ Compute the feature variation and feature loadings based on local SVD. Parameters ---------- adata : AnnData An annotated data matrix. feature : string Feature name to be plotted. """ vh = adata.obsm['vh_smoothed'].copy() # vh = adata.obsm['VH'].copy() # gauge_vh_original = adata.obsm['gauge_vh_original'].copy() # vh_smoothed = gauge_vh_original # gauge_u = adata.obsm['gauge_u'].copy() # singular_values_collection = adata.obsm['Singular_value'].copy() svd_vh = adata.varm['svd_vh'].copy().T # intrinsic_dim = local_intrinsic_dim(adata, threshold) # # Compute the gene norm in top k PCs (norm of the arrow in biplot) # k = int(np.median(intrinsic_dim)) k=2 # Compute the gene norm in top k PCs (norm of the arrow in biplot) print("Start matrix multiplication") T1 = time.time() pcVals_project_back_feature = np.matmul(np.array(vh)[:,:k,:], svd_vh) print(f'pcVals_project_back_feature: {pcVals_project_back_feature.shape}') T2 = time.time() print(f'Finish matrix multiplication in {T2-T1}') feature_index = np.where(adata.var.index == feature)[0][0] X_slice = pcVals_project_back_feature[:,:,feature_index] X_diag = np.array([np.diag(row) for row in X_slice]) feature_grad = np.matmul(X_diag, pcVals_project_back_feature) feature_gradient = np.sum(feature_grad, axis=1) print(f'feature_gradient: {feature_gradient.shape}') # feature_gradient = np.sum(feature_gradient, axis=1) # print(f'feature_gradient: {feature_gradient.shape}') adata.obsm[f'feature_{feature}_gradient'] = feature_gradient
[docs] def feature_gradient_projection( adata: AnnData, feature='', random_state=42 ): data = adata.obsm[f'feature_{feature}_gradient'] import umap emb_variation = umap.UMAP(random_state=random_state,min_dist=0.1).fit(data) adata.obsm[f'feature_{feature}_loading'] = emb_variation.embedding_ del adata.obsm[f'feature_{feature}_gradient']
[docs] def plot_feature( adata:AnnData, feature='', feature_loading_emb='feature_loading_embedding', embedding='X_featmap', cluster_key='clusters', plot_within_cluster=[], pseudotime_adjusted=False, pseudotime='dpt_pseudotime', trend='positive', ratio=0.2, density=1, smooth=0.5, n_neighbors=None, min_mass=1, autoscale=True,): """ Plot a given feature (e.g., gene) in two dimensional visualization Parameters ---------- adata : AnnData An annotated data matrix. feature : string Feature name to be plotted. embedding : string Embedding background for feature plot. The default is 'X_featmap'. cluster_key : string Cluster name indicator. The default is 'clusters'. plot_within_cluster : list A list of clusters in which the feaure is to plot. The default is []. pseudotime_adjusted : bool Whether to adjust the feature direction by pseudotime. The default is False. pseudotime : string Pseudotime indicator. The default is 'dpt_pseudotime'. trend : string of {'positive','negative'} The direction along pseudotime. The default is 'positive'. ratio : float Filtering ratio by expression to filter varition by low expression. The default is 0.5. density : float Grid desity for plot. The default is 1. smooth : float For kde estimation. The default is 0.5. n_neighbors : int Number of neighbours for kde. The default is None. min_mass : float Minumum denstiy to show the grid plot. The default is 1. autoscale : bool Scale the arrow plot. The default is True. """ # Compute the feature loading embedding # feature_loading(adata) vkey=f'feature_{feature}_loading' feature_id = np.where(adata.var_names == feature)[0][0] adata.obsm[vkey] = adata.obsm[feature_loading_emb][:,:,feature_id] # Set grid as the support X_emb=adata.obsm[embedding] V_emb=adata.obsm[vkey] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth grs = [] for dim_i in range(n_dim): m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) m = m - 0.01 * np.abs(M - m) M = M + 0.01 * np.abs(M - m) gr = np.linspace(m, M, int(50 * density)) grs.append(gr) meshes_tuple = np.meshgrid(*grs) X_grid = np.vstack([i.flat for i in meshes_tuple]).T # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid variation if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 # Restrict the plot within given clusters def grid_within_cluster(X_grid): nn = NearestNeighbors(n_neighbors=1, n_jobs=-1) nn.fit(X_emb) _, neighs = nn.kneighbors(X_grid) # plot_within_cluster = ['Beta'] if len(plot_within_cluster) > 0: grid_in_cluster = [] for cluster in plot_within_cluster: idx_in_cluster = np.where(np.array(adata.obs[cluster_key] == cluster))[0] for i in range(neighs.shape[0]): if neighs[i,0] in idx_in_cluster: grid_in_cluster.append(i) return grid_in_cluster # start ploting feature feature_id = np.where(adata.var_names == feature)[0][0] # average expression in grid points over NNs expr_grid = [] if isinstance(adata.X, np.ndarray): expr_count = adata.X.copy()[:,feature_id] else: expr_count = adata.X.toarray().copy()[:,feature_id] expr_grid = (expr_count[neighs] * weight).sum(1) expr_grid /= np.maximum(1, p_mass) # Filter the expr_velo by low expression threshold = max(expr_grid) * ratio # feature_velo_loading = pc_loadings_grid[:,:,feature_id] V_grid[expr_grid<threshold]=np.nan min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= quiver_autoscale(X_grid, V_grid) # Adjust the v direction by the sign of local expression change # V_grid = V_grid * 10 displace_grid = X_grid + V_grid grid_idx = np.unique(np.where(np.isnan(displace_grid) == False)[0]) _, displace_grid_neighs = nn.kneighbors(displace_grid[grid_idx]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx]) displace_expr = np.mean(expr_count[displace_grid_neighs[:,:100]], axis=1) - np.mean(expr_count[start_grid_neighs[:,:100]],axis=1) displace_expr_sign = np.sign(displace_expr) # displace_expr_sign[displace_expr_sign == 0] = 1 V_grid[grid_idx] = np.multiply(V_grid[grid_idx], displace_expr_sign[:, np.newaxis]) # Keep arrows along the positive (negative) trend of time flow if pseudotime_adjusted: time_ = np.array(adata.obs[pseudotime]) displace_grid_adjusted = X_grid + V_grid grid_idx_adjusted = np.unique(np.where(np.isnan(displace_grid_adjusted) == False)[0]) _, displace_grid_neighs = nn.kneighbors(displace_grid_adjusted[grid_idx_adjusted]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx_adjusted]) displace_time = np.mean(time_[displace_grid_neighs[:,:100]], axis=1) - np.mean(time_[start_grid_neighs[:,:100]],axis=1) displace_time_sign = np.sign(displace_time) if trend == 'positive': displace_time_sign[displace_time_sign < 0] = 0 else: displace_time_sign[displace_time_sign > 0] = 0 displace_time_sign[displace_time_sign < 0] = 1 V_grid[grid_idx_adjusted] = np.multiply(V_grid[grid_idx_adjusted], displace_time_sign[:, np.newaxis]) # plt.contourf(meshes_tuple[0], meshes_tuple[1], p1.reshape(int(50 * density),int(50 * density)), # levels=20, cmap='Blues') emb = adata.obsm[embedding] # color = np.array(adata.obs['leiden']).astype(int) # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='Set2', alpha=0.1) plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1) plt.title(feature) plt.xticks([]) plt.yticks([]) if len(plot_within_cluster) > 0: grid_in_cluster = grid_within_cluster(X_grid) plt.quiver(X_grid[grid_in_cluster,0], X_grid[grid_in_cluster,1],V_grid[grid_in_cluster,0],V_grid[grid_in_cluster,1],color='black',alpha=1) else: plt.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color='black',alpha=1,scale=2) plt.xlabel(feature_loading_emb) # save the plot plt.savefig(f'./figures/cd8/gene_{feature}_emb_{embedding}.png') plt.show() plt.clf() del adata.obsm[vkey]
# plt.savefig(f'./data/flow/gene_{feature}.pdf')
[docs] def plot_one_feature( adata:AnnData, feature='', embedding='X_featmap', cluster_key='clusters', plot_within_cluster=[], pseudotime_adjusted=False, pseudotime='dpt_pseudotime', trend='positive', ratio=0.2, density=1, smooth=0.5, n_neighbors=None, min_mass=1, autoscale=True, scale_quiver=1.0, output_v_grid=False, scale_by_variation=False, ax=None): """ Plot a given feature (e.g., gene) in two dimensional visualization Parameters ---------- adata : AnnData An annotated data matrix. feature : string Feature name to be plotted. embedding : string Embedding background for feature plot. The default is 'X_featmap'. cluster_key : string Cluster name indicator. The default is 'clusters'. plot_within_cluster : list A list of clusters in which the feaure is to plot. The default is []. pseudotime_adjusted : bool Whether to adjust the feature direction by pseudotime. The default is False. pseudotime : string Pseudotime indicator. The default is 'dpt_pseudotime'. trend : string of {'positive','negative'} The direction along pseudotime. The default is 'positive'. ratio : float Filtering ratio by expression to filter varition by low expression. The default is 0.5. density : float Grid desity for plot. The default is 1. smooth : float For kde estimation. The default is 0.5. n_neighbors : int Number of neighbours for kde. The default is None. min_mass : float Minumum denstiy to show the grid plot. The default is 1. autoscale : bool Scale the arrow plot. The default is True. output_v_grid : bool Output the grid velocity. The default is False. scale_by_variation : bool Scale the arrow plot by feature variation. The default is True. ax : matplotlib axis The axis to plot the feature. The default is None. """ # Compute the feature loading embedding # feature_loading(adata) vkey=f'feature_{feature}_loading' feature_id = np.where(adata.var_names == feature)[0][0] # adata.obsm[vkey] = adata.obsm[feature_loading_emb][:,:,feature_id] # Set grid as the support X_emb=adata.obsm[embedding] V_emb=adata.obsm[vkey] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] if scale_by_variation: variation_feature = adata.obsm[f'variation_feature_{feature}'] V_emb *= variation_feature[:, np.newaxis] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth grs = [] for dim_i in range(n_dim): m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) m = m - 0.01 * np.abs(M - m) M = M + 0.01 * np.abs(M - m) gr = np.linspace(m, M, int(50 * density)) grs.append(gr) meshes_tuple = np.meshgrid(*grs) X_grid = np.vstack([i.flat for i in meshes_tuple]).T # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid variation if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 # print('V_grid:', V_grid) # V_grid /= np.max(np.linalg.norm(V_grid, axis=1)) # Restrict the plot within given clusters def grid_within_cluster(X_grid): nn = NearestNeighbors(n_neighbors=1, n_jobs=-1) nn.fit(X_emb) _, neighs = nn.kneighbors(X_grid) # plot_within_cluster = ['Beta'] if len(plot_within_cluster) > 0: grid_in_cluster = [] for cluster in plot_within_cluster: idx_in_cluster = np.where(np.array(adata.obs[cluster_key] == cluster))[0] for i in range(neighs.shape[0]): if neighs[i,0] in idx_in_cluster: grid_in_cluster.append(i) return grid_in_cluster # start ploting feature feature_id = np.where(adata.var_names == feature)[0][0] # average expression in grid points over NNs expr_grid = [] if isinstance(adata.X, np.ndarray): expr_count = adata.X.copy()[:,feature_id] else: expr_count = adata.X.toarray().copy()[:,feature_id] expr_grid = (expr_count[neighs] * weight).sum(1) expr_grid /= np.maximum(1, p_mass) # print('V_grid_1:', V_grid) # Filter the expr_velo by low expression threshold = max(expr_grid) * ratio # feature_velo_loading = pc_loadings_grid[:,:,feature_id] V_grid[expr_grid<threshold]=np.nan # print('V_grid', V_grid) min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= scale_quiver * quiver_autoscale(X_grid, V_grid) # Adjust the v direction by the sign of local expression change # V_grid = V_grid * 10 displace_grid = X_grid + V_grid grid_idx = np.unique(np.where(np.isnan(displace_grid) == False)[0]) if grid_idx.shape[0] != 0: _, displace_grid_neighs = nn.kneighbors(displace_grid[grid_idx]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx]) displace_expr = np.mean(expr_count[displace_grid_neighs[:,:100]], axis=1) - np.mean(expr_count[start_grid_neighs[:,:100]],axis=1) displace_expr_sign = np.sign(displace_expr) # displace_expr_sign[displace_expr_sign == 0] = 1 V_grid[grid_idx] = np.multiply(V_grid[grid_idx], displace_expr_sign[:, np.newaxis]) # Keep arrows along the positive (negative) trend of time flow if pseudotime_adjusted: time_ = np.array(adata.obs[pseudotime]) displace_grid_adjusted = X_grid + V_grid grid_idx_adjusted = np.unique(np.where(np.isnan(displace_grid_adjusted) == False)[0]) _, displace_grid_neighs = nn.kneighbors(displace_grid_adjusted[grid_idx_adjusted]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx_adjusted]) displace_time = np.mean(time_[displace_grid_neighs[:,:100]], axis=1) - np.mean(time_[start_grid_neighs[:,:100]],axis=1) displace_time_sign = np.sign(displace_time) if trend == 'positive': displace_time_sign[displace_time_sign < 0] = 0 else: displace_time_sign[displace_time_sign > 0] = 0 displace_time_sign[displace_time_sign < 0] = 1 V_grid[grid_idx_adjusted] = np.multiply(V_grid[grid_idx_adjusted], displace_time_sign[:, np.newaxis]) # plt.contourf(meshes_tuple[0], meshes_tuple[1], p1.reshape(int(50 * density),int(50 * density)), # levels=20, cmap='Blues') # ax = plt.subplot() # emb = adata.obsm[embedding] # # color = np.array(adata.obs['clusters']) # import seaborn as sns # # sns.scatterplot(data=adata.obs, x=emb[:,0], y=emb[:,1], s=20, hue='clusters', palette='tab10', alpha=0.5, legend=False) # # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='tab10', alpha=0.1) # plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1) # plt.title(feature) # plt.xticks([]) # plt.yticks([]) ax = plt.subplot() if ax is None else ax emb = adata.obsm[embedding] embedding_df = pd.DataFrame(adata.obsm[embedding], index=adata.obs_names, columns=['dim_0', 'dim_1']) # Calculate the range for the ticks x_min, x_max = embedding_df['dim_0'].min(), embedding_df['dim_0'].max() y_min, y_max = embedding_df['dim_1'].min(), embedding_df['dim_1'].max() x_mid = (x_min + x_max) / 2 y_mid = (y_min + y_max) / 2 # Determine the common range common_min = min(x_min, y_min) common_max = max(x_max, y_max) x_range = common_max - common_min y_range = common_max - common_min # color = np.array(adata.obs['leiden']).astype(int) # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='Set2', alpha=0.1) plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1) plt.title(f'{vkey}') # plt.xlim(x_mid-x_range/2, x_mid+x_range/2) # plt.ylim(y_mid-y_range/2, y_mid+y_range/2) plt.xticks([]) plt.yticks([]) # check if 'clusters' is in adata.obs # if 'clusters' in adata.obs: # sc.pl.embedding(adata, embedding, color='clusters', projection='2d', size=20, ax=ax, show=False, alpha=0.2) if len(plot_within_cluster) > 0: grid_in_cluster = grid_within_cluster(X_grid) plt.quiver(X_grid[grid_in_cluster,0], X_grid[grid_in_cluster,1],V_grid[grid_in_cluster,0],V_grid[grid_in_cluster,1],color='black',alpha=1,width=0.004, headwidth=5, headlength=5) else: plt.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color='black',alpha=1, width=0.002, headwidth=5, headlength=5) plt.xlabel(vkey) # save the plot plt.show()
# plt.clf() # del adata.obsm[vkey] # plt.savefig(f'./figures/gene_{feature}.png') # if output_v_grid: # return V_grid # return ax
[docs] def plot_multiple_features( adata:AnnData, features=[], embedding='X_featmap', cluster_key='clusters', plot_within_cluster=[], pseudotime_adjusted=False, pseudotime='dpt_pseudotime', trend='positive', ratio=0.2, density=1, smooth=0.5, n_neighbors=None, min_mass=1, autoscale=True, scale_by_variation=True, ): """ Plot a given feature (e.g., gene) in two dimensional visualization Parameters ---------- adata : AnnData An annotated data matrix. feature : string Feature name to be plotted. embedding : string Embedding background for feature plot. The default is 'X_featmap'. cluster_key : string Cluster name indicator. The default is 'clusters'. plot_within_cluster : list A list of clusters in which the feaure is to plot. The default is []. pseudotime_adjusted : bool Whether to adjust the feature direction by pseudotime. The default is False. pseudotime : string Pseudotime indicator. The default is 'dpt_pseudotime'. trend : string of {'positive','negative'} The direction along pseudotime. The default is 'positive'. ratio : float Filtering ratio by expression to filter varition by low expression. The default is 0.5. density : float Grid desity for plot. The default is 1. smooth : float For kde estimation. The default is 0.5. n_neighbors : int Number of neighbours for kde. The default is None. min_mass : float Minumum denstiy to show the grid plot. The default is 1. autoscale : bool Scale the arrow plot. The default is True. output_v_grid : bool Output the grid velocity. The default is False. scale_by_variation : bool Scale the arrow plot by feature variation. The default is True. """ colors = sns.color_palette('tab10', n_colors=len(features)) for i, feature in enumerate(features): # Project the feature to the gauge embedding feature_projection(adata, feature=feature) # Compute the feature loading embedding vkey=f'feature_{feature}_loading' feature_id = np.where(adata.var_names == feature)[0][0] # adata.obsm[vkey] = adata.obsm[feature_loading_emb][:,:,feature_id] # Set grid as the support X_emb=adata.obsm[embedding] V_emb=adata.obsm[vkey] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] if scale_by_variation: feature_variation_one_feature(adata, feature=feature) variation_feature = adata.obsm[f'variation_feature_{feature}'] V_emb *= variation_feature[:, np.newaxis] del adata.obsm[f'variation_feature_{feature}'] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth grs = [] for dim_i in range(n_dim): m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) m = m - 0.01 * np.abs(M - m) M = M + 0.01 * np.abs(M - m) gr = np.linspace(m, M, int(50 * density)) grs.append(gr) meshes_tuple = np.meshgrid(*grs) X_grid = np.vstack([i.flat for i in meshes_tuple]).T # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid variation if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 # V_grid /= np.max(np.linalg.norm(V_grid, axis=1)) # Restrict the plot within given clusters def grid_within_cluster(X_grid): nn = NearestNeighbors(n_neighbors=1, n_jobs=-1) nn.fit(X_emb) _, neighs = nn.kneighbors(X_grid) # plot_within_cluster = ['Beta'] if len(plot_within_cluster) > 0: grid_in_cluster = [] for cluster in plot_within_cluster: idx_in_cluster = np.where(np.array(adata.obs[cluster_key] == cluster))[0] for i in range(neighs.shape[0]): if neighs[i,0] in idx_in_cluster: grid_in_cluster.append(i) return grid_in_cluster # start ploting feature feature_id = np.where(adata.var_names == feature)[0][0] # average expression in grid points over NNs expr_grid = [] if isinstance(adata.X, np.ndarray): expr_count = adata.X.copy()[:,feature_id] else: expr_count = adata.X.toarray().copy()[:,feature_id] expr_grid = (expr_count[neighs] * weight).sum(1) expr_grid /= np.maximum(1, p_mass) # print('V_grid_1:', V_grid) # Filter the expr_velo by low expression threshold = max(expr_grid) * ratio # feature_velo_loading = pc_loadings_grid[:,:,feature_id] V_grid[expr_grid<threshold]=np.nan # print('V_grid', V_grid) min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= 1* quiver_autoscale(X_grid, V_grid) # Adjust the v direction by the sign of local expression change # V_grid = V_grid * 10 displace_grid = X_grid + V_grid grid_idx = np.unique(np.where(np.isnan(displace_grid) == False)[0]) if grid_idx.shape[0] != 0: _, displace_grid_neighs = nn.kneighbors(displace_grid[grid_idx]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx]) displace_expr = np.mean(expr_count[displace_grid_neighs[:,:100]], axis=1) - np.mean(expr_count[start_grid_neighs[:,:100]],axis=1) displace_expr_sign = np.sign(displace_expr) # displace_expr_sign[displace_expr_sign == 0] = 1 V_grid[grid_idx] = np.multiply(V_grid[grid_idx], displace_expr_sign[:, np.newaxis]) # Keep arrows along the positive (negative) trend of time flow if pseudotime_adjusted: time_ = np.array(adata.obs[pseudotime]) displace_grid_adjusted = X_grid + V_grid grid_idx_adjusted = np.unique(np.where(np.isnan(displace_grid_adjusted) == False)[0]) _, displace_grid_neighs = nn.kneighbors(displace_grid_adjusted[grid_idx_adjusted]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx_adjusted]) displace_time = np.mean(time_[displace_grid_neighs[:,:100]], axis=1) - np.mean(time_[start_grid_neighs[:,:100]],axis=1) displace_time_sign = np.sign(displace_time) if trend == 'positive': displace_time_sign[displace_time_sign < 0] = 0 else: displace_time_sign[displace_time_sign > 0] = 0 displace_time_sign[displace_time_sign < 0] = 1 V_grid[grid_idx_adjusted] = np.multiply(V_grid[grid_idx_adjusted], displace_time_sign[:, np.newaxis]) ax = plt.subplot() emb = adata.obsm[embedding] embedding_df = pd.DataFrame(adata.obsm[embedding], index=adata.obs_names, columns=['dim_0', 'dim_1']) # Calculate the range for the ticks x_min, x_max = embedding_df['dim_0'].min(), embedding_df['dim_0'].max() y_min, y_max = embedding_df['dim_1'].min(), embedding_df['dim_1'].max() x_mid = (x_min + x_max) / 2 y_mid = (y_min + y_max) / 2 # Determine the common range common_min = min(x_min, y_min) common_max = max(x_max, y_max) x_range = common_max - common_min y_range = common_max - common_min # color = np.array(adata.obs['leiden']).astype(int) # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='Set2', alpha=0.1) plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1, c='grey') plt.title('Feature projection') plt.xlim(x_mid-x_range/2, x_mid+x_range/2) plt.ylim(y_mid-y_range/2, y_mid+y_range/2) plt.xticks([]) plt.yticks([]) # check if 'clusters' is in adata.obs if 'clusters' in adata.obs: sc.pl.embedding(adata, embedding, color='clusters', projection='2d', size=20, ax=ax, show=False) if len(plot_within_cluster) > 0: grid_in_cluster = grid_within_cluster(X_grid) q = ax.quiver(X_grid[grid_in_cluster,0], X_grid[grid_in_cluster,1],V_grid[grid_in_cluster,0],V_grid[grid_in_cluster,1],color=colors[i],alpha=1) ax.quiverkey(q, X=0.9, Y=0.9-0.1*i, U=0.1, label=feature, labelpos='E',) else: plt.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color=colors[i],alpha=1,scale=2) ax.quiverkey(q, X=0.9, Y=0.9-0.1*i, U=0.1, label=feature, labelpos='E',) del adata.obsm[vkey] plt.show()
[docs] def plot_one_feature_by_all_cells( adata:AnnData, feature='', embedding='X_featmap', cluster_key='clusters', plot_within_cluster=[], pseudotime_adjusted=False, pseudotime='dpt_pseudotime', trend='positive', ratio=0.2, density=1, smooth=0.5, n_neighbors=None, min_mass=1, autoscale=True, output_v_grid=False, scale_by_variation=True, scale_quiver=1.0,): """ Plot a given feature (e.g., gene) in two dimensional visualization Parameters ---------- adata : AnnData An annotated data matrix. feature : string Feature name to be plotted. embedding : string Embedding background for feature plot. The default is 'X_featmap'. cluster_key : string Cluster name indicator. The default is 'clusters'. plot_within_cluster : list A list of clusters in which the feaure is to plot. The default is []. pseudotime_adjusted : bool Whether to adjust the feature direction by pseudotime. The default is False. pseudotime : string Pseudotime indicator. The default is 'dpt_pseudotime'. trend : string of {'positive','negative'} The direction along pseudotime. The default is 'positive'. ratio : float Filtering ratio by expression to filter varition by low expression. The default is 0.5. density : float Grid desity for plot. The default is 1. smooth : float For kde estimation. The default is 0.5. n_neighbors : int Number of neighbours for kde. The default is None. min_mass : float Minumum denstiy to show the grid plot. The default is 1. autoscale : bool Scale the arrow plot. The default is True. """ # Compute the feature loading embedding # feature_loading(adata) vkey=f'feature_{feature}_loading' feature_id = np.where(adata.var_names == feature)[0][0] # adata.obsm[vkey] = adata.obsm[feature_loading_emb][:,:,feature_id] # Set grid as the support X_emb=adata.obsm[embedding] V_emb=adata.obsm[vkey] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth # grs = [] # for dim_i in range(n_dim): # m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) # m = m - 0.01 * np.abs(M - m) # M = M + 0.01 * np.abs(M - m) # gr = np.linspace(m, M, int(50 * density)) # grs.append(gr) # print('grs:', grs) # set seed # np.random.seed(0) # ind = np.random.choice(X_emb.shape[0], int(X_emb.shape[0] * density), replace=False) grs = [X_emb[:,0], X_emb[:,1]] # meshes_tuple = np.meshgrid(*grs) # X_grid = np.vstack([i.flat for i in meshes_tuple]).T X_grid = X_emb # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid variation if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth scale = np.abs(scale) weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 # V_grid /= np.max(np.linalg.norm(V_grid, axis=1)) # Restrict the plot within given clusters def grid_within_cluster(X_grid): nn = NearestNeighbors(n_neighbors=1, n_jobs=-1) nn.fit(X_emb) _, neighs = nn.kneighbors(X_grid) # plot_within_cluster = ['Beta'] if len(plot_within_cluster) > 0: grid_in_cluster = [] for cluster in plot_within_cluster: idx_in_cluster = np.where(np.array(adata.obs[cluster_key] == cluster))[0] for i in range(neighs.shape[0]): if neighs[i,0] in idx_in_cluster: grid_in_cluster.append(i) return grid_in_cluster # start ploting feature feature_id = np.where(adata.var_names == feature)[0][0] # average expression in grid points over NNs expr_grid = [] if isinstance(adata.X, np.ndarray): expr_count = adata.X.copy()[:,feature_id] else: expr_count = adata.X.toarray().copy()[:,feature_id] expr_grid = (expr_count[neighs] * weight).sum(1) expr_grid /= np.maximum(1, p_mass) # print('V_grid_1:', V_grid) # Filter the expr_velo by low expression threshold = max(expr_grid) * ratio # feature_velo_loading = pc_loadings_grid[:,:,feature_id] V_grid[expr_grid<threshold]=np.nan # min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 # X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= scale_quiver * quiver_autoscale(X_grid, V_grid) # Adjust the v direction by the sign of local expression change # V_grid = V_grid * 10 displace_grid = X_grid + V_grid grid_idx = np.unique(np.where(np.isnan(displace_grid) == False)[0]) if grid_idx.shape[0] != 0: _, displace_grid_neighs = nn.kneighbors(displace_grid[grid_idx]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx]) displace_expr = np.mean(expr_count[displace_grid_neighs[:,:100]], axis=1) - np.mean(expr_count[start_grid_neighs[:,:100]],axis=1) displace_expr_sign = np.sign(displace_expr) # displace_expr_sign[displace_expr_sign == 0] = 1 V_grid[grid_idx] = np.multiply(V_grid[grid_idx], displace_expr_sign[:, np.newaxis]) # Keep arrows along the positive (negative) trend of time flow if pseudotime_adjusted: time_ = np.array(adata.obs[pseudotime]) displace_grid_adjusted = X_grid + V_grid grid_idx_adjusted = np.unique(np.where(np.isnan(displace_grid_adjusted) == False)[0]) _, displace_grid_neighs = nn.kneighbors(displace_grid_adjusted[grid_idx_adjusted]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx_adjusted]) displace_time = np.mean(time_[displace_grid_neighs[:,:100]], axis=1) - np.mean(time_[start_grid_neighs[:,:100]],axis=1) displace_time_sign = np.sign(displace_time) if trend == 'positive': displace_time_sign[displace_time_sign < 0] = 0 else: displace_time_sign[displace_time_sign > 0] = 0 displace_time_sign[displace_time_sign < 0] = 1 V_grid[grid_idx_adjusted] = np.multiply(V_grid[grid_idx_adjusted], displace_time_sign[:, np.newaxis]) if scale_by_variation: variation_feature = adata.obsm[f'variation_feature_{feature}'] V_grid *= variation_feature[:, np.newaxis] # plt.contourf(meshes_tuple[0], meshes_tuple[1], p1.reshape(int(50 * density),int(50 * density)), # levels=20, cmap='Blues') # ax = plt.subplot() # emb = adata.obsm[embedding] # # color = np.array(adata.obs['clusters']) # import seaborn as sns # # sns.scatterplot(data=adata.obs, x=emb[:,0], y=emb[:,1], s=20, hue='clusters', palette='tab10', alpha=0.5, legend=False) # # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='tab10', alpha=0.1) # plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1) # plt.title(feature) # plt.xticks([]) # plt.yticks([]) ax = plt.subplot() emb = adata.obsm[embedding] embedding_df = pd.DataFrame(adata.obsm[embedding], index=adata.obs_names, columns=['dim_0', 'dim_1']) # Calculate the range for the ticks x_min, x_max = embedding_df['dim_0'].min(), embedding_df['dim_0'].max() y_min, y_max = embedding_df['dim_1'].min(), embedding_df['dim_1'].max() x_mid = (x_min + x_max) / 2 y_mid = (y_min + y_max) / 2 # Determine the common range common_min = min(x_min, y_min) common_max = max(x_max, y_max) x_range = common_max - common_min y_range = common_max - common_min # color = np.array(adata.obs['leiden']).astype(int) # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='Set2', alpha=0.1) plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1) plt.title(f'{vkey}') # plt.xlim(x_mid-x_range/2, x_mid+x_range/2) # plt.ylim(y_mid-y_range/2, y_mid+y_range/2) plt.xticks([]) plt.yticks([]) # check if 'clusters' is in adata.obs if 'clusters' in adata.obs: sc.pl.embedding(adata, embedding, color='clusters', projection='2d', size=20, ax=ax, show=False) if len(plot_within_cluster) > 0: grid_in_cluster = grid_within_cluster(X_grid) plt.quiver(X_grid[grid_in_cluster,0], X_grid[grid_in_cluster,1],V_grid[grid_in_cluster,0],V_grid[grid_in_cluster,1],color='black',alpha=1) else: plt.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color='black',alpha=1,scale=2) plt.xlabel(vkey) # save the plot plt.show() plt.clf() del adata.obsm[vkey] # plt.savefig(f'./data/flow/gene_{feature}.pdf') if output_v_grid: return V_grid
[docs] def plot_multiple_features_by_all_cells( adata:AnnData, features=[], embedding='X_featmap', cluster_key='clusters', plot_within_cluster=[], pseudotime_adjusted=False, pseudotime='dpt_pseudotime', trend='positive', ratio=0.2, density=1, smooth=0.5, n_neighbors=None, min_mass=1, autoscale=True, output_v_grid=False, scale_by_variation=True,): """ Plot a given feature (e.g., gene) in two dimensional visualization Parameters ---------- adata : AnnData An annotated data matrix. feature : string Feature name to be plotted. embedding : string Embedding background for feature plot. The default is 'X_featmap'. cluster_key : string Cluster name indicator. The default is 'clusters'. plot_within_cluster : list A list of clusters in which the feaure is to plot. The default is []. pseudotime_adjusted : bool Whether to adjust the feature direction by pseudotime. The default is False. pseudotime : string Pseudotime indicator. The default is 'dpt_pseudotime'. trend : string of {'positive','negative'} The direction along pseudotime. The default is 'positive'. ratio : float Filtering ratio by expression to filter varition by low expression. The default is 0.5. density : float Grid desity for plot. The default is 1. smooth : float For kde estimation. The default is 0.5. n_neighbors : int Number of neighbours for kde. The default is None. min_mass : float Minumum denstiy to show the grid plot. The default is 1. autoscale : bool Scale the arrow plot. The default is True. """ colors = sns.color_palette('tab10', n_colors=len(features)) legend_patches = [] v_grid_collection = [] for i, feature in enumerate(features): # Project the feature to the gauge embedding feature_projection(adata, feature=feature) vkey=f'feature_{feature}_loading' feature_id = np.where(adata.var_names == feature)[0][0] # adata.obsm[vkey] = adata.obsm[feature_loading_emb][:,:,feature_id] # Set grid as the support X_emb=adata.obsm[embedding] V_emb=adata.obsm[vkey] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth # grs = [] # for dim_i in range(n_dim): # m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) # m = m - 0.01 * np.abs(M - m) # M = M + 0.01 * np.abs(M - m) # gr = np.linspace(m, M, int(50 * density)) # grs.append(gr) # print('grs:', grs) # set seed # np.random.seed(0) # ind = np.random.choice(X_emb.shape[0], int(X_emb.shape[0] * density), replace=False) grs = [X_emb[:,0], X_emb[:,1]] # meshes_tuple = np.meshgrid(*grs) # X_grid = np.vstack([i.flat for i in meshes_tuple]).T X_grid = X_emb[:, :] # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) # p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid variation if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth scale = np.abs(scale) weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 # V_grid /= np.max(np.linalg.norm(V_grid, axis=1)) # Restrict the plot within given clusters def grid_within_cluster(X_grid): nn = NearestNeighbors(n_neighbors=1, n_jobs=-1) nn.fit(X_emb) _, neighs = nn.kneighbors(X_grid) # plot_within_cluster = ['Beta'] if len(plot_within_cluster) > 0: grid_in_cluster = [] for cluster in plot_within_cluster: idx_in_cluster = np.where(np.array(adata.obs[cluster_key] == cluster))[0] for i in range(neighs.shape[0]): if neighs[i,0] in idx_in_cluster: grid_in_cluster.append(i) return grid_in_cluster # start ploting feature feature_id = np.where(adata.var_names == feature)[0][0] # average expression in grid points over NNs expr_grid = [] if isinstance(adata.X, np.ndarray): expr_count = adata.X.copy()[:,feature_id] else: expr_count = adata.X.toarray().copy()[:,feature_id] expr_grid = (expr_count[neighs] * weight).sum(1) expr_grid /= np.maximum(1, p_mass) # print('V_grid_1:', V_grid) # Filter the expr_velo by low expression threshold = max(expr_grid) * ratio # feature_velo_loading = pc_loadings_grid[:,:,feature_id] V_grid[expr_grid<threshold]=np.nan # min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 # X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] if autoscale: V_grid /= 1* quiver_autoscale(X_grid, V_grid) # Adjust the v direction by the sign of local expression change # V_grid = V_grid * 10 displace_grid = X_grid + V_grid grid_idx = np.unique(np.where(np.isnan(displace_grid) == False)[0]) if grid_idx.shape[0] != 0: _, displace_grid_neighs = nn.kneighbors(displace_grid[grid_idx]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx]) displace_expr = np.mean(expr_count[displace_grid_neighs[:,:100]], axis=1) - np.mean(expr_count[start_grid_neighs[:,:100]],axis=1) displace_expr_sign = np.sign(displace_expr) # displace_expr_sign[displace_expr_sign == 0] = 1 V_grid[grid_idx] = np.multiply(V_grid[grid_idx], displace_expr_sign[:, np.newaxis]) # Keep arrows along the positive (negative) trend of time flow if pseudotime_adjusted: time_ = np.array(adata.obs[pseudotime]) displace_grid_adjusted = X_grid + V_grid grid_idx_adjusted = np.unique(np.where(np.isnan(displace_grid_adjusted) == False)[0]) _, displace_grid_neighs = nn.kneighbors(displace_grid_adjusted[grid_idx_adjusted]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx_adjusted]) displace_time = np.mean(time_[displace_grid_neighs[:,:100]], axis=1) - np.mean(time_[start_grid_neighs[:,:100]],axis=1) displace_time_sign = np.sign(displace_time) if trend == 'positive': displace_time_sign[displace_time_sign < 0] = 0 else: displace_time_sign[displace_time_sign > 0] = 0 displace_time_sign[displace_time_sign < 0] = 1 V_grid[grid_idx_adjusted] = np.multiply(V_grid[grid_idx_adjusted], displace_time_sign[:, np.newaxis]) if scale_by_variation: feature_variation_one_feature(adata, feature=feature) variation_feature = adata.obsm[f'variation_feature_{feature}'] V_grid *= variation_feature[:, np.newaxis] v_grid_collection.append(V_grid) ax = plt.subplot() emb = adata.obsm[embedding] embedding_df = pd.DataFrame(adata.obsm[embedding], index=adata.obs_names, columns=['dim_0', 'dim_1']) # Calculate the range for the ticks x_min, x_max = embedding_df['dim_0'].min(), embedding_df['dim_0'].max() y_min, y_max = embedding_df['dim_1'].min(), embedding_df['dim_1'].max() x_mid = (x_min + x_max) / 2 y_mid = (y_min + y_max) / 2 # Determine the common range common_min = min(x_min, y_min) common_max = max(x_max, y_max) x_range = common_max - common_min y_range = common_max - common_min # color = np.array(adata.obs['leiden']).astype(int) # plt.scatter(emb[:,0],emb[:,1], s=1, c=color, cmap='Set2', alpha=0.1) plt.scatter(emb[:,0],emb[:,1], s=1, alpha=0.1, c='grey') plt.title('Feature projection') plt.xlim(x_mid-x_range/2, x_mid+x_range/2) plt.ylim(y_mid-y_range/2, y_mid+y_range/2) plt.xticks([]) plt.yticks([]) # check if 'clusters' is in adata.obs if 'clusters' in adata.obs: sc.pl.embedding(adata, embedding, color='clusters', projection='2d', size=20, ax=ax, show=False) if len(plot_within_cluster) > 0: grid_in_cluster = grid_within_cluster(X_grid) ax.quiver(X_grid[grid_in_cluster,0], X_grid[grid_in_cluster,1],V_grid[grid_in_cluster,0],V_grid[grid_in_cluster,1],color=colors[i],alpha=1-0.2*i, width=0.002) # ax.quiverkey(q, X=0.9, Y=0.9-0.1*i, U=0.01, label=feature, labelpos='E',) else: ax.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color=colors[i],alpha=1,scale=1) # ax.quiverkey(q, X=0.9, Y=0.9-0.1*i, U=0.01, label=feature, labelpos='E',) # plt.xlabel(vkey) # save the plot # plt.show() # plt.clf() del adata.obsm[vkey] import matplotlib.patches as mpatches # Create a patch for the current feature to add to the legend patch = mpatches.Patch(color=colors[i], label=feature) legend_patches.append(patch) # plt.savefig(f'./data/flow/gene_{feature}.pdf') # Add the legend to the plot plt.legend(handles=legend_patches, loc='upper right') plt.show() return v_grid_collection
[docs] def plot_multiple_features_by_all_cells_given_clusters( adata:AnnData, features=[], embedding='X_featmap', cluster_key='clusters', plot_within_cluster=[], pseudotime_adjusted=True, pseudotime='feat_pseudotime', trend='positive', ratio=0.2, density=1, smooth=0.5, n_neighbors=None, min_mass=1, autoscale=True, scale_by_variation=True, feature_key='', scale_quiver=1.0,): """ Plot a given feature (e.g., gene) in two dimensional visualization Parameters ---------- adata : AnnData An annotated data matrix. feature : string Feature name to be plotted. embedding : string Embedding background for feature plot. The default is 'X_featmap'. cluster_key : string Cluster name indicator. The default is 'clusters'. plot_within_cluster : list A list of clusters in which the feaure is to plot. The default is []. pseudotime_adjusted : bool Whether to adjust the feature direction by pseudotime. The default is False. pseudotime : string Pseudotime indicator. The default is 'dpt_pseudotime'. trend : string of {'positive','negative'} The direction along pseudotime. The default is 'positive'. ratio : float Filtering ratio by expression to filter varition by low expression. The default is 0.5. density : float Grid desity for plot. The default is 1. smooth : float For kde estimation. The default is 0.5. n_neighbors : int Number of neighbours for kde. The default is None. min_mass : float Minumum denstiy to show the grid plot. The default is 1. autoscale : bool Scale the arrow plot. The default is True. """ legend_patches = [] v_grid_collection = [] X_emb=adata.obsm[embedding] X_grid = X_emb[:, :] # Restrict the plot within given clusters def grid_within_cluster(X_grid): nn = NearestNeighbors(n_neighbors=1, n_jobs=-1) nn.fit(X_emb) _, neighs = nn.kneighbors(X_grid) # plot_within_cluster = ['Beta'] if len(plot_within_cluster) > 0: grid_in_cluster = [] for cluster in plot_within_cluster: idx_in_cluster = np.where(np.array(adata.obs[cluster_key] == cluster))[0] for i in range(neighs.shape[0]): if neighs[i,0] in idx_in_cluster: grid_in_cluster.append(i) return grid_in_cluster ax = plt.subplot() if len(plot_within_cluster) > 0: grid_in_cluster = grid_within_cluster(X_grid) emb = adata.obsm[embedding][grid_in_cluster,:] embedding_df = pd.DataFrame(emb, index=adata.obs_names[grid_in_cluster], columns=['dim_0', 'dim_1']) # Calculate the range for the ticks x_min, x_max = embedding_df['dim_0'].min(), embedding_df['dim_0'].max() y_min, y_max = embedding_df['dim_1'].min(), embedding_df['dim_1'].max() x_mid = (x_min + x_max) / 2 y_mid = (y_min + y_max) / 2 # Determine the common range common_min = min(x_min, y_min) common_max = max(x_max, y_max) # x_range = common_max - common_min # y_range = common_max - common_min x_range = x_max - x_min y_range = y_max - y_min # cluster labels # scatter color by cluster color = np.array(adata.obs[cluster_key][grid_in_cluster]).astype(str) # Define a colormap cmap = plt.get_cmap('Pastel1') # 'tab10' is a good colormap for categorical data unique_clusters = np.unique(color) colors = [cmap(i) for i in range(len(unique_clusters))] # Create a dictionary to map clusters to colors cluster_color_map = {cluster: colors[i] for i, cluster in enumerate(unique_clusters)} # Plot the scatter plot for cluster in unique_clusters: idx = np.where(color == cluster) plt.scatter(emb[idx, 0], emb[idx, 1], s=100, alpha=0.5, c=[cluster_color_map[cluster]], label=f'Cluster {cluster}', marker='.') plt.title('Feature projection') plt.xlim(x_mid-x_range/2, x_mid+x_range/2) plt.ylim(y_mid-y_range/2, y_mid+y_range/2) plt.xticks([]) plt.yticks([]) for i, feature in enumerate(features): # Project the feature to the gauge embedding feature_projection(adata, feature=feature) # feature_gradient(adata, feature=feature) # feature_gradient_projection(adata, feature=feature) vkey=f'feature_{feature}_loading' feature_id = np.where(adata.var_names == feature)[0][0] # adata.obsm[vkey] = adata.obsm[feature_loading_emb][:,:,feature_id] # Set grid as the support X_emb=adata.obsm[embedding] V_emb=adata.obsm[vkey] idx_valid = np.isfinite(X_emb.sum(1) + V_emb.sum(1)) X_emb = X_emb[idx_valid] V_emb = V_emb[idx_valid] # prepare grid n_obs, n_dim = X_emb.shape density = 1 if density is None else density smooth = 0.5 if smooth is None else smooth # grs = [] # for dim_i in range(n_dim): # m, M = np.min(X_emb[:, dim_i]), np.max(X_emb[:, dim_i]) # m = m - 0.01 * np.abs(M - m) # M = M + 0.01 * np.abs(M - m) # gr = np.linspace(m, M, int(50 * density)) # grs.append(gr) # print('grs:', grs) # set seed # np.random.seed(0) # ind = np.random.choice(X_emb.shape[0], int(X_emb.shape[0] * density), replace=False) grs = [X_emb[:,0], X_emb[:,1]] # meshes_tuple = np.meshgrid(*grs) # X_grid = np.vstack([i.flat for i in meshes_tuple]).T X_grid = X_emb[:, :] # p1, _, _, _, C = s._kernel_density_estimate_anisotropic( # X_grid, rotational_matrix, r_emb) # p1, _, _, _ = s._kernel_density_estimate(X_grid) # p1 = kernel_density_estimate(X_emb, X_grid) # estimate grid variation if n_neighbors is None: n_neighbors = int(n_obs / 50) nn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=-1) nn.fit(X_emb) dists, neighs = nn.kneighbors(X_grid) scale = np.mean([(g[1] - g[0]) for g in grs]) * smooth scale = np.abs(scale) weight = normal.pdf(x=dists, scale=scale) p_mass = weight.sum(1) # p_mass = p1 V_grid = (V_emb[neighs] * weight[:, :, None]).sum(1) # V_grid = V_emb[neighs] V_grid /= np.maximum(1, p_mass)[:, None] if min_mass is None: min_mass = 1 # V_grid /= np.max(np.linalg.norm(V_grid, axis=1)) # start ploting feature feature_id = np.where(adata.var_names == feature)[0][0] # average expression in grid points over NNs expr_grid = [] if isinstance(adata.X, np.ndarray): expr_count = adata.X.copy()[:,feature_id] else: expr_count = adata.X.toarray().copy()[:,feature_id] expr_grid = (expr_count[neighs] * weight).sum(1) expr_grid /= np.maximum(1, p_mass) # print('V_grid_1:', V_grid) # Filter the expr_velo by low expression threshold = max(expr_grid) * ratio # feature_velo_loading = pc_loadings_grid[:,:,feature_id] V_grid[expr_grid<threshold]=np.nan # min_mass *= np.percentile(p_mass, 99) / 100 # min_mass = 0.01 # X_grid, V_grid = X_grid[p_mass > min_mass], V_grid[p_mass > min_mass] # Adjust the v direction by the sign of local expression change # V_grid = V_grid * 10 displace_grid = X_grid + V_grid grid_idx = np.unique(np.where(np.isnan(displace_grid) == False)[0]) if grid_idx.shape[0] != 0: _, displace_grid_neighs = nn.kneighbors(displace_grid[grid_idx]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx]) displace_expr = np.mean(expr_count[displace_grid_neighs[:,:100]], axis=1) - np.mean(expr_count[start_grid_neighs[:,:100]],axis=1) displace_expr_sign = np.sign(displace_expr) # displace_expr_sign[displace_expr_sign == 0] = 1 V_grid[grid_idx] = np.multiply(V_grid[grid_idx], displace_expr_sign[:, np.newaxis]) # Keep arrows along the positive (negative) trend of time flow if pseudotime_adjusted: time_ = np.array(adata.obs[pseudotime]) displace_grid_adjusted = X_grid + V_grid grid_idx_adjusted = np.unique(np.where(np.isnan(displace_grid_adjusted) == False)[0]) _, displace_grid_neighs = nn.kneighbors(displace_grid_adjusted[grid_idx_adjusted]) _, start_grid_neighs = nn.kneighbors(X_grid[grid_idx_adjusted]) displace_time = np.mean(time_[displace_grid_neighs[:,:100]], axis=1) - np.mean(time_[start_grid_neighs[:,:100]],axis=1) displace_time_sign = np.sign(displace_time) if trend == 'positive': displace_time_sign[displace_time_sign < 0] = 0 else: displace_time_sign[displace_time_sign > 0] = 0 displace_time_sign[displace_time_sign < 0] = 1 V_grid[grid_idx_adjusted] = np.multiply(V_grid[grid_idx_adjusted], displace_time_sign[:, np.newaxis]) if scale_by_variation: feature_variation_one_feature(adata, feature=feature, feature_key=feature_key) variation_feature = adata.obsm[f'variation_feature_{feature}'] V_grid *= variation_feature[:, np.newaxis] if autoscale: V_grid /= scale_quiver * quiver_autoscale(X_grid, V_grid) v_grid_collection.append(V_grid) colors = sns.color_palette('tab10', n_colors=len(features)) if len(plot_within_cluster) > 0: grid_in_cluster = grid_within_cluster(X_grid) ax.quiver(X_grid[grid_in_cluster,0], X_grid[grid_in_cluster,1],V_grid[grid_in_cluster,0],V_grid[grid_in_cluster,1],color=colors[i],alpha=1, width=0.004, scale=scale_quiver) # ax.quiverkey(q, X=0.9, Y=0.9-0.1*i, U=0.01, label=feature, labelpos='E',) else: ax.quiver(X_grid[:,0], X_grid[:,1],V_grid[:,0],V_grid[:,1],color=colors[i],alpha=1,scale=0.1) # ax.quiverkey(q, X=0.9, Y=0.9-0.1*i, U=0.01, label=feature, labelpos='E',) # plt.xlabel(vkey) # save the plot # plt.show() # plt.clf() del adata.obsm[vkey] import matplotlib.patches as mpatches # Create a patch for the current feature to add to the legend patch = mpatches.Patch(color=colors[i], label=feature) legend_patches.append(patch) # plt.savefig(f'./data/flow/gene_{feature}.pdf') # Add the legend to the plot # plt.legend(handles=legend_patches, loc='upper right') # add legend for cluster outside the plot plt.legend(handles=[mpatches.Patch(color=cluster_color_map[cluster], label=f'Cluster {cluster}') for cluster in unique_clusters], loc='upper right', bbox_to_anchor=(1.5, 1)) plt.show()
[docs] def variation_feature_pp(adata): """ Preprocess the variation feature for DGV analysis Parameters ---------- adata : AnnData An annotated data matrix. """ import anndata as ad layer = 'variation_feature' adata_var = ad.AnnData(X=adata.layers[layer].copy(), ) adata_var.obs = adata.obs.copy() adata_var.var = adata.var.copy() adata_var.layers['counts'] = adata.X.copy() adata_var.X[np.isnan(adata_var.X)]=0 adata_var.obs_names = adata.obs_names adata_var.var_names = adata.var_names adata_var.obs['clusters'] = adata.obs['clusters'].copy() adata_var.layers['counts'] = adata.X.copy() # Normalization sc.pp.normalize_total(adata_var, target_sum=1e4 ) sc.pp.log1p(adata_var, ) # Filtering variation for DGV adata_var.layers['var_filter'] = adata_var.X.copy() # Filter low variation idx = adata_var.layers['var_filter'] < np.max(adata_var.layers['var_filter']) * 0.2 # idx = adata_var.layers['var_filter'] < np.quantile(adata_var.layers['var_filter'], 0.2) # print(f'Low var ratio is {np.sum(idx) / (idx.shape[0]*idx.shape[1])}') adata_var.layers['var_filter'][idx] = 0 # Filter variation by low count if isinstance(adata.X, np.ndarray): idx = adata.X < np.max(adata.X) * 0.2 else: idx = adata.X.toarray() < np.max(adata.X.toarray()) * 0.2 # idx = adata.X.toarray() < np.quantile(adata.X.toarray()[np.nonzero(adata.X.toarray())], 0.2) # idx = adata.X < np.max(adata.X) * 0.2 # print(f'Low var ratio by expression is {np.sum(idx) / (idx.shape[0]*idx.shape[1])}') adata_var.layers['var_filter'][idx] = 0 # Normalization sc.pp.normalize_total(adata_var, target_sum=1e4, layer='var_filter' ) sc.pp.log1p(adata_var, layer='var_filter') return adata_var
[docs] def feature_variation_embedding( adata, n_components=2, layer = 'variation_feature', variation_preprocess_flag=False, random_state=42 ): """ Compute the feature variation embedding based on all features based on local SVD. Parameters ---------- adata : AnnData An annotated data matrix. n_components : int Number of components for embedding. The default is 2. layer : string Layer for variation feature. The default is 'variation_feature'. variation_preprocess_flag : bool Whether to preprocess the variation feature. The default is False. random_state : int Random state. The default is 42. Returns ------- adata_var : AnnData Annotated data matrix with variation matrix and variation embedding. """ adata_var = ad.AnnData(X=adata.layers[layer].copy(), ) adata_var.X[np.isnan(adata_var.X)]=0 adata_var.obs_names = adata.obs_names adata_var.var_names = adata.var_names adata_var.obs['clusters'] = adata.obs['clusters'].copy() adata_var.layers['counts'] = adata.X.copy() # Normalization # sc.pl.highest_expr_genes(adata_var, n_top=20,) sc.pp.normalize_total(adata_var, target_sum=1e4 ) sc.pp.log1p(adata_var, ) if variation_preprocess_flag: # Filtering variation for DGV adata_var.layers['var_filter'] = adata_var.X.copy() # Filter low variation idx = adata_var.layers['var_filter'] < np.max(adata_var.layers['var_filter']) * 0.2 # idx = adata_var.layers['var_filter'] < np.quantile(adata_var.layers['var_filter'], 0.2) # print(f'Low var ratio is {np.sum(idx) / (idx.shape[0]*idx.shape[1])}') adata_var.layers['var_filter'][idx] = 0 # Filter variation by low count if isinstance(adata.X, np.ndarray): idx = adata.X < np.max(adata.X) * 0.2 else: idx = adata.X.toarray() < np.max(adata.X.toarray()) * 0.2 # idx = adata.X.toarray() < np.quantile(adata.X.toarray()[np.nonzero(adata.X.toarray())], 0.2) # idx = adata.X < np.max(adata.X) * 0.2 # print(f'Low var ratio by expression is {np.sum(idx) / (idx.shape[0]*idx.shape[1])}') adata_var.layers['var_filter'][idx] = 0 # Normalization sc.pp.normalize_total(adata_var, target_sum=1e4, layer='var_filter' ) sc.pp.log1p(adata_var, layer='var_filter') # Variation embedding data_original = adata_var.X.copy() data_original[np.isnan(data_original)] = 0 print('Start PCA') # PCA by svd import scipy u, s, vh = scipy.sparse.linalg.svds( data_original, k= min(data_original.shape[1]-1, 100), which='LM', random_state=42) # u, s, vh = scipy.linalg.svd(gene_val_norm, full_matrices=False) # PCA coordinates in first 100 dims emb_svd = np.matmul(u, np.diag(s)) print('Start embedding') import umap emb_umap = umap.UMAP(random_state=random_state, n_neighbors=30,min_dist=0.5, spread=1, n_components=n_components).fit(emb_svd) adata_var.obsm['X_featmap_v'] = emb_umap.embedding_ sc.pl.embedding(adata_var, 'featmap_v', legend_fontsize=10,color=['clusters'], projection='2d', size=20, ) # sc.pl.embedding(adata_var, 'umap_v', legend_fontsize=10,color=['clusters_original'], projection='2d', size=20, ) adata.obsm['X_featmap_v'] = adata_var.obsm['X_featmap_v'] return adata_var
[docs] def featuremap_var_3d(emb_var_3d, color=None, symbol=None, marker_size=3): """ Plot the feature variation embedding in 3D. Parameters ---------- emb_var_3d : np.ndarray 3D embedding of feature variation. color : string Color indicator. The default is None. symbol : string Symbol indicator. The default is None. marker_size : int Marker size. The default is 3. """ import plotly # importlib.reload(nbformat) import plotly.express as px fig_3d = px.scatter_3d(emb_var_3d, x=0, y=1, z=2, color=color, symbol=symbol) if marker_size is None: marker_size = 120000 / emb_var_3d.shape[0] fig_3d.update_traces(marker_size=marker_size) # Modify the point size fig_3d.update_layout(autosize=False, width=500,height=500,) fig_3d.show()