Commit a3eec639 authored by szr712's avatar szr712

支持多卡训练

parent f4e935ff
......@@ -12,7 +12,7 @@ class Embedder(nn.Module):
return self.embed(x)
class PositionalEncoder(nn.Module):
def __init__(self, d_model, max_seq_len = 200, dropout = 0.1):
def __init__(self, d_model, max_seq_len = 256, dropout = 0.1):
super().__init__()
self.d_model = d_model
self.dropout = nn.Dropout(dropout)
......
......@@ -10,6 +10,8 @@ parameters:
- -src_voc 拼音字典
- -trg_voc 汉字字典
- -batchsize 默认64
- -master_batch_size 主卡batchsize
- -gpus gpu列表
# 验证集验证模型
```
......
import torch
from torch.nn.modules import Module
from torch.nn.parallel.scatter_gather import gather
from torch.nn.parallel.replicate import replicate
from torch.nn.parallel.parallel_apply import parallel_apply
from scatter_gather import scatter_kwargs
class _DataParallel(Module):
r"""Implements data parallelism at the module level.
This container parallelizes the application of the given module by
splitting the input across the specified devices by chunking in the batch
dimension. In the forward pass, the module is replicated on each device,
and each replica handles a portion of the input. During the backwards
pass, gradients from each replica are summed into the original module.
The batch size should be larger than the number of GPUs used. It should
also be an integer multiple of the number of GPUs so that each chunk is the
same size (so that each GPU processes the same number of samples).
See also: :ref:`cuda-nn-dataparallel-instead`
Arbitrary positional and keyword inputs are allowed to be passed into
DataParallel EXCEPT Tensors. All variables will be scattered on dim
specified (default 0). Primitive types will be broadcasted, but all
other types will be a shallow copy and can be corrupted if written to in
the model's forward pass.
Args:
module: module to be parallelized
device_ids: CUDA devices (default: all devices)
output_device: device location of output (default: device_ids[0])
Example::
>>> net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
>>> output = net(input_var)
"""
# TODO: update notes/cuda.rst when this class handles 8+ GPUs well
def __init__(self, module, device_ids=None, output_device=None, dim=0, chunk_sizes=None):
super(_DataParallel, self).__init__()
if not torch.cuda.is_available():
self.module = module
self.device_ids = []
return
if device_ids is None:
device_ids = list(range(torch.cuda.device_count()))
if output_device is None:
output_device = device_ids[0]
self.dim = dim
self.module = module
self.device_ids = device_ids
self.chunk_sizes = chunk_sizes
self.output_device = output_device
if len(self.device_ids) == 1:
self.module.cuda(device_ids[0])
def forward(self, *inputs, **kwargs):
if not self.device_ids:
return self.module(*inputs, **kwargs)
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids, self.chunk_sizes)
if len(self.device_ids) == 1:
return self.module(*inputs[0], **kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
outputs = self.parallel_apply(replicas, inputs, kwargs)
return self.gather(outputs, self.output_device)
def replicate(self, module, device_ids):
return replicate(module, device_ids)
def scatter(self, inputs, kwargs, device_ids, chunk_sizes):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim, chunk_sizes=self.chunk_sizes)
def parallel_apply(self, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
def gather(self, outputs, output_device):
return gather(outputs, output_device, dim=self.dim)
def data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None):
r"""Evaluates module(input) in parallel across the GPUs given in device_ids.
This is the functional version of the DataParallel module.
Args:
module: the module to evaluate in parallel
inputs: inputs to the module
device_ids: GPU ids on which to replicate module
output_device: GPU location of the output Use -1 to indicate the CPU.
(default: device_ids[0])
Returns:
a Variable containing the result of module(input) located on
output_device
"""
if not isinstance(inputs, tuple):
inputs = (inputs,)
if device_ids is None:
device_ids = list(range(torch.cuda.device_count()))
if output_device is None:
output_device = device_ids[0]
inputs, module_kwargs = scatter_kwargs(inputs, module_kwargs, device_ids, dim)
if len(device_ids) == 1:
return module(*inputs[0], **module_kwargs[0])
used_device_ids = device_ids[:len(inputs)]
replicas = replicate(module, used_device_ids)
outputs = parallel_apply(replicas, inputs, module_kwargs, used_device_ids)
return gather(outputs, output_device, dim)
def DataParallel(module, device_ids=None, output_device=None, dim=0, chunk_sizes=None):
if chunk_sizes is None:
return torch.nn.DataParallel(module, device_ids, output_device, dim)
standard_size = True
for i in range(1, len(chunk_sizes)):
if chunk_sizes[i] != chunk_sizes[0]:
standard_size = False
if standard_size:
return torch.nn.DataParallel(module, device_ids, output_device, dim)
return _DataParallel(module, device_ids, output_device, dim, chunk_sizes)
\ No newline at end of file
......@@ -41,3 +41,5 @@ CUDA_VISIBLE_DEVICES=2 nohup python train_token_classification.py -src_data data
CUDA_VISIBLE_DEVICES=1 python train_token_classification.py -src_data data/train_file/pinyin_split_random_wo_tones -trg_data data/train_file/hanzi_split_random_wo_tones -epochs 100 -model_name token_classification_split_new -src_voc ./data/voc/pinyin.txt -trg_voc ./data/voc/hanzi.txt
CUDA_VISIBLE_DEVICES=1 python train_token_classification.py -src_data data/train_file/pinyin_split_random_wo_tones -trg_data data/train_file/hanzi_split_random_wo_tones -epochs 100 -model_name token_classification_split_new -src_voc ./data/voc/pinyin.txt -trg_voc ./data/voc/hanzi.txt -gpus 4,5,6,7
import torch
from torch.autograd import Variable
from torch.nn.parallel._functions import Scatter, Gather
def scatter(inputs, target_gpus, dim=0, chunk_sizes=None):
r"""
Slices variables into approximately equal chunks and
distributes them across given GPUs. Duplicates
references to objects that are not variables. Does not
support Tensors.
"""
def scatter_map(obj):
if isinstance(obj, Variable):
return Scatter.apply(target_gpus, chunk_sizes, dim, obj)
assert not torch.is_tensor(obj), "Tensors not supported in scatter."
if isinstance(obj, tuple):
return list(zip(*map(scatter_map, obj)))
if isinstance(obj, list):
return list(map(list, zip(*map(scatter_map, obj))))
if isinstance(obj, dict):
return list(map(type(obj), zip(*map(scatter_map, obj.items()))))
return [obj for targets in target_gpus]
return scatter_map(inputs)
def scatter_kwargs(inputs, kwargs, target_gpus, dim=0, chunk_sizes=None):
r"""Scatter with support for kwargs dictionary"""
inputs = scatter(inputs, target_gpus, dim, chunk_sizes) if inputs else []
kwargs = scatter(kwargs, target_gpus, dim, chunk_sizes) if kwargs else []
if len(inputs) < len(kwargs):
inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
elif len(kwargs) < len(inputs):
kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
inputs = tuple(inputs)
kwargs = tuple(kwargs)
return inputs, kwargs
......@@ -9,6 +9,7 @@ from Batch import create_masks, create_masks2
import dill as pickle
import os
from Process import get_len
from data_parallel import DataParallel
# os.environ["CUDA_VISIBLE_DEVICES"] = "1"
......@@ -105,12 +106,32 @@ def main():
parser.add_argument('-pkl_dir')
parser.add_argument('-src_voc')
parser.add_argument('-trg_voc')
parser.add_argument('-master_batch_size', type=int, default=64, help='batch size on the master gpu.')
parser.add_argument('-gpus', type=str, default='0,1,2,3')
opt = parser.parse_args()
start_time = time.localtime()
opt.device = 0 if opt.no_cuda is False else -1
if opt.device == 0:
assert torch.cuda.is_available()
opt.gpus_str = opt.gpus
opt.gpus = [int(gpu) for gpu in opt.gpus.split(',')]
opt.gpus = [i for i in range(len(opt.gpus))] if opt.gpus[0] >=0 else [-1]
if opt.master_batch_size == -1:
opt.master_batch_size = opt.batchsize // len(opt.gpus)
rest_batch_size = (opt.batchsize - opt.master_batch_size)
opt.chunk_sizes = [opt.master_batch_size]
for i in range(len(opt.gpus) - 1):
slave_chunk_size = rest_batch_size // (len(opt.gpus) - 1)
if i < rest_batch_size % (len(opt.gpus) - 1):
slave_chunk_size += 1
opt.chunk_sizes.append(slave_chunk_size)
print('training chunk_sizes:', opt.chunk_sizes)
if opt.device == 0:
assert torch.cuda.is_available()
......@@ -135,6 +156,16 @@ def main():
if opt.SGDR == True:
opt.sched = CosineWithRestarts(opt.optimizer, T_max=opt.train_len)
if len(opt.gpus) > 1:
model = DataParallel(model, device_ids=opt.gpus, chunk_sizes=opt.chunk_sizes).to(opt.device)
else:
model = model.to(opt.device)
for state in opt.optimizer.state.values():
for k, v in state.items():
if isinstance(v, torch.Tensor):
state[k] = v.to(device=opt.device, non_blocking=True)
if opt.checkpoint > 0:
print("model weights will be saved every %d minutes and at end of epoch to directory weights/" % (opt.checkpoint))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment