# Clustering.py
import numpy as np
import umap
import hdbscan
import matplotlib.pyplot as plt
import pandas as pd
import plotly.express as px
import umap.plot
from copy import deepcopy
from sklearn.cluster import AgglomerativeClustering
from typing import Tuple
class Clustering_and_DimRed():
    """
    Class to perform dimensionality reduction with UMAP followed by clustering with HDBSCAN.

    Also provides static (matplotlib) and interactive (plotly) visualizations of the
    resulting clusters, and UMAP diagnostic plots.
    """

    def __init__(self,
                 n_dims_umap: int = 5,
                 n_neighbors_umap: int = 15,
                 min_dist_umap: float = 0,
                 metric_umap: str = "cosine",
                 min_cluster_size_hdbscan: int = 30,
                 metric_hdbscan: str = "euclidean",
                 cluster_selection_method_hdbscan: str = "eom",
                 number_clusters_hdbscan: int = None,
                 random_state: int = 42,
                 verbose: bool = True,
                 UMAP_hyperparams: dict = None,
                 HDBSCAN_hyperparams: dict = None) -> None:
        """
        Initializes the clustering and dimensionality reduction parameters for topic modeling.

        Args:
            n_dims_umap (int, optional): Number of dimensions to reduce to using UMAP.
            n_neighbors_umap (int, optional): Number of neighbors for UMAP.
            min_dist_umap (float, optional): Minimum distance for UMAP.
            metric_umap (str, optional): Metric for UMAP.
            min_cluster_size_hdbscan (int, optional): Minimum cluster size for HDBSCAN.
            metric_hdbscan (str, optional): Metric for HDBSCAN.
            cluster_selection_method_hdbscan (str, optional): Cluster selection method for HDBSCAN.
            number_clusters_hdbscan (int, optional): Number of clusters for HDBSCAN. If None, HDBSCAN will determine the number of clusters automatically. Ensure that min_cluster_size is not too large to find enough clusters.
            random_state (int, optional): Random state for UMAP and HDBSCAN.
            verbose (bool, optional): Whether to print progress.
            UMAP_hyperparams (dict, optional): Additional hyperparameters for UMAP. Defaults to an empty dict when None.
            HDBSCAN_hyperparams (dict, optional): Additional hyperparameters for HDBSCAN. Defaults to an empty dict when None.
        """
        # do some checks on the input arguments
        assert n_dims_umap > 0, "n_dims_umap must be greater than 0"
        assert n_neighbors_umap > 0, "n_neighbors_umap must be greater than 0"
        assert min_dist_umap >= 0, "min_dist_umap must be greater than or equal to 0"
        assert min_cluster_size_hdbscan > 0, "min_cluster_size_hdbscan must be greater than 0"
        assert number_clusters_hdbscan is None or number_clusters_hdbscan > 0, "number_clusters_hdbscan must be greater than 0 or None"
        assert random_state is None or random_state >= 0, "random_state must be greater than or equal to 0"

        self.random_state = random_state
        self.verbose = verbose
        # BUGFIX: the previous version used mutable default arguments ({}) and then
        # mutated them in place below, so hyperparameters leaked across instances and
        # modified the caller's dict. Use a None sentinel and copy defensively instead.
        self.UMAP_hyperparams = dict(UMAP_hyperparams) if UMAP_hyperparams is not None else {}
        self.HDBSCAN_hyperparams = dict(HDBSCAN_hyperparams) if HDBSCAN_hyperparams is not None else {}

        # explicit constructor arguments take precedence over entries in the dicts
        self.UMAP_hyperparams["n_components"] = n_dims_umap
        self.UMAP_hyperparams["n_neighbors"] = n_neighbors_umap
        self.UMAP_hyperparams["min_dist"] = min_dist_umap
        self.UMAP_hyperparams["metric"] = metric_umap
        self.UMAP_hyperparams["random_state"] = random_state
        self.UMAP_hyperparams["verbose"] = verbose
        # NOTE(review): this UMAP instance is never reused — reduce_dimensions_umap()
        # fits a fresh mapper. Kept for backward compatibility with external users.
        self.umap = umap.UMAP(**self.UMAP_hyperparams)

        self.HDBSCAN_hyperparams["min_cluster_size"] = min_cluster_size_hdbscan
        self.HDBSCAN_hyperparams["metric"] = metric_hdbscan
        self.HDBSCAN_hyperparams["cluster_selection_method"] = cluster_selection_method_hdbscan
        self.number_clusters_hdbscan = number_clusters_hdbscan
        self.hdbscan = hdbscan.HDBSCAN(**self.HDBSCAN_hyperparams)

    def reduce_dimensions_umap(self, embeddings: np.ndarray) -> Tuple[np.ndarray, umap.UMAP]:
        """
        Reduces dimensions of embeddings using UMAP.

        Args:
            embeddings (np.ndarray): Embeddings to reduce.

        Returns:
            tuple: A tuple containing two items:
                - reduced_embeddings (np.ndarray): Reduced embeddings, L2-normalized row-wise.
                - umap_mapper (umap.UMAP): UMAP mapper for transforming new embeddings, especially embeddings of the vocabulary. (MAKE SURE TO NORMALIZE EMBEDDINGS AFTER USING THE MAPPER)
        """
        mapper = umap.UMAP(**self.UMAP_hyperparams).fit(embeddings)
        dim_red_embeddings = mapper.transform(embeddings)
        # L2-normalize each row; downstream cosine-style comparisons rely on this.
        dim_red_embeddings = dim_red_embeddings / np.linalg.norm(dim_red_embeddings, axis=1).reshape(-1, 1)
        return dim_red_embeddings, mapper

    def cluster_hdbscan(self, embeddings: np.ndarray) -> np.ndarray:
        """
        Cluster embeddings using HDBSCAN.

        If self.number_clusters_hdbscan is not None, further clusters the data with
        AgglomerativeClustering to achieve a fixed number of clusters.

        Args:
            embeddings (np.ndarray): Embeddings to cluster.

        Returns:
            np.ndarray: Cluster labels. Consecutive integers starting at 0; -1 is reserved for outliers.
        """
        labels = self.hdbscan.fit_predict(embeddings)
        outliers = np.where(labels == -1)[0]  # indices HDBSCAN marked as noise

        if self.number_clusters_hdbscan is not None:
            # NOTE(review): points HDBSCAN marked as outliers are re-labelled -1 after
            # agglomerative clustering, so a cluster consisting only of outliers would
            # reduce the effective cluster count below number_clusters_hdbscan — confirm intended.
            clusterer = AgglomerativeClustering(n_clusters=self.number_clusters_hdbscan)  # one cluster for outliers
            labels = clusterer.fit_predict(embeddings)
            labels[outliers] = -1

        # reindex to make the labels consecutive numbers from -1 to the number of clusters. -1 is reserved for outliers
        unique_labels = np.unique(labels)
        unique_labels_no_outliers = unique_labels[unique_labels != -1]
        map2newlabel = {label: i for i, label in enumerate(unique_labels_no_outliers)}
        map2newlabel[-1] = -1
        labels = np.array([map2newlabel[label] for label in labels])
        return labels

    def cluster_and_reduce(self, embeddings: np.ndarray) -> Tuple[np.ndarray, np.ndarray, umap.UMAP]:
        """
        Cluster embeddings using HDBSCAN and reduce dimensions with UMAP.

        Args:
            embeddings (np.ndarray): Embeddings to cluster and reduce.

        Returns:
            tuple: A tuple containing three items:
                - reduced_embeddings (np.ndarray): Reduced embeddings.
                - cluster_labels (np.ndarray): Cluster labels.
                - umap_mapper (umap.UMAP): UMAP mapper for transforming new embeddings, especially embeddings of the vocabulary. (MAKE SURE TO NORMALIZE EMBEDDINGS AFTER USING THE MAPPER)
        """
        dim_red_embeddings, umap_mapper = self.reduce_dimensions_umap(embeddings)
        clusters = self.cluster_hdbscan(dim_red_embeddings)
        return dim_red_embeddings, clusters, umap_mapper

    def visualize_clusters_static(self, embeddings: np.ndarray, labels: np.ndarray):
        """
        Reduce dimensionality with UMAP to two dimensions and plot the clusters.

        Args:
            embeddings (np.ndarray): Embeddings for which to plot clustering.
            labels (np.ndarray): Cluster labels (-1 denotes outliers, drawn in grey).
        """
        # Reduce dimensionality with UMAP (fixed 2D projection just for plotting)
        reducer = umap.UMAP(n_components=2, random_state=self.random_state, n_neighbors=30, metric="cosine", min_dist=0)
        embeddings_2d = reducer.fit_transform(embeddings)

        # Create a color palette, then map the labels to the colors.
        # We add one to the number of unique labels to account for the noise points labelled as -1.
        # NOTE(review): plt.cm.get_cmap is deprecated in matplotlib >= 3.7; kept for compatibility.
        palette = plt.cm.get_cmap("tab20", len(np.unique(labels)) + 1)

        fig, ax = plt.subplots(figsize=(10, 8))
        outlier_shown_in_legend = False

        # Iterate through all unique labels (clusters and outliers)
        for label in np.unique(labels):
            # Find the embeddings that are part of this cluster
            cluster_points = embeddings_2d[labels == label]
            if label == -1:
                # Outliers are displayed in grey; only the first batch gets a legend entry.
                color = 'grey'
                if not outlier_shown_in_legend:
                    ax.scatter(cluster_points[:, 0], cluster_points[:, 1], c=color, label='outlier', s=0.1)
                    outlier_shown_in_legend = True
                else:
                    ax.scatter(cluster_points[:, 0], cluster_points[:, 1], c=color, s=0.1)
            else:
                color = palette(label)
                # Plot the points in this cluster without a label to prevent them from showing up in the legend
                ax.scatter(cluster_points[:, 0], cluster_points[:, 1], c=color, s=0.1)

        ax.legend()
        plt.show()

    def visualize_clusters_dynamic(self, embeddings: np.ndarray, labels: np.ndarray, texts: list[str], class_names: list[str] = None):
        """
        Visualize clusters using Plotly and enable hovering over clusters to see the beginning of the texts of the documents.

        Args:
            embeddings (np.ndarray): Embeddings for which to visualize clustering.
            labels (np.ndarray): Cluster labels (-1 denotes outliers, drawn in grey).
            texts (list[str]): Texts of the documents; only the first 200 characters are shown on hover.
            class_names (list[str], optional): Names of the classes, indexed by label.
        """
        # Reduce dimensionality with UMAP (fixed 2D projection just for plotting)
        reducer = umap.UMAP(n_components=2, random_state=self.random_state, n_neighbors=30, metric="cosine", min_dist=0)
        embeddings_2d = reducer.fit_transform(embeddings)

        df = pd.DataFrame(embeddings_2d, columns=['x', 'y'])
        df['text'] = [text[:200] for text in texts]
        df["class"] = labels
        if class_names is not None:
            df["class"] = [class_names[label] for label in labels]

        # Create a color palette, then map the labels to the colors.
        # Exclude the outlier (-1) label from color palette assignment.
        unique_labels = [label for label in np.unique(labels) if label != -1]
        palette = plt.cm.get_cmap("tab20", len(unique_labels))
        # Build the color map; unique_labels already excludes -1, so the previous
        # inline "else 'grey'" branch was dead code and has been removed.
        color_discrete_map = {label: 'rgb' + str(tuple(int(val * 255) for val in palette(i)[:3])) for i, label in enumerate(unique_labels)}
        color_discrete_map[-1] = 'grey'

        # plot data points where the color represents the class
        fig = px.scatter(df, x='x', y='y', hover_data=['text', 'class'], color='class', color_discrete_map=color_discrete_map)
        fig.update_traces(mode='markers', marker=dict(size=3))  # Optional: Increase the marker size

        # make plot quadratic
        fig.update_layout(
            autosize=False,
            width=1500,
            height=1500,
            margin=dict(
                l=50,
                r=50,
                b=100,
                t=100,
                pad=4
            )
        )
        # set title
        fig.update_layout(title_text='UMAP projection of the document embeddings', title_x=0.5)
        fig.show()

    def umap_diagnostics(self, embeddings, hammer_edges=False):
        """
        Fit UMAP on the provided embeddings and generate diagnostic plots.

        Params:
        ------
        embeddings : array-like
            The high-dimensional data for UMAP to reduce and visualize.
        hammer_edges : bool, default False. Is computationally expensive.
        """
        # Force a 2D projection for the diagnostic plots without touching the
        # instance's configured hyperparameters.
        new_hyperparams = deepcopy(self.UMAP_hyperparams)
        new_hyperparams["n_components"] = 2
        mapper = umap.UMAP(**new_hyperparams).fit(embeddings)

        # 1. Connectivity plot with points
        print("UMAP Connectivity Plot with Points")
        umap.plot.connectivity(mapper, show_points=True)
        plt.show()

        if hammer_edges:
            # 2. Connectivity plot with edge bundling
            print("UMAP Connectivity Plot with Hammer Edge Bundling")
            umap.plot.connectivity(mapper, edge_bundling='hammer')
            plt.show()

        # 3. PCA diagnostic plot
        print("UMAP PCA Diagnostic Plot")
        umap.plot.diagnostic(mapper, diagnostic_type='pca')
        plt.show()

        # 4. Local dimension diagnostic plot
        print("UMAP Local Dimension Diagnostic Plot")
        umap.plot.diagnostic(mapper, diagnostic_type='local_dim')
        plt.show()