Here at New Relic, we collect 1.37 billion data points per minute. A vast amount of the data we collect, analyze, and display for our customers is stored as time series. In an effort to build relationships between applications and other entities, such as servers and containers, for new, intelligent products like New Relic Radar, we're constantly exploring faster and more efficient methods of grouping time series data. Given the amount of data we collect, faster clustering times are crucial.

### Speeding up k-means clustering

A popular method of grouping data is *k*-means clustering. The basic principle of *k*-means involves determining the distances between each data point and grouping them into meaningful clusters. This process is usually demonstrated using two-dimensional data on a plane. It’s certainly possible to cluster in more than two dimensions but our ability to visualize the data quickly becomes more complicated. For example, the following graphic shows the convergence of *k*-means clustering in two arbitrary dimensions over several iterations:

(GIF by Chire - Own work, GFDL)

Unfortunately, this method does not map well to time series since they are generally one-dimensional over time. However, we can still compute a distance factor between two time series using a number of different functions. In these examples, we use the mean squared error (MSE) to explore different *k*-means implementations. In testing these implementations, we noticed many suffered from serious performance issues, but we were still able to demonstrate how it's possible to speed up *k*-means clustering, in some cases by an order of magnitude.

For this exercise, we’ll use Python’s NumPy package. If you’re following along on your own, you can copy and paste the code right into a Jupyter Notebook. Let’s start by importing the packages we’ll use along the way:

import time import numpy as np import matplotlib.pyplot as plt %matplotlib inline

In the following tests, we start by generating 10,000 random time series, 500 samples long. Then we add noise to sine waves of random lengths. While this kind of data isn't really ideal for *k*-means clustering, it should be enough to strain unoptimized implementations.

n = 10000 ts_len = 500 phases = np.array(np.random.randint(0, 50, [n, 2])) pure = np.sin([np.linspace(-np.pi * x[0], -np.pi * x[1], ts_len) for x in phases]) noise = np.array([np.random.normal(0, 1, ts_len) for x in range(n)]) signals = pure * noise # Normalize everything between 0 and 1 signals += np.abs(np.min(signals)) signals /= np.max(signals) plt.plot(signals[0])

### The first implementation

Let’s start with the most basic and straightforward implementation we encountered. `euclid_dist`

implements a simple MSE estimator for the distance function, and `k_means`

implements the basic k-means algorithm. We chose `num_clust`

random time series from our initial dataset as centroids (which represent the middle of each cluster). For `num_iter`

iterations, we continually moved the centroids around while minimizing the distance between them and the other time series.

def euclid_dist(t1, t2): return np.sqrt(((t1-t2)**2).sum()) def k_means(data, num_clust, num_iter): centroids = signals[np.random.randint(0, signals.shape[0], num_clust)] for n in range(num_iter): assignments={} for ind, i in enumerate(data): min_dist = float('inf') closest_clust = None for c_ind, j in enumerate(centroids): dist = euclid_dist(i, j) if dist < min_dist: min_dist = dist closest_clust = c_ind if closest_clust in assignments: assignments[closest_clust].append(ind) else: assignments[closest_clust]=[] assignments[closest_clust].append(ind) for key in assignments: clust_sum = 0 for k in assignments[key]: clust_sum = clust_sum + data[k] centroids[key] = [m / len(assignments[key]) for m in clust_sum] return centroids

t1 = time.time() centroids = k_means(signals, 100, 100) t2 = time.time() print("Took {} seconds".format(t2 - t1))

Took 1138.8745470046997 seconds

It took almost 20 minutes to cluster the data. That isn't terrible, but it's certainly not great. To speed this up for the next implementation, we decided to eliminate as many for loops as possible.

### The vectorized implementation

One of the awesome benefits of using NumPy is vectorized operations. (If you're unfamiliar with vector operations, check out this excellent review.)

The k-means algorithm calls for pairwise comparisons between each centroid and data point. This means, in our previous iteration, we compared each of our 100 centroids to 10,000 time series for a total of one million comparisons per iteration. Keep in mind that each comparison involved two sets of 500 samples. Since we iterated 100 times, that gave us a total of 100 million comparisons—quite a bit of work for a single CPU. While Python is a reasonably efficient language, it's hard to beat operations written in C. For this reason most of NumPy's core operations are written in C and vectorized to minimize overhead incurred by loops.

Let's explore how we vectorized our code in order to eliminate as many `for`

loops as possible.

First, we divided the code into functional blocks. This gave us a better idea of what each section was responsible for. Next, we altered the `calc_centroids`

step to only iterate over centroids (instead of each time series). This way, we were passing all the time series and one centroid to `euclid_dist`

. We also allocated the `dist`

matrix up front instead of treating it like a dictionary and expanding it over time. NumPy's `argmin`

compared each vector pair in one shot.

In `move_centroids`

, we collapsed another for loop using vector operations, and we iterated only over the unique set of centroids. If we lost a centroid, we added the appropriate number by randomly choosing from our time series dataset (this rarely happens in the wild).

Lastly, we added an early stopping check to `k_means`

to halt our iterations if there were no updates to the centroids.

Here’s a look at the code:

def euclid_dist(t1, t2): return np.sqrt(((t1-t2)**2).sum(axis = 1)) def calc_centroids(data, centroids): dist = np.zeros([data.shape[0], centroids.shape[0]]) for idx, centroid in enumerate(centroids): dist[:, idx] = euclid_dist(centroid, data) return np.array(dist) def closest_centroids(data, centroids): dist = calc_centroids(data, centroids) return np.argmin(dist, axis = 1) def move_centroids(data, closest, centroids): k = centroids.shape[0] new_centroids = np.array([data[closest == c].mean(axis = 0) for c in np.unique(closest)]) if k - new_centroids.shape[0] > 0: print("adding {} centroid(s)".format(k - new_centroids.shape[0])) additional_centroids = data[np.random.randint(0, data.shape[0], k - new_centroids.shape[0])] new_centroids = np.append(new_centroids, additional_centroids, axis = 0) return new_centroids def k_means(data, num_clust, num_iter): centroids = signals[np.random.randint(0, signals.shape[0], num_clust)] last_centroids = centroids for n in range(num_iter): closest = closest_centroids(data, centroids) centroids = move_centroids(data, closest, centroids) if not np.any(last_centroids != centroids): print("early finish!") break last_centroids = centroids return centroids

t1 = time.time() centroids = k_means(signals, 100, 100) t2 = time.time() print("Took {} seconds".format(t2 - t1))

adding 1 centroid(s) early finish! took 206.72993397712708 seconds

Just a bit longer than 3.5 minutes. Not bad! But we wanted to finish even faster.

### The k-means++ implementation

For the next iteration, we applied the* k*-means++ algorithm. This algorithm seeks to choose better initial centroids. Let's see if this optimization helped…

def init_centroids(data, num_clust): centroids = np.zeros([num_clust, data.shape[1]]) centroids[0,:] = data[np.random.randint(0, data.shape[0], 1)] for i in range(1, num_clust): D2 = np.min([np.linalg.norm(data - c, axis = 1)**2 for c in centroids[0:i, :]], axis = 0) probs = D2/D2.sum() cumprobs = probs.cumsum() ind = np.where(cumprobs >= np.random.random())[0][0] centroids[i, :] = np.expand_dims(data[ind], axis = 0) return centroids def k_means(data, num_clust, num_iter): centroids = init_centroids(data, num_clust) last_centroids = centroids for n in range(num_iter): closest = closest_centroids(data, centroids) centroids = move_centroids(data, closest, centroids) if not np.any(last_centroids != centroids): print("Early finish!") break last_centroids = centroids return centroids

t1 = time.time() centroids = k_means(signals, 100, 100) t2 = time.time() print("Took {} seconds".format(t2 - t1))

early finish! took 180.91435194015503 seconds

Adding the *k*-means++ algorithm yielded slightly better performance compared to our previous iteration. However, this optimization really started to pay off once we parallelized it.

### The parallel implementation

Until this point, all our implementations were single-threaded, so we decided to explore parallelizing parts of the *k*-means++ algorithm. Since we were using a Jupyter Notebook, we opted to manage parallelism with ipyparallel for parallel computing. With ipyparallel, we didn’t have to worry about forking the entire server, but there were a few idiosyncrasies we needed to address. For instance, we had to inform our worker nodes to load NumPy.

import ipyparallel as ipp c = ipp.Client() v = c[:] v.use_cloudpickle() with v.sync_imports(): import numpy as np

(For details on launching workers, see the pyparallel getting started guide).

In this iteration, we focused on two areas for parallelization. First, `calc_centroids`

had a loop that iterated over each centroid and compared it with our time series data. We used `map_sync`

to send each of these iterations to our workers.

Next, we parallelized a similar loop in the *k*-means++ centroid search. Note the call to `v.push`

: since our lambda referenced `data`

, we needed to make sure it was available on the worker nodes. We accomplished this by calling ipyparallel's `push`

method to copy the variable into the workers' global scope.

Take a look:

def calc_centroids(data, centroids): return np.array(v.map_sync(lambda x: np.sqrt(((x - data)**2).sum(axis = 1)), centroids)) def closest_centroids(points, centroids): dist = calc_centroids(points, centroids) return np.argmin(dist, axis=0) def init_centroids(data, num_clust): v.push(dict(data=data)) centroids = np.zeros([num_clust, data.shape[1]]) centroids[0,:] = data[np.random.randint(0, data.shape[0], 1)] for i in range(1, num_clust): D2 = np.min(v.map_sync(lambda c: np.linalg.norm(data - c, axis = 1)**2, centroids[0:i,:]), axis = 0) probs = D2/D2.sum() cumprobs = probs.cumsum() ind = np.where(cumprobs >= np.random.random())[0][0] centroids[i, :] = np.expand_dims(data[ind], axis = 0) return centroids

adding 2 centroid(s) early finish! took 143.49819207191467 seconds

At just a bit more than two minutes, this iteration achieved our fastest time yet!

### Next steps: go faster!

In these tests, we relied on central processing unit (CPU) cores. CPUs offer convenient parallelization, but we suspect that with a little more effort, we could implement clustering orders of a magnitude faster using graphical processing units (GPUs). We might be able to do this using TensorFlow, open-source software for numerical computations and machine learning. In fact, TensorFlow already includes a *k*-means implementation, but we’ll almost certainly have to tweak it to support time-series clustering. At any rate, we’ll never stop looking for more efficient and faster clustering algorithms to help manage our users’ data.

The views expressed on this blog are those of the author and do not necessarily reflect the views of New Relic. Any solutions offered by the author are environment-specific and not part of the commercial solutions or support offered by New Relic. Please join us exclusively at the Explorers Hub (discuss.newrelic.com) for questions and support related to this blog post. This blog may contain links to content on third-party sites. By providing such links, New Relic does not adopt, guarantee, approve or endorse the information, views or products available on such sites.