Source code for lib5c.algorithms.clustering.util

"""
Module containing utility functions for clustering 5C interactions.
"""

import numpy as np

from lib5c.util.parallelization import parallelize_regions


[docs]def clusters_to_array(clusters, size): """ Assembles clusters into a 2-D array for plotting on a heatmap. Parameters ---------- clusters : list of clusters The clusters to be converted to a 2-D array. size : int The height and width of the array to generate. This should be equal to the number of bins in the region. Returns ------- 2-D array A 2-D array with each clusters having been assigned a different sequential integer value for all its pixels. The next consecutive integer is a gap value, and then the one after that is a default value for all pixels not in a cluster. Notes ----- It is recommended to plot the resulting array using a rapidly-changing colorscale such as plt.get_cmap('gist_ncar'). """ num_clusters = len(clusters) array = np.zeros([size, size]) + num_clusters + 2 for i in range(num_clusters): for peak in clusters[i]: array[peak['x']][peak['y']] = i array[peak['y']][peak['x']] = i return array
[docs]def get_vector(peak): """ Gets an array representing a peak's location. Parameters ---------- peak : peak Returns ------- 1D numpy array of length 2 The peak's location as an (x, y) ordered pair. """ return np.array([peak['x'], peak['y']])
[docs]def center_of_mass(cluster): """ Computes the center of mass, or centroid, of a cluster. Parameters ---------- cluster : cluster The cluster to consider. Returns ------- 1D numpy array of length 2 The centroid of the cluster. Notes ----- For the purpose of this calculation, the mass of a peak is taken to be its value. """ m = 0 vector_sum = np.array([0, 0]) for peak in cluster: m += peak['value'] vector_sum += peak['value'] * get_vector(peak) return vector_sum / m
[docs]def belongs_to_which(peak, clusters): """ Identifies which cluster out of a list of clusters, if any, a peak belongs to. Parameters ---------- peak : peak The query peak to consider. clusters : list of clusters The clusters to look for the query peak in. Returns ------- int The index of the cluster within the list of clusters which contains the query peak, or -1 if no cluster in the list of clusters contains the query peak. """ return get_cluster(peak['x'], peak['y'], clusters)
[docs]def belongs_to(peak, cluster): """ Checks if a peak belongs to a cluster. Parameters ---------- peak : peak The query peak. cluster : cluster The cluster to search for it in. Returns ------- bool True if peak belongs to cluster, False otherwise. """ return belongs_to_which(peak, [cluster]) == 0
[docs]def get_cluster(x, y, clusters): """ Identifies which cluster, if any, a specified point belongs to. Parameters ---------- x : int x-coordinate of the point to consider. y : int y-coordinate of the point to consider. clusters : list of clusters List of clusters to search. Returns ------- int The index of the cluster which contains the point (x,y). If no cluster in the list contains the point, the value is -1. """ for i in range(len(clusters)): for peak in clusters[i]: if peak['x'] == x and peak['y'] == y: return i return -1
[docs]def ident(peak1, peak2): """ Checks whether two peaks are the same peak. Parameters ---------- peak1 : peak peak2 : peak Returns ------- bool True if the peaks are the same peak, False otherwise. """ return peak1['x'] == peak2['x'] and peak1['y'] == peak2['y']
[docs]def identify_nearby_clusters(cluster, clusters): """ Figures out which other clusters from a list of clusters are adjacent to a query cluster. Parameters ---------- cluster : cluster The query cluster to consider. clusters : list of clusters The clusters to check for adjacency to the query cluster. Returns ------- list of int The indices of clusters within the list of clusters that were found to be adjacent to the query cluster. Notes ----- This may include duplicates. To get rid of them, just use:: set(identify_nearby_clusters(cluster, clusters)) To identify one nearby cluster at random, use:: nearby_clusters = identify_nearby_clusters(cluster, clusters) if nearby_clusters: nearby_clusters[0] To see what clusters are near a single peak, use:: identify_nearby_clusters([peak], clusters) If cluster is in clusters, the cluster will be reported as adjacent to itself. As an example of how to avoid this in cases where it is undesirable, use:: filter(lambda x: x > 0, identify_nearby_clusters(clusters[0], clusters)) """ nearby = [] threshold = -1 for peak in cluster: up = get_cluster(peak['x'], peak['y'] + 1, clusters) if up > threshold: nearby.append(up) down = get_cluster(peak['x'], peak['y'] - 1, clusters) if down > threshold: nearby.append(down) left = get_cluster(peak['x'] - 1, peak['y'], clusters) if left > threshold: nearby.append(left) right = get_cluster(peak['x'] + 1, peak['y'], clusters) if right > threshold: nearby.append(right) ne = get_cluster(peak['x'] + 1, peak['y'] + 1, clusters) if ne > threshold: nearby.append(ne) se = get_cluster(peak['x'] + 1, peak['y'] - 1, clusters) if se > threshold: nearby.append(se) nw = get_cluster(peak['x'] - 1, peak['y'] + 1, clusters) if nw > threshold: nearby.append(nw) sw = get_cluster(peak['x'] - 1, peak['y'] - 1, clusters) if sw > threshold: nearby.append(sw) return nearby
[docs]def merge_clusters(clusters, merge_to_which): """ Recursively merges clusters together from smallest to largest according to a specified merge function. Parameters ---------- clusters : list of clusters The clusters to be merged. All elements will be removed from this list when this function is called. merge_to_which : function Function that takes in a list of clusters and returns the index of the cluster the first cluster in the list should be merged into. If the first cluster in the list should not be merged, this function should return -1. Returns ------- list of clusters The list of merged clusters. """ # list to store merged clusters merged_clusters = [] while clusters: # sort the clusters from smallest to largest clusters.sort(key=lambda x: len(x), reverse=False) # attempt to merge this cluster into another cluster by adjacency merge_to = merge_to_which(clusters) if merge_to == -1: # this one didn't merge merged_clusters.append(clusters.pop(0)) else: # merge this cluster into the chosen cluster clusters[merge_to].extend(clusters[0]) clusters.pop(0) return merged_clusters
[docs]@parallelize_regions def reshape_cluster_array_to_dict(cluster_array, ignored_values=None): r""" Reshapes loops dict structure into a nested dict structure. Parameters ---------- cluster_array: np.ndarray The entries of this array are cluster ID's. Values that will be ignored include '', 'n.s.', 'NA', 'NaN', np.nan. ignored_values: set, optional Set of values in cluster_array that should not be treated as cluster ID's. By default this will be {'', 'n.s.', 'NA', 'NaN', np.nan} Returns -------- Dict[Any, List[Dict[str, Any]]] The outer dict's keys are cluster ID's, its values are lists of points belonging to that cluster, with the points being provided as dicts with the following strucure:: { 'x': int, 'y': int, 'value': 0 } Notes ----- To rectangularize the returned data structure against a full list of cluster ID's, use something like:: cluster_dict = reshape_cluster_array_to_dict(cluster_array) for cluster_id in all_cluster_ids: if cluster_id not in cluster_dict: cluster_dict[cluster_id] = [] Examples -------- >>> import numpy as np >>> cluster_array = np.array([['', 'cow'], ['cow', 'grass']]) >>> reshape_cluster_array_to_dict(cluster_array) == \ ... {'grass': [{'x': 1, 'y': 1, 'value': 0}], ... 'cow': [{'x': 0, 'y': 1, 'value': 0}, ... {'x': 1, 'y': 0, 'value': 0}]} True """ # resolve ignored_values if ignored_values is None: ignored_values = {'', 'n.s.', 'NA', 'NaN', np.nan} # identify non-ignored cluster id's cluster_ids = set(np.unique(cluster_array)) - ignored_values # prepare data structure cluster_dict = {cluster_id: [] for cluster_id in cluster_ids} # fill in data structure for i in range(cluster_array.shape[0]): for j in range(cluster_array.shape[1]): cluster_id = cluster_array[i, j] if cluster_id in cluster_ids: cluster_dict[cluster_id].append({'x': i, 'y': j, 'value': 0}) return cluster_dict
[docs]def flatten_clusters(clusters): """ Flattens a list of clusters to a flat list of peaks. Parameters ---------- clusters : list of list of peaks The clusters to flatten. Returns ------- list of peaks The flattened peaks. """ return [peak for cluster in clusters for peak in cluster]
[docs]def peaks_to_array_index(peaks, shape): """ Convert a sparse list of peaks to a dense boolean array. Parameters ---------- peaks : list of peaks The peaks to convert. shape : tuple of int The shape of the resulting array. Returns ------- np.ndarray The dense boolean array. """ idx = np.zeros(shape, dtype=bool) for peak in peaks: idx[peak['x'], peak['y']] = True idx[peak['y'], peak['x']] = True return idx
[docs]def array_index_to_peaks(idx): """ Convert a dense boolean array to a sparse list of peaks. Parameters ---------- idx : np.ndarray Boolean array to convert. Returns ------- list of peaks The peaks. """ return [{'x': i, 'y': j, 'value': 0} for i in range(idx.shape[0]) for j in range(i+1) if idx[i, j]]