暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

python处理大文件速度慢怎么办?

程序员学长 2023-02-28
628

大家好,我是小寒。

在使用 python 处理文件时,有时候会遇到处理超大型文件的情况,这会导致处理时间成倍的增加。
今天,我们将学习如何使用 「multiprocessing」「joblib」 和 「tqdm」 Python 包减少大文件的处理时间。

如果觉得不错,点赞、收藏安排起来吧。

数据集

我们将使用来自 Kaggle 的 「美国事故 (2016 - 2021)」 数据集,该数据集包含 280 万条记录和 47 列。
# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm

# Data Ingestion 
import pandas as pd

# Text Processing 
import re 
from nltk.corpus import stopwords
import string

在我们开始之前,让我们设置n_workers为2倍的cpu核数。

n_workers = 2 * mp.cpu_count()

print(f"{n_workers} workers are available")

8 workers are available

在下一步中,我们将使用 「pandas」 read_csv
 函数提取大型 CSV 文件。然后,打印出 dataframe 的形状、列名和处理时间。
%%time
file_name="./US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

输出:

Shape:(2845342, 47)

Column Names:
Index(['ID''Severity''Start_Time''End_Time''Start_Lat''Start_Lng',
       'End_Lat''End_Lng''Distance(mi)''Description''Number''Street',
       'Side''City''County''State''Zipcode''Country''Timezone',
       'Airport_Code''Weather_Timestamp''Temperature(F)''Wind_Chill(F)',
       'Humidity(%)''Pressure(in)''Visibility(mi)''Wind_Direction',
       'Wind_Speed(mph)''Precipitation(in)''Weather_Condition''Amenity',
       'Bump''Crossing''Give_Way''Junction''No_Exit''Railway',
       'Roundabout''Station''Stop''Traffic_Calming''Traffic_Signal',
       'Turning_Loop''Sunrise_Sunset''Civil_Twilight''Nautical_Twilight',
       'Astronomical_Twilight'],
      dtype='object')

CPU times: total: 17.2 s
Wall time: 17.2 s

数据清洗

首先定义一个用于处理和清理文本的简单函数 clean_text

我们将使用它从文本行中过滤掉停用词。之后,我们将从句子中删除特殊字符和多余空格。
它将是确定「串行」「并行」「批处理」的处理时间的基线函数。
def clean_text(text): 
  # Remove stop words
  stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word not in stops])
  # Remove Special Characters
  text = text.translate(str.maketrans('''', string.punctuation))
  # removing the extra spaces
  text = re.sub(' +',' ', text)
  return text

串行处理

使用 pandas 处理这 280 万条记录,并将结果保存回 “Description”列。
%%time
tqdm.pandas()

df['Description'] = df['Description'].progress_apply(clean_text)

输出:

「处理 280 万行 需要 9 分 5 秒。」

CPU times: total: 8 min 14s
Wall time: 9 min 5 s

并行处理

有多种方法可以并行处理文件,我们将了解所有这些方法。

Multiprocessing

multiprocessing
 是一个内置的python包,常用于并行处理大文件。 

我们将创建一个具有「8 个 worker 的多处理池,并使用map函数启动进程。并且使用tqdm来显示进度条。」

%%time
p = mp.Pool(n_workers) 

df['Description'] = p.map(clean_text,tqdm(df['Description']))

输出:
我们将处理时间缩短了近「3 倍」。处理时间从「9 分 5 秒」下降到「3分51秒」
100%  2845342/2845342 [02:58<00:00, 135646.12it/s]
CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s

Parallel

这里我们将使用 joblib 的 「Parallel」 和 「delayed」 函数。

  • Parallel 需要两个参数:n_jobs 和 backend
  • 然后,我们将「clean_text 添加到 delayed」 函数 中。
  • 创建一个循环,一次提供一个值。
def text_parallel_clean(array):
  result = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(clean_text)
  (text) 
  for text in tqdm(array)
  )
  return result

%%time
df['Description'] = text_parallel_clean(df['Description'])

输出:

这里比 Multiprocessing 多花了 13 秒。即使那样,并行处理也比串行处理快 4 分 59 秒。

100%  2845342/2845342 [04:03<00:00, 10514.98it/s]

CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s

Parallel Batch Processing

有一种更好的方法来处理大文件,方法是将它们分成批次,然后进行并行处理。

批处理函数
def proc_batch(batch):
  return [
  clean_text(text)
  for text in batch
  ]

将文件切割

下面的函数将根据 n_workers 数量将文件分成多个批次。在我们的例子中,我们得到 8 个批次。

def batch_file(array,n_workers):
  file_len = len(array)
  batch_size = round(file_len / n_workers)
  batches = [
  array[ix:ix+batch_size]
  for ix in tqdm(range(0, file_len, batch_size))
  ]
  return batches

batches = batch_file(df['Description'],n_workers)

运行并行批处理
最后,我们将使用「Parallel」「delayed」来处理批处理。
%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(proc_batch)
  (batch) 
  for batch in tqdm(batches)
  )

df['Description'] = [j for i in batch_output for j in i]

输出:

「我们改进了处理时间。该技术以处理复杂数据和训练深度学习模型而闻名。」

100%  8/8 [00:00<00:00, 2.19it/s]

CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

tqdm 并发

tqdm 将多处理提升到一个新的水平。它简单而强大。

我会把它推荐给每一位数据科学家。 
%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)

df["Description"] = process_map(
    clean_text, df["Description"], max_workers=n_workers, chunksize=batch
)

输出:
「通过一行代码,我们得到了最好的结果。」
100%  2845342/2845342 [03:48<00:00, 1426320.93it/s]

CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s 

你需要找到一个平衡点并选择最适合你情况的技术。
「它可以是串行处理、并行处理或批处理。如果你使用的是较小、不太复杂的数据集,则并行处理可能适得其反。」
       

最后

今天的分享就到这里。如果觉得不错,点赞,转发安排起来吧。
接下来我们会分享更多的 「深度学习案例以及python相关的技术」,欢迎大家关注。
最后,最近新建了一个 python 学习交流群,会经常分享 「python相关学习资料,也可以问问题,非常棒的一个群」

「进群方式:加我微信,备注 “python”」



往期回顾


Fashion-MNIST 服装图片分类-Pytorch实现

python 探索性数据分析(EDA)案例分享

深度学习案例分享 | 房价预测 - PyTorch 实现

万字长文 |  面试高频算法题之动态规划系列

面试高频算法题之回溯算法(全文六千字)  

    




如果对本文有疑问可以加作者微信直接交流。进技术交流群的可以拉你进群。

文章转载自程序员学长,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论