本文展示了一些提高 dali 资源使用率以及创建一个完全基于 cpu 的管道的技术。这些技术长期稳定内存使用率,将 cpu & gpu 管道的 batch 大小提高 50%。用特斯拉 v100 加速器显示 pytorch dali 可以达到接近 4000 个图像/秒的处理速度,比原生 pytorch 快了大约 4 倍。
简介
过去几年见证了深度学习硬件的长足进步。英伟达的最新产品,tesla v100 & geforce rtx 系列,包含特定的张量核,以加速常用的神经网络操作。特别是,v100 已经具备足够的性能。能够以每秒数千幅图像的速度训练神经网络。这使得在 imagenet 数据集上的单一 gpu 训练时间减少到几个小时。而在 202 年,在 imagenet 上训练 alexnet 模型花了 5 天时间!
如此强大的 gpu 使数据预处理管道变得紧张。为了解决这个问题,tensorflow 发布了一个新的数据加载器:tf.data.dataset。管道是用 c 编写的,使用基于图的方法,预处理操作被链接在一起形成一个管道。另一方面,pytorch 使用在 pil 库上用 python 编写的数据加载器,它具备良好的易于用和灵活性,诞生在速度方面不是那么出色。尽管 pil-simd 库确实改善了这种情况。
nvidia 数据加载库(dali)旨在解决数据预处理瓶颈,让数据在训练时全速运行。dali 主要用于在 gpu 上进行预处理,但是其大多数操作也有一个快速的 cpu 实现。本文主要关注 pytorch,但 dali 也支持 tensorflow、mxnet 和 tensorrt,尤其是 tensorrt 的支持非常好。它允许训练和推理使用完全相同的预处理代码。tensorflow 和 pytorch 这样的框架在数据加载器之间通常具有一定的差异,这可能会影响准确性。
以下是开始使用 dali 的一些重要资源:
dali home:
fast ai data preprocessing with nvidia dali:
dali developer guide:
getting started:
在本文的其余部分中,我将假设你对 imagenet 预处理及 dali imagenet 实例有一定的理解。我来谈谈在使用 dali 的时候遇到的问题,以及我是如何解决的。我们将研究 cpu 和 gpu 管道。
dali 长期内存使用
我在 dali 中遇到的第一个问题是,随着训练阶段的推移,ram 的使用率增加,这都会导致 oom 错误(即使在内存为 78gb 的虚拟机上也是如此)。它已经被标记位(278,344,486),但是还没有被修复。
我唯一能找到的解决办法并不美好:重新导入 dali,重新训练和验证管道:
del self.train_loader, self.val_loader, self.train_pipe,
self.val_pipe
torch.cuda.synchronize()
torch.cuda.empty_cache()
gc.collect()
importlib.reload(dali)
from dali import hybridtrainpipe, hybridvalpipe, daliiteratorcpu,
daliiteratorgpu
注意,有了这个亚博电竞网的解决方案,dali 仍然需要大量 ram 来获得最好的结果。考虑到现在的 ram 有多便宜,这不是什么大问题;相反,gpu 内存才是问题所在。从下表可以看出,使用 dali 时的最大批的大小可能比 torchvision 低 50%:
在下面的部分中,我将介绍一些减少 gpu 内存使用的方法。
构建完全基于 cpu 的管道
当不需要峰值吞吐量时(例如,当使用 resnet50 等中大型模型时),基于 cpu 的管道非常有用。cpu 训练管道只在 cpu 上执行解码和大小调整操作,而 cropmirnormalize 操作在 gpu 上运行。这点很重要。我发现,即使是用 dali 将输出传输到 gpu,也会占用大量的 gpu 内存。为了避免这种情况,我修改了示例 cpu 管道,使其完全在 cpu 上运行:
class hybridtrainpipe(pipeline):
def __init__(self, batch_size, num_threads, device_id, data_dir,
crop,
mean, std, local_rank=0, world_size=1,
dali_cpu=false, shuffle=true, fp16=false,
min_crop_size=0.08):
# as we're recreating the pipeline at every epoch, the seed
must be -1 (random seed)
super(hybridtrainpipe, self).__init__(batch_size,
num_threads, device_id, seed=-1)
# enabling read_ahead slowed down processing ~40%
self.input = ops.filereader(file_root=data_dir,
shard_id=local_rank, num_shards=world_size,
random_shuffle=shuffle)
# let user decide which pipeline works best with the chosen
model
if dali_cpu:
decode_device = "cpu"
self.dali_device = "cpu"
self.flip = ops.flip(device=self.dali_device)
else:
decode_device = "mixed"
self.dali_device = "gpu"
output_dtype = types.float
if self.dali_device == "gpu" and fp16:
output_dtype = types.float16
self.cmn = ops.cropmirrornormalize(device="gpu",
output_dtype=output_dtype,
output_layout=types.nchw,
crop=(crop, crop),
image_type=types.rgb,
mean=mean,
std=std,)
# to be able to handle all images from full-sized imagenet,
this padding sets the size of the internal nvjpeg buffers without
additional reallocations
device_memory_padding = 211025920 if decode_device == 'mixed'
else 0
host_memory_padding = 140544512 if decode_device == 'mixed'
else 0
self.decode =
ops.imagedecoderrandomcrop(device=decode_device,
output_type=types.rgb,
device_memory_padding=device_memory_padding,
host_memory_padding=host_memory_padding,
random_aspect_ratio=
[0.8, 1.25],
random_area=
[min_crop_size, 1.0],
num_attempts=100)
# resize as desired. to match torchvision data loader, use
triangular interpolation.
self.res = ops.resize(device=self.dali_device, resize_x=crop,
resize_y=crop,
interp_type=types.interp_triangular)
self.coin = ops.coinflip(probability=0.5)
print('dali "{0}" variant'.format(self.dali_device))
def define_graph(self):
rng = self.coin()
self.jpegs, self.labels = self.input(name="reader")
# combined decode & random crop
images = self.decode(self.jpegs)
# resize as desired
images = self.res(images)
if self.dali_device == "gpu":
output = self.cmn(images, mirror=rng)
else:
# cpu backend uses torch to apply mean & std
output = self.flip(images, horizontal=rng)
self.labels = self.labels.gpu()
return [output, self.labels]
dali 管道现在在 cpu 上输出一个 8 位张量。我们需要使用 pytorch 来完成 cpu->gpu 传输、浮点数转换和规范化。最后两个操作是在 gpu 上完成的,因为在实践中,它们非常快,并且减少了 cpu->gpu 内存带宽需求。在转到 gpu 之前,我试着固定张力,但没有从中获得任何性能提升。
将它与预取器组合在一起:
def _preproc_worker(dali_iterator, cuda_stream, fp16, mean, std,
output_queue, proc_next_input, done_event, pin_memory):
"""
worker function to parse dali output & apply final preprocessing
steps
"""
while not done_event.is_set():
# wait until main thread signals to proc_next_input --
normally once it has taken the last processed input
proc_next_input.wait()
proc_next_input.clear()
if done_event.is_set():
print('shutting down preproc thread')
break
try:
data = next(dali_iterator)
# decode the data output
input_orig = data[0]['data']
target = data[0]['label'].squeeze().long() # dali should
already output target on device
# copy to gpu and apply final processing in separate cuda
stream
with torch.cuda.stream(cuda_stream):
input = input_orig
if pin_memory:
input = input.pin_memory()
del input_orig # save memory
input = input.cuda(non_blocking=true)
input = input.permute(0, 3, 1, 2)
# input tensor is kept as 8-bit integer for transfer
to gpu, to save bandwidth
if fp16:
input = input.half()
else:
input = input.float()
input = input.sub_(mean).div_(std)
# put the result on the queue
output_queue.put((input, target))
except stopiteration:
print('resetting dali loader')
dali_iterator.reset()
output_queue.put(none)
class daliiteratorcpu(daliiterator):
"""
wrapper class to decode the dali iterator output & provide
iterator that functions in the same way as torchvision.
note that permutation to channels first, converting from 8-bit
integer to float & normalization are all performed on gpu
pipelines (pipeline): dali pipelines
size (int): number of examples in set
fp16 (bool): use fp16 as output format, f32 otherwise
mean (tuple): image mean value for each channel
std (tuple): image standard deviation value for each channel
pin_memory (bool): transfer input tensor to pinned memory, before
moving to gpu
"""
def __init__(self, fp16=false, mean=(0., 0., 0.), std=(1., 1.,
1.), pin_memory=true, **kwargs):
super().__init__(**kwargs)
print('using dali cpu iterator')
self.stream = torch.cuda.stream()
self.fp16 = fp16
self.mean = torch.tensor(mean).cuda().view(1, 3, 1, 1)
self.std = torch.tensor(std).cuda().view(1, 3, 1, 1)
self.pin_memory = pin_memory
if self.fp16:
self.mean = self.mean.half()
self.std = self.std.half()
self.proc_next_input = event()
self.done_event = event()
self.output_queue = queue.queue(maxsize=5)
self.preproc_thread = threading.thread(
target=_preproc_worker,
kwargs={'dali_iterator': self._dali_iterator,
'cuda_stream': self.stream, 'fp16': self.fp16, 'mean': self.mean,
'std': self.std, 'proc_next_input': self.proc_next_input,
'done_event': self.done_event, 'output_queue': self.output_queue,
'pin_memory': self.pin_memory})
self.preproc_thread.daemon = true
self.preproc_thread.start()
self.proc_next_input.set()
def __next__(self):
torch.cuda.current_stream().wait_stream(self.stream)
data = self.output_queue.get()
self.proc_next_input.set()
if data is none:
raise stopiteration
return data
def __del__(self):
self.done_event.set()
self.proc_next_input.set()
torch.cuda.current_stream().wait_stream(self.stream)
self.preproc_thread.join()
基于 gpu 的管道
在我的测试中,上面详述的新的完整 cpu 管道的速度大约是 tooview 数据加载程序的两倍,同时达到了几乎相同的最大批大小。cpu 管道在 resnet50 这样的大型模型中工作得很好,但是,当使用 alexnet 或 resnet18 这样的小型模型时,cpu 管道仍然无法跟上 gpu。对于这些情况,示例 gpu 管道表现最好。问题是,gpu 管道将最大可能的批大小减少了 50%,限制了吞吐量。
显著减少 gpu 内存使用的一种方法是,在一个阶段结束时,将验证管道保留在 gpu 之外,直到它真正需要被使用为止。这很容易做到,因为我们已经重新导入 dali 库并在每个阶段重新创建数据加载程序。
更多提示
使用 dali 的更多提示:
对于验证,均匀划分数据集大小的批大小最有效,例如当验证集大小为 50000 时,最好的批大小是 500 而不是 512,这避免了验证数据集会剩余一部分。
与 tensorflow 和 pytorch 数据加载程序类似,torchvision 和 dali 管道不会产生完全相同的输出,你将看到验证精度略有不同。我发现这是由于不同的 jpeg 图像解码器造成的。以前在大小调整上有问题,但现在是管道固定。另一方面,dali 支持 tensorrt,允许在训练和推理中使用完全相同的预处理。
对于峰值吞吐量,请尝试将数据加载程序工作线程数设置为虚拟 cpu 内核数。2 提供最佳性能(2 个虚拟内核=1 个物理内核)。
如果你想要绝对的最佳性能,并且不介意输出类似于 torchvision,请尝试关闭 dali 图像调整器上的三角形插值。
别忘了磁盘 io。确保有足够的内存来缓存数据集以及一个非常快的 ssd。dali 的磁盘传输速度可以达到 400mb/s!
集成在一起
为了方便地集成这些修改,我创建了一个数据加载器类,其中包含了这里描述的所有修改,包括 dali 和 torchvision 后端。用法很简单。实例化数据加载器:
dataset = dataset(data_dir,
batch_size,
val_batch_size
workers,
use_dali,
dali_cpu,
fp16)
然后获取训练和验证数据集加载程序:
train_loader = dataset.get_train_loader()
val_loader = dataset.get_val_loader()
在每个训练周期结束时重置数据加载器:
dataset.reset()
或者,可以在模型验证之前在 gpu 上重新创建验证管道:
dataset.prep_for_val()
基准
以下是我可以用 resnet18 使用的最大批处理大小:
因此,通过应用这些修改,在 cpu 和 gpu 模式下 dali 可以使用的最大批处理大小增加了约 50%!
以下是 shufflenet v2 0.5 和批大小 512 的吞吐量数据:
下面是使用 dali gpu 管道训练 torchvision 中包含的各种网络的一些结果:
所有测试都在一个 google cloud v100 实例上运行,该实例有 12 个 vcpus(6 个物理核)、78gb ram,使用 apex fp16 进行训练。要重现这些结果,请使用以下参数:
— fp16 — batch-size 512 — workers 10 — arch “shufflenet_v2_x0_5 or resnet18” — prof — use-dali
所以,有了dali,一台 tesla v100 的处理速度可以达到每秒处理近 4000 张图像!但这仅仅是 nvidia 超昂贵的 dgx-1 8 v100 gpu 的一半多一点。对我来说,能够在几个小时内在一个 gpu 上进行 imagenet 训练完全改变了生产力,希望对你来说也是如此!
本文提供的代码可以在如下网址找到:
via:
雷锋网雷锋网雷锋网