相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。下面为大家讲述有关PyTorch的内容。 1.介绍:在PyTorch模块中,我将展示如何使用torch和检查、初始化GPU设备pycuda,以及如何使算法更快。 PyTorch是建立在torch的机器学习库。它得到了Facebook AI研究小组的支持。在最近开发之后,由于它的简单性,动态图形以及它本质上是Python,它在被开发出来之后变得非常流行。它的速度仍然没有落后,在很多情况下可以说是表现的非常好的。 pycuda允许你从python访问Nvidia的CUDA并行计算API。 2.如何检查cuda的可用性?要检查是否有cuda可用的设备Torch,可以简单地运行一下下面的代码:
3.如何获取有关你的cuda设备的更多信息?要获取设备的基本信息,可以使用torch.cuda。但是,要获取有关设备的更多信息,可以使用pycuda,这是一个围绕CUDA库开发的python包装器。你可以使用:
或者你可以这么用:
我编写了一个简单的类来获取有关cuda兼容GPU的信息:
要获取当前的内存使用情况,你可以使用pyTorch的函数:
在运行应用程序后,可以使用一个简单的命令来清除缓存:
但是,使用这个命令不会通过张量释放占用的GPU内存,因此它无法增加可用于PyTorch的GPU内存量。 这些内存方法仅适用于GPU。所以这才是真正需要它的地方。 4.如何存储张量并在GPU上运行模型?
如果你想要在CPU上存储一些内容,可以简单地编写代码:
这个向量是存储在CPU上的,你对它执行的任何操作都是在CPU上执行完成的。而要将其转移到GPU上,你只需要执行以下操作.cuda:
或者,
这将为它选择默认设备,该默认设备可通过以下命令查看:
或者,你也可以执行以下操作:
你也可以将一个模型发送到GPU设备。例如,考虑一个简单的模块nn.Sequential:
要将其发送到GPU设备,只需执行以下操作:
你可以检查它是否在GPU设备上,为此,你必须检查它的参数是否在GPU设备上,例如:
5.如果有多个GPU,如何选择和使用GPU?你可以为当前应用/存储选择一个GPU,该GPU可以与你上一个应用/存储选择的GPU不同。 正如在第(2)部分中已经看到的那样,我们可以获得所有与cuda兼容的设备及其Id使用pycuda的情况,在此我们就不再赘述了。 考虑到你有3个cuda兼容的设备,你可以初始化和分配tensors到一个特定的设备,就像这样。
当你对这些Tensors 进行任何操作时,无论选择什么设备,都可以执行该操作,结果会和Tensor保存在同一个设备上。
如果你有多个GPU,则可以在其中划分应用程序的工作,但是他们之间的通信会带来开销。但是,如果你不需要过多的进行传递信息,那你可以尝试一下。 实际上还有一个问题。在PyTorch中的所有GPU操作中,默认情况下都是异步的。尽管在CPU和GPU或两个GPU之间复制数据时确实进行了必要的同步,但是,如果你在命令torch.cuda.Stream()的帮助下创建自己的数据流,那么你讲不得不处理指令的同步 举一个PyTorch文档中的例子,这是不正确的:
如果你想充分利用多个GPU,那么你可以:
6.数据并行性在数据并行性中,我们将数据(从数据生成器中获得的一个批次的数据)分割为较小的小型批次的数据,然后将其发送到多个GPU进行并行计算。 在PyTorch中,数据并行中是使用torch.nn.DataParallel实现的 我们将看到一个简单的例子来了解实际情况。为此,我们将必须使用nn.parallel的某些功能:
或者,只需要简单地:
7.数据并行比较训练集数据+Val w/数据加载器+SSD中对真实数据的数据扩充 方式1*V100/CUDA 9/CuDNN 74*V100/CUDA 9/CuDNN 7Pytorch27分钟10分钟Keras(TF)38分钟18分钟Tensorflow33分钟22分钟Chainer29分钟8分钟MXNet(Gluon)29分钟10分钟 训练集W/在内存中的综合数据 方式1*V100/CUDA 9/CuDnn 74*V100/CUDA 9 / CuDNN 7Pytorch25分钟8分钟Keras(TF)36分钟15分钟Tensorflow25分钟14分钟Chainer27分钟7分钟MxNet(Gluon)28分钟8分钟 现在,你可以清楚的看到,即使必须在开始和结束时与主设备进行通信,并行处理也绝对是有帮助的。并且仅在多GPU情况下,PyTorch比所有结果提供结果的时间更快仅仅略低于Chainer。Pytorch只需要通过对DataParallel的一次调用,就会使其变得简单。 8.torch.multiprocessingtorch.multiprocessing是Python multiprocessing模块的包装,其API与原始模块100%兼容。因此,你可以在此处使用Python的多处理模块中的Queue',Pipe',Array'等。此外,为了使其更快,他们添加了一个方法,share_memory_()该方法允许数据进入一个状态,任何进程都可以直接使用它,因此将该数据作为参数传递给不同的进程将不会复制该数据。 你可以共享Tensors,模型的parameters,也可以根据需要在CPU或GPU上共享它们。
你可以在此处的“池和进程”部分中使用上面的方法,并且要获得更快的速度,可以使用share_memory_()方法Tensor在所有进程之间共享(例如)而不被复制。
下一期我们继续看加快Python计算的另一种方法——Numba~ 相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。今天给大家介绍Numba这一块的内容。 1.简介所以什么是Numba呢?Numba是Python的即时编译器,也就是说当你调用Python 函数时,你的全部或部分代码都会被计时转换成为机器码进行执行,然后它就会以你的本机机器码速度运行,Numba由Anaconda公司赞助,并得到了许多组织的支持。 使用Numba,你可以加速所有以集中计算的、计算量大的python函数(例如循环)的速度。它还支持numpy库!因此,你也可以在计算中使用numpy,并加快整体计算的速度,因为python中的循环非常慢。你还可以使用python标准库中的数学库的许多功能,例如sqrt等。 2.为什么选择Numba?所以,为什么要选择Numba?特别是当存在有许多其他编译器,例如cython或任何其他类似的编译器,或类似pypy的东西时。 选择Numba的理由很简单,那就是因为你不需要离开使用Python编写代码的舒适区。是的,你没看错,你不需要为了加速数据的运行速度而改变你的代码,这与从具有类型定义的相似cython代码获得的加速相当。那不是更好么? 你只需要在函数周围添加一个熟悉的Python功能,也就是装饰器(包装器)。目前类的装饰器也在开发之中。 所以,你只需要添加一个装饰器就可以了。例如: from numba import jit@jitdef function(x): # 循环或数值密集型的计算 return x 它看起来仍然像是纯python代码,不是吗? 3. Numba如何工作?Numb使用LLVM编译器基础结构,从纯Python代码生成优化的机器码。使用Numba的代码运行速度与C,C ++或Fortran中的类似代码相媲美。 这是代码的编译方式: 首先,获取,优化Python函数并将其转换为Numba的中间表示形式,然后类似于Numpy的类型推断一样进行类型判断(因此python float为float64),然后将其转换为LLVM可解释的代码。然后,该代码被馈送到LLVM的即时编译器以发出机器代码。 你可以根据需要在运行时生成代码或在CPU(默认)或GPU上导入代码。 4.使用基本的Numba功能(只需要@jit!)小菜一碟! 为了获得最佳的性能,numba建议在你的jit包装器中使用参数nopython = True,但它根本不会使用Python解释器。或者你也可以使用@njit。如果你使用nopython = True的包装器失败并出现错误,则可以使用简单的@jit包装器,该包装器将编译部分代码,对其进行循环,然后将其转换为函数,再编译为机器码,然后将其余部分交给python解释器。 因此,你只需要执行以下操作: from numba import njit, jit@njit # 或者@jit(nopython=True)def function(a, b): # 循环或数值密集型计算 return result 使用@jit时,请确保你的代码具有Numba可以编译的内容,例如计算密集型循环,使用它支持的库(Numpy)及其支持的函数。否则,它将无法编译任何内容。 首先,numba在首次用作机器代码后还会缓存这些函数。因此,在第一次使用之后,它会变得更快,因为你无需再次编译该代码,因为你使用的参数类型和你之前使用的相同。 而且,如果你的代码是可以并行化运行的,那么也可以将parallel = True作为参数传递,但是必须跟参数nopython = True结合使用。目前,它仅可以在CPU上工作。 你也可以指定你想要的函数签名,但是它不会编译你给他的任何其他类型的参数,比如: 你还可以指定你希望函数具有的函数签名,但是对于提供给它的任何其他类型的参数,它将不会编译。例如: from numba import jit, int32@jit(int32(int32, int32))def function(a, b): #循环或数值型密集型计算 return result#或者你还没有导入类型的名称#你可以将他们作为字符串传递@jit('int32(int32, int32)')def function(a, b): #循环或数值型密集型计算 return result 现在,你的函数将只接受两个int32并返回一个int32。这样,你可以更好地控制自己的函数。你甚至可以根据需要传递多个)函数签名。 你还可以使用numba提供的其他装饰器: 1. @vectorize:允许将标量参数用作numpy ufunc, 1. @guvectorize:产生NumPy广义ufuncs 1. @stencil:将函数声明为类似模板操作的内核, 1. @jitclass:对于支持jit的类, 1. @cfunc:声明一个用作本机回调的函数(从C / C ++等调用), 1. @overload:注册自己的函数实现以在nopython模式下使用,例如@overload(scipy.special.j0)。 Numba还具有预先(AOT)编译功能,它生成一个编译后的扩展模块,该模块不依赖于Numba。但: 1. 它只允许使用常规函数(不能使用ufuncs), 1. 你必须指定一个函数签名。你只能指定一个,因为许多指定使用不同的名称。 它还会为你的CPU架构系列生成通用代码。 5. @vectorize包装器通过使用@vectorize包装器,你可以将对标量进行操作的函数转换为数组,例如,如果你正在使用math仅在标量上运行的python 库,则可以对数组使用。这提供了类似于numpy数组操作(ufuncs)的速度。例如: @vectorizedef func(a, b): # 对标量进行运算 return result 你还可以将target参数传递给此包装器,该包装器的值可以等于parallel用于并行化代码,cuda用于在cuda / GPU上运行代码的值。 @vectorize(target="parallel")def func(a, b): # 对标量进行运算 return result 假设你的代码具有足够的计算密集性或数组足够大,则使用numpy进行矢量化target = "parallel"或"cuda"通常比numpy实现运行得更快。如果不是这样的话,这将花费大量时间来制作线程和为不同的线程拆分元素,这可能会超过整个过程的实际计算时间。因此,工作应该足够繁重才能加快速度。 6.在GPU上运行函数你也可以像包装器一样传递@jit来在cuda / GPU上运行函数。为此,你将必须numba库中导入cuda。但是在GPU上运行代码不会像以前那样容易。为了在GPU上的数百个甚至数千个线程上运行函数,它需要完成一些初始计算。你必须声明和管理网格,块和线程的层次结构。但是这并不难。 要在GPU上执行一个函数,你必须定义一个 kernel function(内核函数)或一个device function(设备函数)。首先,让我们看一下kernel function(核函数)。 关于内核函数需要记住的几点: a)内核在被调用时显式声明其线程层次结构,即块数和每个块的线程数。你可以编译一次内核,然后使用不同的块和网格大小多次调用它。 b)内核无法返回值。因此,你将不得不在原始数组上进行更改,或者传递另一个数组来存储结果。对于计算标量,你将必须传递一个一元数组。 # 定义一个内核函数from numba import cuda@cuda.jitdef func(a, result): # 然后是一些CUDA相关的计算 # 你的计算密集的代码 # 你的答案储存在'result'中 因此,要启动内核,你将必须传递两个东西: 1. 每个块的线程数, 1. 块的数量。 例如: threadsperblock = 32blockspergrid = (array.size + (threadsperblock - 1)) // threadsperblockfunc[blockspergrid, threadsperblock](array) 每个线程中的内核函数必须知道它在哪个线程中,知道它负责数组的哪个元素。通过Numba,只需一次调用即可轻松获得元素的这些位置。 @cuda.jitdef func(a, result): pos = cuda.grid(1) # 对一维数组 # x, y = cuda.grid(2) # 对二维数组 if pos < a.shape[0]: result[pos] = a[pos] * (some computation) 为了节省将numpy数组复制到特定设备并再次将结果存储在numpy数组中的时间,Numba提供了一些函数来声明和发送数组到特定的设备,如:numba.cuda.device_array,numba.cuda.device_array_like,numba.cuda.to_device,等等,以节省不必要的时间复制到cpu(除非必要)。 另一方面,device function只能从设备内部(通过内核或其他设备函数)好处是,你可以从device function返回一个值。因此,你可以使用此函数的返回值来计算kernel function或device function的一些内容。 from numba import cuda@cuda.jit(device=True)def device_function(a, b): return a + b Numba 在其cuda库中还具有原子操作,随机数生成器,共享内存实现(以加快数据访问速度)等。 ctypes / cffi / cython互操作性: · cffi- 在nopython模式下支持CFFI函数的调用。 · ctypes — 在nopython模式下支持ctypes包装器函数的调用… · Cython导出的函数是可调用的。 下一期我们来看加快Python算法的另一种方法——数据并行化! 相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。以下给大家讲解关于数据并行化这方面的内容。 1.介绍随着时间和处理器计算能力的增长,数据呈指数级增长,我们需要找到有效地处理数据的方法。那我们应该怎么办呢? GPU是一种非常有效的解决方案。但是,GPU并不是为了机器学习而设计的,它是专门为复杂的图像处理和游戏而设计的。我们使算法能够在现有GPU上运行,并且确实取得了成果。现在,谷歌推出了一种名为TPU(张量处理单元)的新设备,该设备专门针对TensorFlow上的机器学习工作而量身定做的,其结果确实令人激动。同时英伟达在这方面也并没有退缩。 但是我们将来会在某个时候达到顶峰。即使我们我们现在拥有大量可用的数据集,但是单台机器或计算单元也不足以处理这样的负载。我们将不得不使用多台机器来完成我们的任务。我们将不得不并行化完成我们的任务。 接下来,我们将研究大多数情况下你将在Python中使用的一些方法。然后再介绍一下Dask和torch.multiprocessing。 2.池和进程Python库的Pool和Process方法都来自于multiprocessing它为我们的任务启动了一个新的过程,但是方式有所不同。Process每次调用仅执行一个进程: import multiprocessing as mpp = mp.Process(target= ##目标函数, args= ##参数到函数)# 此调用将只生产一个进程,该进程将处理在后台使用给定的参数处理目标函数 但是这个过程还没有开始。要启动它,你必须执行以下操作: p.start 现在,你可以将其保留在此处,或者通过以下方式检查该过程是否完成: p.join#现在它将等待进程完成。 不检查过程是否已完成有许多用途。例如,在客户端-服务器应用程序中,数据包丢失的可能性或无响应进程的可能性确实很低,我们可以忽略它,这可以使我们的速度大大提高。[取决于申请程序] 对于多个进程,你必须创建多个Process。你想做多少就可以做多少。当你调用.start它们时,它们全部都将会启动。 processes =[mp.Process(target=func, args=(a, b)) for (a, b) in list]for p in processes: p.startfor p in processes: p.join 另一方面, Pool启动固定数量的进程,然后我们可以为这些进程分配一些任务。因此,在特定的时间实例中,只有固定数量的进程将在运行,其余的将在等待状态中。进程的数量通常被选作设备的内核数,如果此参数为空,也是可以作为默认的状态的。 pool = mp.Pool(processes=2) 现在有许多方法可以应用在Pool。在Data Science中,我们可以避免使用的是Pool.apply和Pool.map,因为它们会在任务完成后立即返回结果。Pool.apply仅采用一个参数,并且仅使用一个过程,而Pool.map将接受许多参数,并将其放入我们Pool的过程中。 results = [pool.apply(func, (x)) for x in X]# 或者 results = pool.map(func, (arg)) # 仅需要一个参数 考虑到我们前面的客户端-服务器应用程序的例子,此处预定义了要运行的最大进程数,因此,如果我们有很多请求/数据包,则n(仅在Pool中的最大进程)将运行一次,而其他将在等待其中一个进程插槽的队列中排队。 向量的所有元素的平方 # 我们如何使用数据框# A: 你可以使用一些可以并行化的函数df.shape# (100, 100)dfs = [df.iloc[i*25:i*25+25, 0] for i in range(4)]with Pool(4) as p: res = p.map(np.exp, dfs)for i in range(4): df.iloc[i*25:i*25+25, 0] = res[i]# 它可以方便的对数据进行预处理 什么时候使用什么? 如果你有很多任务,但其中很少的任务是计算密集型的,则应使用Process。因为如果它们需要大量计算,它们可能会阻塞你的CPU,并且你的系统可能会崩溃。如果你的系统可以一次处理所有这些操作,那么他们就不必在队列中等待机会了。 并且当你的任务数量固定且它们的计算量很大时,应使用Pool。因为你同时释放他们,那么你的系统很可能会崩溃。 3.线程处理什么!线程处理在python中进行? python中的线程声誉。人们的这一点看法是对的。实际上,线程在大多数情况下是不起作用的。那么问题到底是什么呢? 问题就出在GIL(全局解释器锁定)上。GIL是在Python的开发初期就引入的,当时甚至在操作系统中都没有线程的概念。选择它是因为它的简单性。 GIL一次仅允许一个CPU进程。也就是说,它一次仅允许一个线程访问python解释器。因此,一个线程将整个解释器Lock,直到它完成。 对于单线程程序,它非常快,因为只有一个Lock要维护。随着python的流行,有效地推出GIL而不损害所有相关应用程序变得越来越困难。这就是为什么它仍然存在的原因。 但是,如果你的任务不受CPU限制,则仍然可以使用多线程并行(y)。也就是说,如果你的任务受I / O约束,则可以使用多个线程并获得加速。因为大多数时候这些任务都在等待其他代理(例如磁盘等)的响应,并且在这段时间内它们可以释放锁,而让其他任务同时获取它。⁴ NOTE: (来自于官方网页)The GIL is controversial because it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations. Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL. Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck. 以下是对官方网页的解释: GIL是有争议的,因为它阻止多线程CPython程序在某些情况下充分利用多处理器系统。注意,潜在的阻塞或长时间运行的操作,如I/O、图像处理和NumPy数字处理,都发生在GIL之外。因此,只有在花费大量时间在GIL内部解释CPython字节码的多线程程序中,GIL才会成为瓶颈。 因此,如果你的任务受IO限制,例如从服务器下载一些数据,对磁盘进行读/写等操作,则可以使用多个线程并获得加速。 from threading import Thread as timport queueq = queue.Queue # 用于放置和获取线程的结果func_ = lambda q, args: q.put(func(args))threads = [t(target=func_, args=(q, args)) for args in args_array]for t in threads: t.startfor t in threads: t.joinres = for t in threads: res.append(q.get) # 这些结果不一定是按顺序排列的 要保存线程的结果,可以使用类似于Queue 的方法。为此,你将必须如上所示定义函数,或者可以在函数内部使用Queue.put,但是为此,你必须更改函数定义以Queue`做为参数。 现在,你在队列中的结果不一定是按顺序排列的。如果希望结果按顺序排列,则可以传入一些计数器作为参数,如id作为参数,然后使用这些id来标识结果的来源。 threads = [t(func_, args = (i, q, args)) for i, args in enumerate(args_array)]# 并相应地更新函数NOTE:在pandas中的多处理中由于某些原因 'read.csv' 的方法并没有提供太多的加速,你可以考虑使用Dask做为替代 线程还是进程? 一个进程是重量级的,因为它可能包含许多自己的线程(包含至少一个线程),并且分配了自己的内存空间,而线程是轻量级的,因为它在父进程的内存区域上工作,因此制作起来更快。 进程内的线程之间的通信比较容易,因为它们共享相同的内存空间。而进程间的通信(IPC-进程间通信)则比较慢。但是,共享相同数据的线程又可能进入竞争状态,应谨慎使用Locks或使用类似的解决方案。 4.DaskDask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。 他们也有一个单独的机器学习库dask-ml,这与如现有的库(如sklearn,xgboost和tensorflow)集成在一起。 from dask import delayed as delay@delaydef add(x, y): return x+y@delaydef sq(x): return x**2# 现在你可以以任何方式使用这些函数,Dask将使你的执行并行化。顾名思义,Dask不会立即执行函数调用,而是根据对输入和中间结果调用函数的方式生成计算图。计算最终结果:result.compute Dask在做任何事情的时候都有一种内在的并行性。对于如何处理DataFrame的,你可以将其视为分而治之的方法,它将DataFrame分为多个块,然后并行应用给定的函数。 df = dask.DataFrame.read_csv("BigFile.csv", chunks=50000)# 你的DataFrame已经被划分为了多个块,你应用的每个函数将分别并行的应用所有的模块。它有大部分的Pandas功能,你可以使用:agg = df.groupby(["column"]).aggregate(["sum", "mean"])agg.columns = new_column_namesdf_new = df.merge(agg.reset_index, on="column", how="left")# 虽然到目前为止还没有计算结果,但是使用.compute可以并行计算。df_new.compute.head 它们还具有用于在计算机集群上运行它们的接口。 5.torch.multiprocessingtorch.multiprocessing是Python multiprocessing模块的封装函数,其API与原始模块100%兼容。因此,你可以在此处使用Python的 multiprocessing模块中的Queue',Pipe',Array'等。此外,为了使其更快,他们添加了一个方法,share_memory_该方法允许数据进入一个状态,在这个状态下任何进程都可以直接使用它,因此将该数据作为参数传递给不同的进程不会复制该数据。 。 你可以共享Tensors,模型的parameters,也可以根据需要在CPU或GPU上共享它们。 来自Pytorch的警告:(关于GPU上的共享) CUDA API要求导出到其他进程的分配在被其他进程使用时仍然有效。你应该小心,确保你共享的CUDA张量不会超出范围,只要有必要。这对于共享模型参数应该不是问题,但是传递其他类型的数据时应该小心。注意,这个限制不适用于共享CPU内存。 你可以在此处的"Pool and Process"部分中使用上面的方法,并且要获得更快的速度,可以使用share_memory_方法在所有进程之间共享一个Tensor(例如)而不被需要复制。 # 使用多个过程训练一个模型:import torch.multiprocessing as mpdef train(model): for data, labels in data_loader: optimizer.zero_grad loss_fn(model(data), labels).backward optimizer.step # 这将更新共享参数model = nn.Sequential(nn.Linear(n_in, n_h1), nn.ReLU, nn.Linear(n_h1, n_out))model.share_memory #需要"fork"方法工作processes = for i in range(4): # NO.的过程 p = mp.Process(target=train, args=(model,)) p.start processes.append(p)for p in processes: p.join 下一期继续看加快Python算法的第4种方法——Dask! 相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。今天给大家讲述最后一方面的内容,关于Dask的方法运用。 1.简介随着对机器学习算法并行化的需求不断增加,由于数据大小甚至模型大小呈指数级增长,如果我们拥有一个工具,可以帮助我们并行化处理Pandas的DataFrame,可以并行化处理Numpy的计算,甚至并行化我们的机器学习算法(可能是来自sklearn和Tensorflow的算法)也没有太多的麻烦,那它对我们会非常有帮助。 好消息是确实存在这样的库,其名称为Dask。Dask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。 他们也有一个单独的机器学习库dask-ml,这与如现有的库集成如sklearn,xgboost和tensorflow。 Dask通过绘制任务之间的交互图来并行化分配给它的任务。使用Dask的.visualize方法来可视化你的工作将非常有帮助,该方法可用于所有数据类型以及你计算的复杂任务链。此方法将输出你的任务图,并且如果你的任务在每个级别具有多个节点(即,你的任务链结构在多个层次上具有许多独立的任务,例如数据块上的并行任务),然后Dask将能够并行化它们。 注意: Dask仍是一个相对较新的项目。它还有很长的路要走。不过,如果你不想学习全新的API(例如PySpark),Dask是你的最佳选择,将来肯定会越来越好。Spark / PySpark仍然遥遥领先,并且仍将继续改进。这是一个完善的Apache项目。 2.数据类型Dask中的每种数据类型都提供现有数据类型的分布式版本,例如pandas中的DataFrame、numpy中的ndarray和Python中的list。这些数据类型可以大于你的内存,Dask将以Blocked方式对数据并行(y)运行计算。Blocked从某种意义上说,它们是通过执行许多小的计算(即,以块为单位)来执行大型计算的,而块的数量为chunks的总数。 a)数组: 网格中的许多Numpy数组作为Dask数组 Dask Array对非常大的数组进行操作,将它们划分为块并并行执行这些块。它有许多可用的numpy方法,你可以使用这些方法来加快速度。但是其中一些没有实现。 只要支持numpy切片,Dask Array就可以从任何类似数组结构中读取数据,并且可以通过使用并且通过使用Dask . Array .from_array方法具有.shape属性。它还可以读取.npy和.zarr文件。 import dask.array as daimport numpy as nparr = numpy.random.randint(1, 1000, (10000, 10000))darr = da.from_array(arr, chunks=(1000, 1000))# 它会生成大小为(1000,1000)的块darr.npartitioins# 100 当你的数组真的很重时(即它们无法放入内存)并且numpy对此无能为力时,可以使用它。因此,Dask将它们分为数组块并为你并行处理它们。 现在,Dask对每种方法进行惰性评估。因此,要实际计算函数的值,必须使用.compute方法。它将以块为单位并行计算结果,同时并行化每个独立任务。 result = darr.compute 1)元素数量较少时,Numpy比Dask快;2)Dask接管了Numpy,耗时约1e7个元素;3)Numpy无法产生更多元素的结果,因为它无法将它们存储在内存中。 b)DataFrame: 5个Pandas DataFrame在一个Dask DataFrame中提供每月数据(可以来自diff文件) 与Dask Arrays相似,Dask DataFrames通过将文件划分为块并将这些块的计算函数并行化,从而对不适合内存非常大的数据文件进行并行计算。 import dask.dataframe as dddf = dd.read_csv("BigFile(s).csv", blocksize=50e6) 现在,你可以应用/使用pandas库中可用的大多数功能,并在此处应用。 agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])agg.columns = new_column_names #请查看notebookdf_new = df.merge(agg.reset_index, on="column", how="left")df_new.compute.head c)Bag: Dask Bag包并行处理包含多个数据类型元素Python的list相似对象。当你尝试处理一些半结构化数据(例如JSON Blob或日志文件)时,此功能很有用。 import dask.bag as dbb = db.from_txt("BigSemiStructuredData.txt")b.take(1) Daskbag逐行读取,.take方法输出指定行数的元组。 Dask Bag在这样的Python对象集合上实现例如map,filter,fold,和groupby等操作。它使用Python迭代器并行地完成这个任务,占用的内存很小。它类似于PyToolz的并行版本或PySpark RDD的Python版本。 filtered = b.filter(lambda x: x["Name"]=="James")\ .map(lambda x: x["Address"] = "New_Address")filtered.compute 3.延时如果你的任务有点简单,并且你不能或不想使用这些高级集合来执行操作,则可以使用低级调度程序,该程序可帮助你使用dask.delayed接口并行化代码/算法。dask.delayed也可以进行延迟计算。 import dask.delayed as delay@delaydef sq(x): return x**2@delay def add(x, y): return x+y@delay def sum(arr): sum=0 for i in range(len(arr)): sum+=arr[i] return sum 你可以根据需要在这些函数之间添加复杂的交互,使用上一个任务的结果作为下一个任务的参数。Dask不会立即计算这些函数,而是会为你的任务绘制图形,有效地合并你使用的函数之间的交互。 inputs = list(np.arange(1, 11))#将外接程序 dask.delayed 加入到列表temp = for i in range(len(inputs)): temp.append(sq(inputs[i])) # 计算输入的sq并保存 # 延迟计算在列表inputs=temp; temp = for i in range(0, len(inputs)-1, 2): temp.append(add(inputs[i]+inputs[i+1])) # 添加两个连续 # 结果从prev步骤inputs = tempresult = sum(inputs) # 将所有prev步骤的结果相加results.compute 你可以将延迟添加到具有许多可能的小块的任何可并行化代码中,从而获得加速的效果。它可以是你想计算的许多函数,例如上面的示例,或者可以使用并行读取多个文件pandas.read_csv。 4.分布式首先,到目前为止,我们一直使用Dask的默认调度器来计算任务的结果。但是你可以根据需要从Dask提供的选项中更改它们。 Dask 带有四个可用的调度程序: · threaded:由线程池支持的调度程序 · processes:由进程池支持的调度程序 · single-threaded(又名" sync"):同步调度程序,用于调试 · distributed:用于在多台计算机上执行图形的分布式调度程序 result.compute(scheduler="single-threaded") #用于调试# 或者dask.config.set(scheduler="single-threaded")result.compute#注:(从官方网页)#当被称为GIL的函数释放时,线程任务将工作得很好,而多处理总是启动时间较慢,并且在任务之间需要大量的通信。# 你可以通过其中一个得到调度程序 commands:dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync#单线程的最后一个 但是,Dask还有一个调度器,dask.distributed由于以下原因它可能是首选使用的: 1. 它提供了异步API的访问,尤其是Future, 1. 它提供了一个诊断仪表板,可以提供有关性能和进度的宝贵见解 1. 它可以更复杂地处理数据位置,因此在需要多个流程的工作负载上,它比多处理调度程序更有效。 你可以创建一个Dask的dask.distributed调度程序,通过导入和创建客户端实现分布式调度器 from dask.distributed import Clientclient = Client # Set up a local cluster# 你可以导航到http://localhost:8787/status 查看# 诊断仪表板,如果你有Bokeh安装的话 现在,你可以使用client.submit方法,将函数和参数作为其参数,从而将任务提交到此集群。然后我们可以使用client.gather或.result方法来收集结果。 sent = client.submit(sq, 4) # sq: square 函数result = client.gather(sent) # 或者 sent.result 你也可以仅使用dask.distributed.progress来查看当前单元格中任务的进度。你还可以明确选择使用dask.distributed.wait来等待任务完成。 Note: (Local Cluster)有时您会注意到Dask正在超出内存使用,即使它正在划分任务。它可能发生在您身上,因为您试图在数据集上使用的函数需要您的大部分数据进行处理,而多重处理可能使情况变得更糟,因为所有工作人员都可能试图将数据集复制到内存中。这可能发生在聚合的情况下。或者您可能想限制Dask只使用特定数量的内存。 在这些情况下,您可以使用Dask.distributed。LocalCluster参数,并将它们传递给Client,从而使用本地机器的核心构建LocalCluster。 from dask.distributed import Client, LocalClusterclient = Client(n_workers=1, threads_per_worker=1, processes=False, memory_limit='25GB', scheduler_port=0, silence_logs=True, diagnostics_port=0)client 'scheduler_port=0'和' stics_port=0'将为这个特定的客户端选择随机端口号。在'process =False'的情况下,dask的客户端不会复制数据集,这可能发生在您所创建的每个进程中。您可以根据自己的需要或限制对客户机进行调优,要了解更多信息,可以查看LocalCluster的参数。您还可以在同一台机器的不同端口上使用多个客户机。 5.机器学习Dask也有一个库,可以帮助并允许大多数流行的机器学习库,例如sklearn,tensorflow和xgboost。 在机器学习中,你可能会遇到几个不同的问题。而具体的策略取决于你面临的问题: 1. 大型模型:数据适合放入RAM,但是训练时间太长。许多超参数组合,许多模型的大型集合等。 1. 大型数据集:数据大于RAM,并且不能选择抽样。 因此,你应该: · 对于内存中适合的问题,只需使用scikit-learn(或你最喜欢的ML库)即可; · 对于大型模型,请使用dask_ml.joblib和你最喜欢的scikit-learn估算器 · 对于大型数据集,请使用dask_ml估算器。 a)预处理: dask_ml.preprocessing包含一些sklearn的一些功能,如RobustScalar(稳健标量),StandardScalar(标准标量),LabelEncoder(标签编码器),OneHotEncoder(独热编码),PolynomialFeatures(多项式特性)等等,以及它的一些自己的如Categorizer(分类器),DummyEncoder(虚拟编码),OrdinalEncoder(序数编码器)等。 你可以像使用PandasDataFrame一样使用它们。 from dask_ml.preprocessing import RobustScalardf = da.read_csv("BigFile.csv", chunks=50000)rsc = RobustScalardf["column"] = rsc.fit_transform(df["column"]) 你可以使用Dask的DataFrame上的预处理方法,从Sklearn的Make_pipeline方法生成一个管道。 b)超参数搜索: Dask具有sklearn用于进行超参数搜索的方法,例如GridSearchCV,RandomizedSearchCV等等。 from dask_ml.datasets import make_regressionfrom dask_ml.model_selection import train_test_split, GridSearchCVX, y = make_regression(chunks=50000)xtr, ytr, xval, yval = test_train_split(X, y)gsearch = GridSearchCV(estimator, param_grid, cv=10)gsearch.fit(xtr, ytr) 而且,如果要partial_fit与估算器一起使用,则可以使用dask-ml的IncrementalSearchCV。 注意:(来自Dask)如果要使用后拟合任务(如评分和预测),则使用基础估计量评分方法。如果你的估算器(可能来自sklearn )无法处理大型数据集,则将估算器包装在" dask_ml.wrappers.ParallelPostFit" 周围。它可以并行化" predict"," predict_proba"," transform"等方法。 c)模型/估计器: Dask具有一些线性模型(的LinearRegression,LogisticRegression等),一些聚类模型(Kmeans和SpectralClustering),一种使用Tensorflow 的方法,使用Dask训练XGBoost模型的方法。 如果训练数据较小,则可以将sklearn的模型与结合使用Dask,如果ParallelPostFit包装数据较大,则可以与包装器一起使用(如果测试数据较大)。 from sklearn.linear_model import ElasticNetfrom dask_ml.wrappers import ParallelPostFitel = ParallelPostFit(estimator=ElasticNet)el.fit(Xtrain, ytrain)preds = el.predict(Xtest) 如果数据集不大但模型很大,则可以使用joblib。sklearns编写了许多用于并行执行的算法(你可能使用过n_jobs=-1参数),joblib该算法利用线程和进程来并行化工作负载。要用于Dask并行化,你可以创建一个Client(客户端)(必须),然后使用with joblib.parallel_backend('dask'):包装代码。 import dask_ml.joblibfrom sklearn.externals import joblibclient = Clientwith joblib.parallel_backend('dask'): # 你的 scikit-learn 代码 注意:DASK JOBLIB后端对于扩展CPU绑定的工作负载非常有用; 在RAM中包含数据集的工作负载,但具有许多可以并行完成的单独操作。要扩展到受RAM约束的工作负载(大于内存的数据集),你应该使用Dask的内置模型和方法。 而且,如果你训练的数据太大而无法容纳到内存中,那么你应该使用Dask的内置估算器来加快速度。你也可以使用Dask的wrapper.Incremental它使用基础估算器的partial_fit方法对整个数据集进行训练,但实际上是连续的。 Dask的内置估计器很好地扩展用于大型数据集与多种优化算法,如admm,lbfgs,gradient_descent等,并且正则化器如 L1,L2,ElasticNet等。 from dask_ml.linear_model import LogisticRegressionlr = LogisticRegressionlr.fit(X, y, solver="lbfgs") |
|
来自: 文炳春秋 > 《Python资料》