# Source code for pyoints.clustering

# BEGIN OF LICENSE NOTE
# This file is part of Pyoints.
# Copyright (c) 2018, Sebastian Lamprecht, Trier University,
# lamprecht@uni-trier.de
#
# Pyoints is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyoints is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pyoints. If not, see <https://www.gnu.org/licenses/>.
# END OF LICENSE NOTE
"""Clustering algorithms to assign group points.
"""

import numpy as np
from numbers import Number
from collections import defaultdict
from sklearn.cluster import DBSCAN

from . import (
    assertion,
    classification,
)
from .indexkd import IndexKD

from .misc import print_rounded


def clustering(indexKD, r, get_class, order=None, clusters=None,
               auto_set=True):
    """Generic clustering based on spatial neighbourhood.

    Parameters
    ----------
    indexKD : IndexKD
        Spatial index with `n` points.
    r : positive float
        Radius to identify the cluster affiliation of neighboured points.
    get_class : callable
        Function to define the cluster id (affiliation) of a point. It
        receives a list of cluster ids of neighboured points to define the
        cluster id of the selected point. It returns -1 if the point is not
        associated with any cluster.
    order : optional, array_like(int)
        Defines the order to apply the clustering algorithm. It can also be
        used to subsample points for clustering. If None, the order is
        defined by decreasing point density.
    clusters : optional, array_like(int, shape=(n))
        List of `n` integers. Each element represents the preliminary
        cluster id of a point in `indexKD`. A cluster id of `-1` represents
        no class.
    auto_set : optional, bool
        Defines whether or not a cluster id is set automatically if -1
        (no class) was returned by `get_class`. If True, a new cluster id
        is set to `max(clusters) + 1`.

    Returns
    -------
    np.ndarray(int, shape=(n))
        Cluster id of each point in `indexKD`. A value of `-1` indicates
        that the point is not associated with any cluster.

    Raises
    ------
    TypeError
        If `indexKD` is not an `IndexKD`, `get_class` is not callable, or
        `auto_set` is not a bool.
    ValueError
        If `r` is not a number greater zero.

    """
    # validate input
    if not isinstance(indexKD, IndexKD):
        raise TypeError("'indexKD' needs to be of type 'IndexKD'")
    if not (assertion.isnumeric(r) and r > 0):
        raise ValueError("'r' needs to be a number greater zero")
    if not callable(get_class):
        raise TypeError("'get_class' needs to be callable")

    if order is None:
        # order by density: points with many neighbours within `r` first
        count = indexKD.ball_count(r)
        order = np.argsort(count)[::-1]
    else:
        order = assertion.ensure_numvector(order, max_length=len(indexKD))

    if clusters is None:
        # all points start unassigned (-1 means "no cluster")
        out_clusters = -np.ones(len(indexKD), dtype=int)
    else:
        out_clusters = assertion.ensure_numvector(
            clusters,
            min_length=len(indexKD),
            max_length=len(indexKD)
        )
    if not isinstance(auto_set, bool):
        raise TypeError("'auto_set' needs to be of type boolean")

    # next unused cluster id for points that get no class from `get_class`
    nextId = out_clusters.max() + 1
    coords = indexKD.coords

    # calculate spatial neighbourhood of each point in the chosen order
    nIdsIter = indexKD.ball_iter(coords[order, :], r)
    for pId, nIds in zip(order, nIdsIter):
        # collect cluster ids of already-classified neighbours
        cIds = [out_clusters[nId] for nId in nIds if out_clusters[nId] != -1]
        if len(cIds) > 0:
            out_clusters[pId] = get_class(cIds)
        elif auto_set:
            # no classified neighbours: open a fresh cluster
            out_clusters[pId] = nextId
            nextId += 1
    return out_clusters
def majority_clusters(indexKD, r, **kwargs):
    """Clustering by majority voting.

    Points are assigned iteratively to the class that is most dominant
    within the radius `r`.

    Parameters
    ----------
    indexKD : IndexKD
        Spatial index with `n` points.
    r : positive float
        Radius to identify the cluster affiliation of neighboured points.
    **kwargs : optional
        Optional arguments of the `clustering` function.

    Returns
    -------
    np.ndarray(int, shape=(n))
        Cluster id of each point; `-1` marks unclustered points.

    See Also
    --------
    clustering

    Examples
    --------
    >>> coords = [(0, 0), (1, 1), (2, 1), (3, 3), (0, 1), (2, 3), (3, 4)]
    >>> clusters = majority_clusters(IndexKD(coords), 2)
    >>> print_rounded(clusters)
    [ 1  1 -1  0  1  0  0]

    """
    # delegate to the generic algorithm with majority voting as classifier
    get_class = classification.majority
    return clustering(indexKD, r, get_class, **kwargs)
def weight_clusters(indexKD, r, weights=None, **kwargs):
    """Clustering by class weight.

    Parameters
    ----------
    indexKD : IndexKD
        Spatial index with `n` points.
    r : positive float
        Radius to identify the cluster affiliation of neighboured points.
    weights : optional, array_like(Number, shape=(len(indexKD)))
        Weighting of each point. The class with highest weight wins. If
        None, all weights are set to 1, which results in similar results
        than `majority_clusters`.
    **kwargs : optional
        Optional arguments passed to `clustering`.

    Examples
    --------
    Clustering with equal weights.

    >>> coords = [(0, 0), (0, 1), (1, 1), (0, 0.5), (2, 2), (2, 2.5), (2.5, 2)]
    >>> indexKD = IndexKD(coords)
    >>> initial_clusters = np.arange(len(coords), dtype=int)
    >>> clusters = weight_clusters(indexKD, 1.5, clusters=initial_clusters)
    >>> print_rounded(clusters)
    [0 0 4 3 6 5 5]

    Clustering with differing weights.

    >>> weights = np.arange(len(coords))
    >>> clusters = weight_clusters(
    ...     indexKD,
    ...     1.5,
    ...     weights=weights,
    ...     clusters=initial_clusters
    ... )
    >>> print_rounded(clusters)
    [3 6 6 3 5 5 5]

    See Also
    --------
    clustering, majority_clusters

    """
    if weights is None:
        # uniform weighting makes this behave similar to majority voting
        weights = np.ones(len(indexKD), dtype=float)
    else:
        weights = assertion.ensure_numvector(
            weights,
            min_length=len(indexKD),
            max_length=len(indexKD)
        )

    def get_class(cIds):
        # Accumulate the total weight per candidate cluster id.
        # NOTE(review): `weights` is indexed by *cluster id* here — this is
        # only meaningful when preliminary cluster ids coincide with point
        # indices (as with `clusters=np.arange(n)` in the examples above);
        # confirm against callers.
        cWeight = defaultdict(lambda: 0)
        for cId in cIds:
            cWeight[cId] += weights[cId]
        # Pick the id with the highest accumulated weight; `cId` starts as
        # the last loop value from above and is replaced by any heavier key.
        for key in cWeight:
            if cWeight[key] > cWeight[cId]:
                cId = key
        # Side effect: the winning cluster's weight is updated in-place to
        # the mean contributed weight of this neighbourhood, so subsequent
        # calls see the adjusted value.
        weights[cId] = float(cWeight[cId]) / len(cIds)
        return cId

    return clustering(indexKD, r, get_class, **kwargs)
def dbscan(
        indexKD,
        min_pts,
        epsilon=None,
        quantile=0.8,
        factor=3):
    """Variant of the DBSCAN algorithm [1] with automatic estimation of the
    `epsilon` parameter using point density. Useful for automatic outlier
    identification.

    Parameters
    ----------
    indexKD : IndexKD
        Spatial index with `n` points to cluster.
    min_pts : int
        Corresponds to the `min_pts` parameter of the DBSCAN algorithm.
    epsilon : optional, positive float
        Corresponds to the `epsilon` parameter of DBSCAN algorithm. If None,
        a suitable value is estimated by investigating the nearest neighbour
        distances `dists` of all points in `indexKD` with
        ```epsilon = np.percentile(dists, quantile * 100) * factor```.
    quantile : optional, positive float
        Used to calculate `epsilon`.
    factor : optional, positive float
        Used to calculate `epsilon`.

    Returns
    -------
    np.ndarray(int, shape=(n))
        Cluster labels as returned by scikit-learn's DBSCAN; `-1` marks
        noise points.

    References
    ----------
    [1] M. Ester, et al. (1996): "A Density-Based Algorithm for Discovering
    Clusters in Large Spatial Databases with Noise", KDD-96 Proceedings,
    pp. 226-231.

    Examples
    --------
    >>> coords = [(0, 0), (0, 1), (1, 1), (0, 0.5), (2, 2), (2, 2.5), (19, 29)]
    >>> indexKD = IndexKD(coords)

    User defined epsilon.

    >>> clusters = dbscan(indexKD, 1, epsilon=1)
    >>> print_rounded(clusters)
    [0 0 0 0 1 1 2]

    Automatic epsilon estimation for outlier removal.

    >>> clusters = dbscan(indexKD, 2)
    >>> print_rounded(clusters)
    [ 0  0  0  0  0  0 -1]

    Adjust automatic epsilon estimation to achieve small clusters.

    >>> clusters = dbscan(indexKD, 1, quantile=0.7, factor=1)
    >>> print_rounded(clusters)
    [0 0 1 0 2 2 3]

    """
    # validate the mandatory parameters
    if not isinstance(indexKD, IndexKD):
        raise TypeError("'indexKD' needs to be of type 'IndexKD'")
    if not (isinstance(min_pts, int) and min_pts >= 0):
        raise ValueError(
            "'min_pts' needs to be an integer greater or equal zero")

    coords = indexKD.coords

    if epsilon is not None:
        # user supplied epsilon: just validate it
        if not (isinstance(epsilon, Number) and epsilon > 0):
            raise ValueError("'epsilon' needs to be a number greater zero")
    else:
        # estimate epsilon from the nearest neighbour distance distribution
        if not (isinstance(quantile, Number) and quantile > 0):
            raise ValueError("'quantile' needs to be a number greater zero")
        if not (isinstance(factor, Number) and factor > 0):
            raise ValueError("'factor' needs to be a number greater zero")
        if min_pts > 0:
            # distances to the min_pts nearest neighbours (self excluded)
            dists = indexKD.knn(coords, k=min_pts + 1)[0][:, 1:]
        else:
            dists = indexKD.nn[0]
        epsilon = np.percentile(dists, quantile * 100) * factor

    # perform dbscan
    return DBSCAN(eps=epsilon, min_samples=min_pts).fit_predict(coords)