DMTK Framework

Introduction

DMTK provides a parameter-server-based framework for training machine learning models on big data with a large number of machines. It is currently a standard C++ library and provides a series of friendly programming interfaces. With these easy-to-use APIs, machine learning researchers and practitioners do not need to worry about low-level system issues such as distributed model storage and operations, inter-process and inter-thread communication, multi-threading management, and so on. Instead, they are able to focus on the core machine learning logic: data, model, and training.

The DMTK framework adopts a typical client-server architecture. A number of server instances run on multiple machines and are responsible for maintaining the global model parameters. The training routines access and update the parameters through client APIs that call the underlying communication facilities.

In order to adapt to different cluster environments, the DMTK framework supports two inter-process communication libraries: MPI and ZMQ. One can easily switch between them without any code change on the application side. When compiling the library, enable the macro _MPI_VERSION_ in meta.h to use the MPI communication facilities, or disable (comment out) this macro to use ZMQ as the communication library. In MPI mode, the server routines are bound to the worker processes and run as server threads. In ZMQ mode, the server is a standalone executable, so the server project needs to be compiled separately in this case.
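
For example, switching the backend amounts to toggling a single line in meta.h before compilation (a minimal sketch; the surrounding contents of meta.h are omitted, and only the macro name comes from this document):

    // meta.h (excerpt, sketch)
    // Keep this macro defined to use MPI for inter-process communication:
    #define _MPI_VERSION_
    // Comment the line above out to use ZMQ instead; in that case the
    // standalone server executable must be built and launched separately.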

In addition to the flexible communication facilities, another feature of the DMTK framework is its friendly APIs and data-driven programming style. Multi-threading is widely used in machine learning programs to fully leverage the computing resources, but programming with multiple threads is complicated. With our framework, one just needs to implement the core routines of data representation and training, and push the data into the framework in a data feeding thread. It is the framework's responsibility to start and manage the worker threads. Along with the multi-threading support, a computation-communication pipeline is also implemented in the framework to improve training efficiency.

Key Concepts

This section briefly introduces some key concepts in the DMTK framework. For more details, please refer to the user manual.

Model Representation -- Data Structures

The basic data structure in the current library is the table/matrix, since many machine learning models can be represented as (a set of) matrices. We will support more data structures, including user-customized ones, in the future.

Row

The template class Row<T> is the primary data structure. A Row corresponds to one row in a table (matrix), and it can also be regarded logically as a key-value container in which keys are nonnegative integers and values are of type T. By default, the library supports four value types: int (int32_t), long long (int64_t), float, and double. It is easy to modify the source code to support more data types. In addition to supporting different data types, a row can be either dense or sparse:

  • Dense Row: A dense row can be constructed with Format::Dense. In a dense row, the keys (indexes) are fixed continuous integers ranging from 0 to capacity-1, like a fixed-size vector in C++. A dense row is implemented as an array, so users can access its elements efficiently;
  • Sparse Row: In some scenarios, such as LDA, the model representation is sparse in nature, and sparse rows are appropriate for such applications. Users can construct a sparse row with Format::Sparse. A sparse row is implemented as a hash table whose capacity grows automatically, like unordered_map in C++ (see the sketch after this list).
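
As an illustration, the format can be chosen per row when the tables are configured. The sketch below assumes SetServerRow is one concrete member of the SetXXXRow(...) family described in the Steps of Programming section and that these methods are static members of a multiverso::Multiverso class; the actual names and signatures may differ:

    // Sketch only: hypothetical expansion of the SetXXXRow(...) family.
    // Table 0, row 0: a dense row with a fixed capacity of 10000 columns.
    multiverso::Multiverso::SetServerRow(0, 0, multiverso::Format::Dense, 10000);
    // Table 0, row 1: a sparse row whose capacity can grow automatically.
    multiverso::Multiverso::SetServerRow(0, 1, multiverso::Format::Sparse, 10000);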

Table

An instance of the Table class is a set of rows. Each row in a table has a distinct row ID, and the rows in a table may have different formats.

Data Interface

The training procedure on the client side of the framework is a data-driven process. That is, users (usually in the main thread) pack and feed training data (as well as test data or hyper-parameters) into a queue, and the background workers (the parameter loader and trainer threads) keep monitoring the queue, fetching and consuming the data. A block of data is packed by inheriting from the DataBlockBase class, which is an interface for representing a set of data samples as well as some hyper-parameters. Users push a data block into the framework through the method PushDataBlock(DataBlockBase*).
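
A minimal sketch of a user-defined data block follows. Only the inheritance from DataBlockBase and the use of PushDataBlock(DataBlockBase*) are taken from this document; the header name, the Sample type, and any additional virtual methods that DataBlockBase may require are assumptions of the sketch:

    #include <vector>
    #include "multiverso.h"   // header name assumed

    // A hypothetical sample type; its layout is entirely up to the application.
    struct Sample {
        std::vector<int> feature_ids;
        std::vector<float> feature_values;
        int label;
    };

    // A user-defined data block: a set of samples plus per-block hyper-parameters.
    class MyDataBlock : public multiverso::DataBlockBase {
    public:
        std::vector<Sample> samples;
        float learning_rate = 0.1f;
    };

The main thread then fills such blocks and feeds them to the framework with PushDataBlock, as shown in the training-loop sketch in the Steps of Programming section.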

Within an iteration, a data block is first passed to the parameter loader. The parameter loader parses the data block, loads the necessary parameters from the servers, and fills the local cache. The data block, together with the cached parameters, is then shared and processed by all local trainers.

Training

Iteration

An iteration is a time period during which a data block is processed collaboratively by a number of worker threads within a process. The data block is first passed to the parameter loader and, after the parameter loading routine, is shared by all local trainers. Please note that within an iteration all local trainers handle the same data block, and it is the user's responsibility to partition the samples in a data block among the trainers.

Clock

A clock is a period of inter-process synchronization. The parameter server supports several delta-based synchronization mechanisms, which can be configured by setting the max_delay field in the initialization parameter Config:

  • BSP (max_delay=0): All worker processes are forcibly barriered at the end of each clock;
  • SSP (max_delay>0): The difference in the number of clocks between the fastest and the slowest worker process cannot exceed max_delay;
  • ASP (max_delay<0): Worker processes do not wait for each other.

These delta-based synchronization mechanisms are naturally supported by calling the Add method for model updates. In fact, average-based synchronization mechanisms such as iterative model averaging (IMA) or ADMM are also supported, since an average-based mechanism can be reduced to a delta-based one as follows: (x'_1 + ... + x'_n) / n = x + (x'_1 - x) / n + ... + (x'_n - x) / n.
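
A sketch of selecting a synchronization mode through the Config object; only the max_delay field named above is shown, and any other fields that Config may require are omitted:

    multiverso::Config config;
    // config.max_delay = 0;   // BSP: barrier at the end of every clock
    config.max_delay = 2;      // SSP: fastest worker at most 2 clocks ahead of the slowest
    // config.max_delay = -1;  // ASP: worker processes never wait for each other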

ParameterLoaderBase

At the beginning of each iteration, the parameter loading thread is responsible for parsing the data block, loading the necessary parameters (a portion of, or the entire, model, depending on the application and data) from the servers, and filling the local cache. To implement this logic, one needs to inherit from the ParameterLoaderBase class and override the ParseAndRequest(DataBlockBase*) method. One does not need to care about the parameter loading thread itself; it is the framework's responsibility to start and manage it.
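
A sketch of such a parameter loader, reusing the hypothetical MyDataBlock defined above. RequestRow is assumed to be one concrete member of the RequestXXX family mentioned in the Steps of Programming section, and the one-row-per-feature table layout is an assumption of this example:

    class MyParameterLoader : public multiverso::ParameterLoaderBase {
    public:
        void ParseAndRequest(multiverso::DataBlockBase* data_block) override {
            MyDataBlock* block = static_cast<MyDataBlock*>(data_block);
            // Request only the rows touched by the samples in this block.
            for (const Sample& sample : block->samples) {
                for (int feature_id : sample.feature_ids) {
                    RequestRow(0 /*table*/, feature_id /*row*/);
                }
            }
        }
    };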

TrainerBase

A trainer is mainly responsible for training on data blocks and updating the model. One can also implement other logic, such as progressive evaluation of the model during training. Each trainer has its own training thread, and it is the framework's responsibility to handle the threading issues. One just needs to implement the key machine learning logic by inheriting from the TrainerBase class and overriding the TrainIteration(DataBlockBase*) method.

After the parameter loading routine in an iteration, the data block and the cached parameters are shared with all local trainers within the process. Every trainer calls the TrainIteration(DataBlockBase*) method to handle the same data block collaboratively. The trainer also provides corresponding methods to access the cached parameters and to update the model.
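
A sketch of a trainer under the same assumptions as the previous examples. The GetRow/Add calls in the comments stand in for the GetXXX and Add methods described in the Steps of Programming section, whether TrainerBase requires constructor arguments is not covered here, and the round-robin partitioning by a trainer index is only one possible way to split a block among the trainers:

    class MyTrainer : public multiverso::TrainerBase {
    public:
        MyTrainer(int trainer_id, int num_trainers)
            : trainer_id_(trainer_id), num_trainers_(num_trainers) {}

        void TrainIteration(multiverso::DataBlockBase* data_block) override {
            MyDataBlock* block = static_cast<MyDataBlock*>(data_block);
            // All trainers receive the same block; each one picks its own share.
            for (size_t i = static_cast<size_t>(trainer_id_);
                 i < block->samples.size(); i += num_trainers_) {
                const Sample& sample = block->samples[i];
                // Read cached parameters, compute an update, and send the delta
                // back to the servers, e.g. (names assumed, see the steps below):
                //   multiverso::Row<float>& row = GetRow<float>(0, sample.feature_ids[0]);
                //   Add<float>(0, sample.feature_ids[0], /*col=*/0, delta);
                (void)sample;
            }
        }

    private:
        int trainer_id_;
        int num_trainers_;
    };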

Steps of Programming

1. Defining your own DataBlock

Inherit the DataBlockBase class and implement your own data block logic.

2. Implementing Parameter Loader and Trainer

Before starting the environment, you need to create a parameter loader object and a number of trainer objects, which will then be passed into the framework.

Inherit the ParameterLoaderBase class and override the ParseAndRequest(DataBlockBase*) method to parse a data block and load the corresponding parameters from the servers. The ParameterLoaderBase class provides the RequestXXX methods for fetching parameters from the servers and filling the local cache.

Inherit the TrainerBase class and override the TrainIteration(DataBlockBase*) method to train on a data block and update the model for one iteration. The TrainerBase class provides the GetXXX methods for accessing parameters in the local cache and the Add methods for updating the parameters.

3. Starting and Closing the Environment

You can pass the configuration to the environment by setting up a Config object and feeding it into the Init entry method. To start the environment, call the starting method Init(std::vector&, ParameterLoaderBase*, const Config&, int*, char***). Correspondingly, there is a method Close(bool) that you need to call at the end to close the environment.
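
A sketch of the start/close skeleton. The document does not spell out the element type of the std::vector argument, so passing a vector of TrainerBase pointers, the static Multiverso class qualifier, and the meaning of the bool flag of Close are assumptions of this example:

    int main(int argc, char* argv[]) {
        // Step 2: create one parameter loader and several trainer objects.
        MyParameterLoader* loader = new MyParameterLoader();
        std::vector<multiverso::TrainerBase*> trainers;
        for (int i = 0; i < 4; ++i) trainers.push_back(new MyTrainer(i, 4));

        multiverso::Config config;
        config.max_delay = 0;  // BSP, for example

        // Start the environment.
        multiverso::Multiverso::Init(trainers, loader, config, &argc, &argv);

        // ... configuration (step 4) and training (step 5) go here ...

        // Close the environment at the end.
        multiverso::Multiverso::Close(true);
        return 0;
    }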

4. Configuration and Initialization

You should tell the framework how many tables your application needs and the detailed settings of each table. In addition, sometimes you may want to initialize the tables with non-trivial values before training. To implement this logic, place your code between the two methods BeginConfig() and EndConfig(). In the code area between these two methods, you can call the AddXXXTable(...) methods to create tables on both the local workers and the servers, and the SetXXXRow(...) methods to configure rows. If you want to initialize the server tables with non-trivial values before training, call the AddToServer(...) methods.
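
A configuration sketch. AddServerTable and AddCacheTable are assumed concrete members of the AddXXXTable(...) family (one for the server side and one for the local cache); their argument lists, the Type enumeration, and the static Multiverso class are assumptions, so consult the headers for the exact overloads:

    multiverso::Multiverso::BeginConfig();

    // Create table 0 (100 rows x 10000 columns of float) on both the servers
    // and the local cache. Method and enum names below are assumptions.
    multiverso::Multiverso::AddServerTable(0, 100, 10000,
                                           multiverso::Type::Float,
                                           multiverso::Format::Dense);
    multiverso::Multiverso::AddCacheTable(0, 100, 10000,
                                          multiverso::Type::Float,
                                          multiverso::Format::Dense);

    // Individual rows can be reconfigured with the SetXXXRow(...) family,
    // e.g. to mix dense and sparse rows within one table (see the Row section).

    // Non-trivial initial values can be pushed to the server tables here
    // through the AddToServer(...) family before EndConfig() is called.

    multiverso::Multiverso::EndConfig();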

5. Training

The training logic should be placed in the code area between the two methods BeginTrain() and EndTrain(). Training in the framework is a data-driven process; in other words, you are only required to feed data into the framework and do not need to handle the training routine yourself. For each clock period, call the methods BeginClock() and EndClock() to tell the framework that the data fed between these calls belong to that clock period. Within a clock period, you may feed one or more data blocks into the framework by calling the method PushDataBlock(DataBlockBase*).
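
Putting the pieces together, the data feeding loop might look like the following. All of the Begin/End and PushDataBlock methods are named earlier in this document; num_clocks, blocks_per_clock, LoadNextBlock, and the Multiverso class qualifier are placeholders of this sketch:

    multiverso::Multiverso::BeginTrain();

    for (int clock = 0; clock < num_clocks; ++clock) {
        multiverso::Multiverso::BeginClock();
        // One or more data blocks may be fed within a single clock period.
        for (int b = 0; b < blocks_per_clock; ++b) {
            MyDataBlock* block = LoadNextBlock();  // application-defined loading
            multiverso::Multiverso::PushDataBlock(block);
        }
        multiverso::Multiverso::EndClock();
    }

    multiverso::Multiverso::EndTrain();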

Download and Compilation

One can download the source code of DMTK from the GitHub project https://github.com/Microsoft/multiverso. DMTK depends on ZeroMQ and MPI for communication: ZeroMQ is always needed, while MPI is optional and only required for MPI mode.

Windows

  1. Install ZeroMQ. The Windows installer can be downloaded from http://miru.hk/archive/ZeroMQ-4.0.4~miru1.0-x64.exe;
  2. Download the C++ wrapper of ZeroMQ, zmq.hpp, into the $(ZMQDir)/include folder;
  3. Install MS-MPI v5 (both MSMPI_Setup and MSMPI_SDK) if you would like to use MPI for inter-process communication. Then add the $(MsmpiDir)/bin folder to the system path (for use of mpiexec.exe and smpd.exe);
  4. The Visual Studio solution file is under the windows folder. One can configure the project and compile it.

Linux (tested with Ubuntu 12.04)

  1. Run ./third_party/install.sh to download and install ZeroMQ and MPICH2 (if you have already installed them, skip this step and modify the related paths in the Makefile);
  2. Run make all -j4 to build the project.