大家好,我是小寒。
如果觉得不错,点赞、收藏安排起来吧。
数据集
# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
# Data Ingestion
import pandas as pd
# Text Processing
import re
from nltk.corpus import stopwords
import string
在我们开始之前,让我们设置n_workers为2倍的cpu核数。
n_workers = 2 * mp.cpu_count()
print(f"{n_workers} workers are available")
8 workers are available
read_csv函数提取大型 CSV 文件。然后,打印出 dataframe 的形状、列名和处理时间。
%%time
file_name="./US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)
print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")
输出:
Shape:(2845342, 47)
Column Names:
Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype='object')
CPU times: total: 17.2 s
Wall time: 17.2 s
数据清洗
首先定义一个用于处理和清理文本的简单函数 clean_text
。
def clean_text(text):
# Remove stop words
stops = stopwords.words("english")
text = " ".join([word for word in text.split() if word not in stops])
# Remove Special Characters
text = text.translate(str.maketrans('', '', string.punctuation))
# removing the extra spaces
text = re.sub(' +',' ', text)
return text
串行处理
%%time
tqdm.pandas()
df['Description'] = df['Description'].progress_apply(clean_text)
输出:
「处理 280 万行 需要 9 分 5 秒。」
CPU times: total: 8 min 14s
Wall time: 9 min 5 s
并行处理
有多种方法可以并行处理文件,我们将了解所有这些方法。
Multiprocessing
multiprocessing
是一个内置的python包,常用于并行处理大文件。
我们将创建一个具有「8 个 worker 的多处理池,并使用map函数启动进程。并且使用tqdm来显示进度条。」
%%time
p = mp.Pool(n_workers)
df['Description'] = p.map(clean_text,tqdm(df['Description']))
100% 2845342/2845342 [02:58<00:00, 135646.12it/s]
CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s
Parallel
这里我们将使用 joblib 的 「Parallel」 和 「delayed」 函数。
Parallel 需要两个参数:n_jobs 和 backend 。 然后,我们将「clean_text 添加到 delayed」 函数 中。 创建一个循环,一次提供一个值。
def text_parallel_clean(array):
result = Parallel(n_jobs=n_workers,backend="multiprocessing")(
delayed(clean_text)
(text)
for text in tqdm(array)
)
return result
%%time
df['Description'] = text_parallel_clean(df['Description'])
这里比 Multiprocessing 多花了 13 秒。即使那样,并行处理也比串行处理快 4 分 59 秒。
100% 2845342/2845342 [04:03<00:00, 10514.98it/s]
CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s
Parallel Batch Processing
有一种更好的方法来处理大文件,方法是将它们分成批次,然后进行并行处理。
批处理函数
def proc_batch(batch):
return [
clean_text(text)
for text in batch
]
将文件切割
下面的函数将根据 n_workers 数量将文件分成多个批次。在我们的例子中,我们得到 8 个批次。
def batch_file(array,n_workers):
file_len = len(array)
batch_size = round(file_len / n_workers)
batches = [
array[ix:ix+batch_size]
for ix in tqdm(range(0, file_len, batch_size))
]
return batches
batches = batch_file(df['Description'],n_workers)
运行并行批处理
%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
delayed(proc_batch)
(batch)
for batch in tqdm(batches)
)
df['Description'] = [j for i in batch_output for j in i]
「我们改进了处理时间。该技术以处理复杂数据和训练深度学习模型而闻名。」
100% 8/8 [00:00<00:00, 2.19it/s]
CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s
tqdm 并发
tqdm 将多处理提升到一个新的水平。它简单而强大。
%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)
df["Description"] = process_map(
clean_text, df["Description"], max_workers=n_workers, chunksize=batch
)
100% 2845342/2845342 [03:48<00:00, 1426320.93it/s]
CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s
最后
—
「进群方式:加我微信,备注 “python”」

往期回顾







如果对本文有疑问可以加作者微信直接交流。进技术交流群的可以拉你进群。

文章转载自程序员学长,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




