import random
import threading
import numpy as np
import matplotlib.pyplot as plt
import queue
import time
N = 150 # number of points per class
D = 2 # dimensionality (points are on 2D plane)
K = 3 # number of classes
X = np.zeros((N*K,D)) # data matrix (each row = single example)
y = np.zeros(N*K, dtype='uint8') # class labels
# We'll split up the data processing across two nodes. In the real world, these would be two hardware units such as GPUs.
# In this example, each node is executed on a separate thread.
num_nodes = 2
# Code to generate spiral dataset
for j in range(K):
ix = range(N*j,N*(j+1))
r = np.linspace(0.0,1,N) # radius
t = np.linspace(j*4,(j+1)*4,N) + np.random.randn(N)*0.2 # theta
X[ix] = np.c_[r*np.sin(t), r*np.cos(t)]
y[ix] = j
# let's visualize the data:
plt.scatter(X[:, 0], X[:, 1], c=y, s=40, cmap=plt.cm.Spectral)
plt.show()
# initialize parameters randomly
h = 100 # size of hidden layer
W1 = 0.01 * np.random.randn(D, h)
b1 = np.zeros((1, h))
W2 = 0.01 * np.random.randn(h, K)
b2 = np.zeros((1, K))
# some hyperparameters
step_size = 1e-0
reg = 1e-3 # regularization strength
# The node class runs the forward and backward step of our network.
class node(object):
def __init__(self, W1, b1, W2, b2, ps, name):
self.W1 = W1
self.W2 = W2
self.b1 = b1
self.b2 = b2
self.task_name = name
self.dscores = []
self.ps = ps
def data(self, X, y):
self.X = X
self.y = y
self.batchSize = self.X.shape[0]
# called by the parameter server when it has calculated updated weights
def update(self, W1, b1, W2, b2):
dW2_reg = reg * self.W2
dW1_reg = reg * self.W1
self.W1 = W1
self.W2 = W2
self.b1 = b1
self.b2 = b2
self.W2 += -step_size * dW2_reg
self.W1 += -step_size * dW1_reg
def forward_and_backward(self):
self.forward()
self.backward()
def forward(self):
# l1_i: layer 1 input
# l1_o: layer 1 output
# l1_ao: layer 1 activation output
# l1_dw: layer 1 derivative of loss wrt layer weights
# l1_db: layer 1 derivative of loss wrt layer bias
# l1_delta: layer 1 derivative of loss wrt output
self.l1_i = self.X
self.l1_o = np.dot(self.l1_i, self.W1) + self.b1
self.l1_ao = np.maximum(0, self.l1_o)
self.l2_i = self.l1_ao
self.l2_o = np.dot(self.l2_i, self.W2) + self.b2
self.exp_scores = np.exp(self.l2_o)
self.probs = self.exp_scores / np.sum(self.exp_scores, axis=1, keepdims=True)
        correct_logprobs = -np.log(self.probs[np.arange(self.batchSize), self.y])  # log-probability of the correct class for each example
self.data_loss = np.sum(correct_logprobs)
self.data_loss = self.data_loss / self.batchSize
self.dscores = self.probs
self.dscores[np.arange(self.dscores.shape[0]), self.y] -= 1
self.dscores /= self.batchSize
reg_loss = 0.5 * reg * np.sum(self.W1 * self.W1) + 0.5 * reg * np.sum(self.W2 * self.W2)
self.loss = self.data_loss + reg_loss
def backward(self):
self.l2_dw = np.dot(self.l1_ao.T, self.dscores)
self.l2_db = np.sum(self.dscores, axis=0, keepdims=True)
# next backprop into layer 1
        self.l1_a_delta = np.dot(self.dscores, self.W2.T)
# through the activation layer
self.l1_delta = self.l1_a_delta
self.l1_delta[self.l1_ao <= 0] = 0
self.l1_dw = np.dot(self.l1_i.T, self.l1_delta)
self.l1_db = np.sum(self.l1_delta, axis=0, keepdims=True)
        # send gradients to the parameter server. In a real system, these updates would be sent over the network.
# The amount of data being sent is the total size of the network weights.
# Also note there are no dependencies between the gradients across layers. Thus, we don't have to wait until
# all the gradients have been calculated to send them to the parameter server. Gradient transmission can be
# pipelined with back-propagation, i.e., gradients for a layer can be sent to the parameter server right after
        # they are calculated. For big networks with a large number of parameters and/or a slow network, doing so can
        # result in better resource utilization and improved efficiency.
        # This "pipelining" applies to the forward pass as well. Only the parameters for a given layer are needed to run the
        # forward pass for that layer. Thus, the transmission of updated parameter values from the parameter server can be
        # pipelined with the execution of the forward pass. (A sketch of pipelined gradient sends follows this class.)
self.ps.update_weights(self)
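# A minimal sketch (not used in this example) of the pipelined gradient transmission described in the comments
# above: each layer's gradients are sent to the parameter server as soon as they are computed, instead of after
# the whole backward pass. It assumes a hypothetical parameter-server method send_layer_grads(layer_name, dw, db);
# the param_server class below does not implement it, and in a real system the send would be asynchronous so that
# the remaining computation overlaps with the transmission.
def backward_pipelined(n):
    # layer 2 gradients are complete at this point, so they can be sent immediately
    l2_dw = np.dot(n.l1_ao.T, n.dscores)
    l2_db = np.sum(n.dscores, axis=0, keepdims=True)
    n.ps.send_layer_grads("layer2", l2_dw, l2_db)  # hypothetical per-layer API
    # layer 1 gradients are computed next, while the layer 2 gradients are (conceptually) in flight
    l1_delta = np.dot(n.dscores, n.W2.T)
    l1_delta[n.l1_ao <= 0] = 0
    l1_dw = np.dot(n.l1_i.T, l1_delta)
    l1_db = np.sum(l1_delta, axis=0, keepdims=True)
    n.ps.send_layer_grads("layer1", l1_dw, l1_db)  # hypothetical per-layer API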
# The parameter server class. This class defines methods to receive gradients from the nodes and average them.
# It then uses a constant learning rate to update the weights. A more sophisticated implementation would use a
# learning-rate schedule, momentum, etc. to update the weights (a sketch of a momentum update follows this class).
# The class also propagates the updated weights back to the nodes.
# Note that for a real system consisting of multiple GPUs on several machines (for example, for training Resnet 50 on ImageNet,
# the authors use 256 GPUs, 32 servers containing 8 GPUs each) sending each GPU's data to a centralized parameter server would
# be quite expensive. More advanced networking techniques are used in such systems. For example the authors report using the
# "halving/doubling" algorithm for inter-server reduce operations.
class param_server(object):
def __init__(self, W1, b1, W2, b2, y):
self.W1 = W1
self.W2 = W2
self.b1 = b1
self.b2 = b2
self.y = y
self.num_updates = 0
self.l2_dw = 0
self.l2_db = 0
self.l1_dw = 0
self.l1_db = 0
self.nodes = []
# accumulate gradients. Once gradients from all nodes have been received, compute the parameter updates and apply
# those updates to the network parameters. Then propagate the parameters back to the nodes.
def update_weights(self, node):
self.num_updates += 1
# perform a parameter update
self.l2_db += node.l2_db
self.l2_dw += node.l2_dw
self.l1_dw += node.l1_dw
self.l1_db += node.l1_db
self.nodes.append(node)
if (self.num_updates == num_nodes):
# we received gradients from all nodes, compute average and update the weights
self.W2 += -step_size * self.l2_dw/num_nodes
self.b2 += -step_size * self.l2_db/num_nodes
self.W1 += -step_size * self.l1_dw/num_nodes
self.b1 += -step_size * self.l1_db/num_nodes
            # In a real system, these updates would be sent over the network. The amount of data being
            # sent is the total size of the network parameters. For example, VGG-19 has about 144 million parameters;
            # if each parameter is stored as a 4-byte float, the total data sent (to each node) is roughly 576 MB.
for node in self.nodes:
node.update(self.W1, self.b1, self.W2, self.b2)
# zero out accumulated gradients for next update cycle
self.l2_db = 0
self.l2_dw = 0
self.l1_dw = 0
self.l1_db = 0
self.nodes = []
self.num_updates = 0
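# A minimal sketch (not used in this example) of the momentum update mentioned in the comment above the
# param_server class. The velocity array would be kept as extra state on the parameter server, one per
# parameter tensor; 'mu' is a hypothetical momentum coefficient.
def momentum_update(W, dW, velocity, mu=0.9):
    # accumulate an exponentially decaying running sum of past gradients
    velocity = mu * velocity - step_size * dW
    # step the weights along the velocity instead of the raw gradient
    W = W + velocity
    return W, velocity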
task_queue = queue.Queue()
# This is the main task function that is executed by each thread. It pulls the latest node object from the task_queue and
# executes the forward and backward step on it. At the end of the backward step, each node sends its gradients to the
# parameter server.
def forward_and_backward_pass():
while True:
# task_queue.get will block if the queue is empty
node = task_queue.get()
# print("processing task: %s" % node.task_name)
node.forward_and_backward()
# print("finished processing task: %s" % node.task_name)
task_queue.task_done()
num_examples = X.shape[0]
# number of samples in each batch
B = num_examples // num_nodes
# Create the nodes and parameter server
param_server_ = param_server(W1, b1, W2, b2, y)
node1 = node(W1, b1, W2, b2, param_server_, "node1")
node2 = node(W1, b1, W2, b2, param_server_, "node2")
nodes = [node1, node2]
# Create threads to execute the backward and forward pass for each node on a different thread
jobs = []
jobs.append(threading.Thread(target=forward_and_backward_pass))
jobs.append(threading.Thread(target=forward_and_backward_pass))
for j in jobs:
    j.daemon = True  # daemon threads exit when the main program exits
    j.start()
execution_time = 0
use_threading = 0  # set to 1 to run the two nodes on the worker threads created above; 0 runs them sequentially
for i in range(12000):
    # Create a random shuffling of the data. Karpathy's original code uses the same data in each iteration. In a
    # real-world application, each mini-batch would be a random sample (drawn without replacement) of the training
    # data. One pass over the entire training data is called an "epoch". My implementation is somewhere in the
    # middle: the entire training data is used in each training iteration, but each node gets a random shuffling
    # of the data and thus operates on a different subset of the data every iteration.
idx = 0
random_inds = np.random.permutation(np.arange(0, num_examples))
X = X[random_inds, :]
y = y[random_inds]
# Send data to each node. On a real system, our training data will be stored in the main memory. A random "minibatch"
# will be sampled from the training data and sent over to the GPUs (usually over a PCIe connection, which can become
# a bottleneck when a minibatch consists of a large amount of data such as image data).
for node in nodes:
node.data(X[idx:B + idx, :], y[idx:B + idx])
idx += B
t0 = time.time()
if (use_threading):
        for node_idx in range(num_nodes):
task_queue.put(nodes[node_idx])
# Wait for the threads to finish. After executing its backward pass, each node will send its computed
# gradients to the parameter server. When the parameter server has received gradients from all nodes, it will
# compute the updated weights and send them back to each node
task_queue.join()
else:
        for node_idx in range(num_nodes):
nodes[node_idx].forward_and_backward()
t1 = time.time()
execution_time += t1 - t0
# evaluate accuracy. In a real system, the accuracy would be measured on a held-out set of data that was not used
# for training. We don't do that in this toy example (a sketch of such an evaluation is given below).
scores = node1.l2_o
predicted_class = np.argmax(scores, axis=1)
print('training time: %.2f seconds' % execution_time)
print('training accuracy: %.2f' % (np.mean(predicted_class == y[0:B])))
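# A minimal sketch (not executed here) of evaluating accuracy on held-out data, as mentioned in the comment
# above. X_test and y_test are hypothetical arrays that would have been split off before training; this toy
# example trains on all of the data instead.
def test_accuracy(W1, b1, W2, b2, X_test, y_test):
    hidden = np.maximum(0, np.dot(X_test, W1) + b1)  # hidden layer forward pass (ReLU)
    scores = np.dot(hidden, W2) + b2                 # output scores
    return np.mean(np.argmax(scores, axis=1) == y_test)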
I was wondering if you would need a lock around the param server object?
That's a good point! Probably, although even if the updates from one thread are interrupted by another, the average shouldn't change, because it doesn't matter in which order the gradients are accumulated. However, the gradient accumulations and the num_updates counter are read-modify-write operations, so an ill-timed thread switch could still drop an update. Putting a lock around the update_weights function is a good idea.
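A minimal sketch of what that change could look like, reusing the param_server class above unchanged (locked_param_server and its lock attribute are hypothetical additions, not part of the original code):
class locked_param_server(param_server):
    def __init__(self, W1, b1, W2, b2, y):
        super(locked_param_server, self).__init__(W1, b1, W2, b2, y)
        self.lock = threading.Lock()
    def update_weights(self, node):
        # holding the lock serializes concurrent calls from the worker threads, so the gradient
        # accumulation, the num_updates counter and the weight update cannot interleave
        with self.lock:
            super(locked_param_server, self).update_weights(node)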