使用多进程库计算科学数据时出现内存错误

问题背景

我经常使用爬虫来做数据抓取,多线程爬虫方案是必不可少的,正如我在使用 Python 进行科学计算时,需要处理大量存储在 CSV 文件中的数据。由于每个处理过程需要很长时间才能完成,而您拥有多核处理器,所以您尝试使用多进程库中的 Pool 方法来提高计算效率。

您按照如下方式构建了多进程调用:

代码语言:javascript
复制
pool = Pool()
vector_components = []
for sample0 in range(samples):
    vector_field_x_i = vector_field_samples_x[sample]
    vector_field_y_i = vector_field_samples_y[sample]
    vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps,
                                                                       vector_field_x_i, vector_field_y_i))
    vector_components.append(vector_component)
pool.close()
pool.join()
​
vector_components = map(lambda k: k.get(), vector_components)
​
for vector_component in vector_components:
    CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')

使用此代码,当您处理 500 个元素,每个元素大小为 100 x 100 的数据时,一切正常。但是,当您尝试处理 500 个元素,每个元素大小为 400 x 400 时,在调用 get() 时会收到内存错误。

解决方案

出现内存错误的原因是您的代码在内存中保留了多个列表,包括 vector_field_x、vector_field_y、vector_components,以及在 map() 调用期间创建的 vector_components 的副本。当您尝试处理较大的数据时,这些列表可能变得非常大,从而导致内存不足。

为了解决此问题,您需要避免在内存中保存完整的列表。您可以使用多进程库中的 imap() 方法来实现这一点。imap() 方法返回一个迭代器而不是完整的列表,因此您不必将所有结果都保存在内存中。

以下是使用 imap() 方法重写代码的示例:

代码语言:javascript
复制
import multiprocessing
from functools import partial
​
def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields):
    vector_field_x_i = vector_fields[0]
    vector_field_y_i = vector_fields[1]
    # Do whatever is normally done here.
​
if __name__ == "__main__":
    num_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_workers)
    # Calculate a good chunksize (based on implementation of pool.map)
    chunksize, extra = divmod(samples // 4 * num_workers)
    if extra:
        chunksize += 1
​
    # Use partial so many arguments can be passed to vector_field_decomposer
    func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps)
    # We use a generator expression as an iterable, so we don't create a full list.
    results = pool.imap(func, 
                        ((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)),
                        chunksize=chunksize)
    for vector in results:
        CsvH.write_vector_field(vector_component, 
                                '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
    pool.close()
    pool.join()

通过使用这种方法,您可以避免出现内存错误,并能够处理较大的数据。

请确保你的计算任务是可以并行化的,并且注意到在Windows系统上,mclapply可能不如在Unix-like系统(如Linux或Mac OS X)上有效。在Windows系统上,你可能需要使用parLapply函数来代替。如果有更多专业知识不懂得可以评论区一起讨论。