Skip to content

PIPELINING PARALLELISM

We have introduced the characteristics of pipelining parallelism in COMMON DISTRIBUTED PARALLEL STRATEGIES.

From OneFlow's global view, pipelining can be achieved by simply setting the placement attribute of Tensor.

The following code is a simple example that will run the network in QUICKSTART with pipelining parallelism. nn.Flatten, nn.Linear(28*28, 512) and nn.ReLU() run on GPU0, and the rest layers of the network run on GPU1.

Code
import oneflow as flow

BATCH_SIZE = 16
BROADCAST = [flow.sbp.broadcast]
P0 = flow.placement("cuda", [0])
P1 = flow.placement("cuda", [1])

class Stage0Module(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = flow.nn.Flatten()
        self.linear0 = flow.nn.Linear(28*28, 512)
        self.relu0 = flow.nn.ReLU()

    def forward(self, x):
        out = self.flatten(x)
        out = self.linear0(out)
        out = self.relu0(out)
        return out

class Stage1Module(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = flow.nn.Linear(512, 512)
        self.relu1 = flow.nn.ReLU()
        self.linear2 = flow.nn.Linear(512, 10)
        self.relu2 = flow.nn.ReLU()

    def forward(self, x):
        out = self.linear1(x)
        out = self.relu1(out)
        out = self.linear2(out)
        out = self.relu2(out)
        return out

class PipelineModule(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.m_stage0 = Stage0Module()
        self.m_stage1 = Stage1Module()

        self.m_stage0.to_global(placement=P0, sbp=BROADCAST)
        self.m_stage1.to_global(placement=P1, sbp=BROADCAST)

    def forward(self, x):
        out_stage0 = self.m_stage0(x)
        in_stage1 = out_stage0.to_global(placement=P1, sbp=BROADCAST)
        out_stage1 = self.m_stage1(in_stage1)
        return out_stage1

module_pipeline = PipelineModule()
sgd = flow.optim.SGD(module_pipeline.parameters(), lr=0.001)

class PipelineGraph(flow.nn.Graph):
    def __init__(self):
        super().__init__()
        self.module_pipeline = module_pipeline
        self.module_pipeline.m_stage0.config.stage_id = 0
        self.module_pipeline.m_stage1.config.stage_id = 1
        self.loss_fn = flow.nn.CrossEntropyLoss()
        self.config.set_gradient_accumulation_steps(2)
        self.add_optimizer(sgd)

    def build(self, x, y):
        out = self.module_pipeline(x)
        loss = self.loss_fn(out, y)
        loss.backward()
        return loss

graph_pipeline = PipelineGraph()

x = flow.randn(BATCH_SIZE, 1, 28, 28)
x = x.to_global(P0, BROADCAST)
y = flow.randint(0, 10, (BATCH_SIZE,))
y = y.to_global(P1, BROADCAST)

for i in range(20):
    loss = graph_pipeline(x, y)
    print(loss.to_local())

When the code above is saved as a script (pipeline.py), it can be then launched by the launch module:

python3 -m oneflow.distributed.launch --nproc_per_node 2 ./pipeline.py

More Details

Setting placement and SBP

Setting up the placement and SBP at the begining:

BROADCAST = [flow.sbp.broadcast]
P0 = flow.placement("cuda", [0])
P1 = flow.placement("cuda", [1])

P0 and P1 represent the 0th GPU and the 1st GPU on the 0th machine respectively.

By calling nn.Module.to_global or Tensor.to_global, the model or tensor will be distributed to the devices specified before, breaking a network into stages.

Here we define a PipelineModule that specifically sets the pipeline for each stage.

    class PipelineModule(flow.nn.Module):
        def __init__(self):
            #...

            self.m_stage0.to_global(placement=P0, sbp=BROADCAST)
            self.m_stage1.to_global(placement=P1, sbp=BROADCAST)

        def forward(self, x):
            out_stage0 = self.m_stage0(x)
            in_stage1 = out_stage0.to_global(placement=P1, sbp=BROADCAST)
            out_stage1 = self.m_stage1(in_stage1)
            return out_stage1

Transforming the Local Tensor to the Global Tensor

The example uses randomly generated data as input.

    x = flow.randn(BATCH_SIZE, 1, 28, 28)
    x = x.to_global(P0, BROADCAST)

The launch will start two processes when you launch the training by the launch module because the command-line parameter is --nproc_per_node 2. Both processes will execute the code in the script.

The statement x = flow.randn(BATCH_SIZE, 1, 28, 28) returns the Local Tensor (the local data only valid in current process). when running x = x.to_global(P0, BROADCAST), OneFlow will automatically integrate the Local Tensor of all processes into the Global Tensor.

In practice, each computing device can load data locally, and then convert the Local Tensor to the Global Tensor via to_global.

Stage ID and Settings for Gradient Accumulation

We can set Stage ID by setting the config.stage_id attribute of Module. The Stage ID is numbered starting from 0 and increasing by 1.

Call self.config.set_gradient_accumulation_steps method to set the step size of gradient accumulation.

The information needed to implement micro-batch in pipelining parallelism can be obtained by these two configurations.

    self.module_pipeline.m_stage0.config.stage_id = 0
    self.module_pipeline.m_stage1.config.stage_id = 1
    self.config.set_gradient_accumulation_steps(2)
Back to top