Python并发中的Future
本文由AI辅助创作
在 Python 的 concurrent.futures
模块中,Future
是一种对象,用于表示异步操作的结果。你可以将其视为一个占位符,用于在某个任务执行完成之前存储任务的结果。Future
对象允许你以非阻塞的方式提交任务,并在将来检查任务的状态、获取结果,或等待任务完成。
1. Future
的基本功能:
- 提交任务:当你通过
ThreadPoolExecutor.submit()
或ProcessPoolExecutor.submit()
提交一个任务时,返回的就是一个Future
对象。 - 检查状态:可以通过
Future
对象检查任务是否完成,是否出错等。 - 获取结果:可以通过调用
Future.result()
来获取任务的执行结果。如果任务尚未完成,result()
会阻塞,直到任务完成为止。
2. Future
的常用方法:
result(timeout=None)
:返回任务的结果。如果任务尚未完成,它将阻塞直到任务完成或超时。如果任务失败,它会抛出相应的异常。done()
:检查任务是否已经完成。返回True
表示任务已经完成,False
表示任务尚未完成。cancel()
:尝试取消尚未开始的任务。如果任务已经运行,则无法取消。cancelled()
:检查任务是否被取消。exception(timeout=None)
:如果任务引发了异常,返回该异常。如果任务未完成,它将等待,直到任务完成或超时。
3. 等待 Future
完成的几种方式:
方法 1:使用 result()
阻塞等待
最直接的方式是调用 Future.result()
,它会阻塞当前线程,直到任务完成并返回结果。
from concurrent.futures import ThreadPoolExecutor
import time
def task():
time.sleep(2)
return "Task completed!"
with ThreadPoolExecutor() as executor:
future = executor.submit(task)
# 阻塞等待任务完成,获取结果
result = future.result()
print(result)
在这个例子中,future.result()
阻塞当前线程,直到任务完成,返回 "Task completed!"
。
方法 2:使用 done()
检查任务是否完成
你可以用 done()
方法非阻塞地检查任务是否完成。
from concurrent.futures import ThreadPoolExecutor
import time
def task():
time.sleep(2)
return "Task completed!"
with ThreadPoolExecutor() as executor:
future = executor.submit(task)
while not future.done():
print("Waiting for task to complete...")
time.sleep(0.5)
print(f"Task result: {future.result()}")
在这个例子中,程序会持续检查任务是否完成,而不会阻塞主线程。
方法 3:使用 as_completed()
等待多个 Future
完成
如果你有多个 Future
,可以使用 concurrent.futures.as_completed()
来依次处理每个任务的完成。它会以任务完成的顺序返回 Future
。
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def task(n):
time.sleep(n)
return f"Task {n} completed!"
with ThreadPoolExecutor() as executor:
futures = [executor.submit(task, i) for i in range(1, 4)]
# 按照任务完成的顺序处理结果
for future in as_completed(futures):
print(future.result())
在这个例子中,as_completed(futures)
返回一个生成器,按任务完成的顺序返回 Future
,而不必等待所有任务完成。
方法 4:使用 wait()
来等待多个 Future
完成
concurrent.futures.wait()
允许你等待多个 Future
,并根据是否完成进行分类。
from concurrent.futures import ThreadPoolExecutor, wait
import time
def task(n):
time.sleep(n)
return f"Task {n} completed!"
with ThreadPoolExecutor() as executor:
futures = [executor.submit(task, i) for i in range(1, 4)]
# 等待所有任务完成
done, not_done = wait(futures)
# 处理完成的任务
for future in done:
print(future.result())
在这个例子中,wait()
返回两个集合:已完成任务的 Future
和未完成任务的 Future
。
4. 使用 timeout
来设置等待时间
你可以在 result()
或 exception()
方法中设置 timeout
参数,以防止程序长时间阻塞等待任务完成。超时后,它会抛出 concurrent.futures.TimeoutError
。
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def task():
time.sleep(2)
return "Task completed!"
with ThreadPoolExecutor() as executor:
future = executor.submit(task)
try:
# 等待 1 秒,如果任务未完成将引发 TimeoutError
result = future.result(timeout=1)
print(result)
except TimeoutError:
print("The task did not complete in the given time.")
在这个例子中,任务需要 2 秒完成,但我们只等待 1 秒,因此会抛出 TimeoutError
。
总结:
Future
是 Python 中处理并发任务的一个强大工具,它允许你异步执行任务并在任务完成后获取结果或处理异常。你可以使用 result()
阻塞等待,使用 done()
非阻塞检查,或使用 as_completed()
和 wait()
来等待多个 Future
。
等待多个Future组成的list
在 Python 中,如果你有一个 Future
数组(即多个 Future
对象),你可以使用 concurrent.futures.wait()
或 concurrent.futures.as_completed()
来等待所有 Future
完成并收集结果。
方法 1:使用 wait()
等待所有 Future
完成并收集结果
concurrent.futures.wait()
会阻塞当前线程,直到所有 Future
完成。完成后,你可以遍历这些 Future
并提取结果。
from concurrent.futures import ThreadPoolExecutor, wait
def task(n):
return f"Task {n} result"
# 创建一个线程池
with ThreadPoolExecutor() as executor:
# 提交多个任务并收集 Future 对象
futures = [executor.submit(task, i) for i in range(5)]
# 使用 wait 等待所有任务完成
done, _ = wait(futures)
# 提取所有完成任务的结果
results = [future.result() for future in done]
print("All tasks completed.")
print("Results:", results)
说明:
wait(futures)
返回两个集合:done
(已完成的Future
)和not_done
(未完成的Future
)。- 使用
result()
提取每个Future
的结果。
方法 2:使用 as_completed()
逐步获取完成的 Future
并收集结果
concurrent.futures.as_completed()
返回一个生成器,按任务完成的顺序返回 Future
。你可以依次从生成器中获取结果,并将它们收集到一个数组中。
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(n):
return f"Task {n} result"
# 创建一个线程池
with ThreadPoolExecutor() as executor:
# 提交多个任务并收集 Future 对象
futures = [executor.submit(task, i) for i in range(5)]
# 使用 as_completed 获取每个完成的 Future 的结果
results = [future.result() for future in as_completed(futures)]
print("All tasks completed.")
print("Results:", results)
说明:
as_completed(futures)
生成一个按任务完成顺序的Future
对象,允许你在任务完成时立即处理结果。- 和
wait()
不同,as_completed()
允许你在每个任务完成时立刻处理该任务的结果,而不需要等待所有任务都完成。
总结:
- 如果你希望一次性等待所有
Future
完成并收集结果,wait()
是一个合适的选择。 - 如果你想在每个任务完成时就立即处理结果,
as_completed()
提供了更高的灵活性。
两种方法的返回结果顺序不同:
wait()
返回的结果顺序与Future
提交的顺序一致。as_completed()
返回的结果顺序与任务实际完成的顺序一致。
线程安全地将结果写入csv中
在并行处理环境下,多个线程可能同时尝试写入同一个文件(如 CSV 文件),这可能导致数据竞争或文件损坏。要确保线程安全地写入 CSV,你有几种方法可选:
1. 使用线程锁(Lock)
在 Python 中,可以使用 threading.Lock
来确保只有一个线程可以同时写入文件。通过在写入 CSV 文件时加锁,可以防止多个线程同时操作同一个文件,确保写入操作的原子性。
示例代码:
import csv
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
import time
# 创建一个全局锁
lock = Lock()
# 模拟文件处理函数,返回处理结果
def process_file(file_name):
time.sleep(1) # 模拟处理时间
return [file_name, "Processed"] # 结果作为一个列表返回,方便写入csv
# 线程安全的写入csv函数
def write_to_csv(file_name, data):
with lock: # 加锁确保线程安全
with open(file_name, mode='a', newline='') as file:
writer = csv.writer(file)
writer.writerow(data)
print(f"Wrote {data} to {file_name}")
# 创建一个线程池
with ThreadPoolExecutor() as executor:
file_names = ['file1', 'file2', 'file3', 'file4']
# 提交任务并在完成后将结果写入csv
futures = [executor.submit(process_file, file_name) for file_name in file_names]
for future in futures:
result = future.result() # 获取处理结果
write_to_csv('output.csv', result) # 将处理结果写入csv
解释:
lock
:在写入 CSV 文件时加锁,确保只有一个线程可以写入,避免数据竞争。write_to_csv
函数:每次写入时都获得锁,从而保证不会有多个线程同时写入文件。- 文件写入模式:使用
mode='a'
以追加模式打开文件,避免覆盖已有数据。
2. 使用 queue.Queue
收集结果,并由单独的线程写入
另一个方法是让各个线程将处理结果放入线程安全的 queue.Queue
中,然后使用一个单独的线程负责从队列中读取结果并写入 CSV。这样可以完全避免多个线程同时操作文件。
示例代码:
import csv
import queue
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
import time
# 线程安全的队列,用于存储处理结果
result_queue = queue.Queue()
# 模拟文件处理函数,返回处理结果
def process_file(file_name):
time.sleep(1) # 模拟处理时间
result = [file_name, "Processed"] # 结果作为一个列表返回,方便写入csv
result_queue.put(result) # 将结果放入队列中
# 单独的线程从队列中读取结果并写入CSV
def csv_writer_thread(file_name):
with open(file_name, mode='a', newline='') as file:
writer = csv.writer(file)
while True:
result = result_queue.get()
if result is None: # 如果收到停止信号,结束写入
break
writer.writerow(result)
print(f"Wrote {result} to {file_name}")
result_queue.task_done()
# 创建线程池
with ThreadPoolExecutor() as executor:
file_names = ['file1', 'file2', 'file3', 'file4']
# 创建一个单独的线程负责写入csv
writer_thread = Thread(target=csv_writer_thread, args=('output.csv',), daemon=True)
writer_thread.start()
# 提交任务
futures = [executor.submit(process_file, file_name) for file_name in file_names]
# 等待所有任务完成
for future in futures:
future.result()
# 向队列发送None,通知写入线程结束
result_queue.put(None)
# 等待队列中的所有任务完成
result_queue.join()
# 等待写入线程结束
writer_thread.join()
解释:
queue.Queue
:一个线程安全的队列,所有线程都可以将处理结果安全地放入该队列。csv_writer_thread
:一个独立的线程持续从队列中获取处理结果并写入 CSV 文件。当所有任务完成时,向队列发送None
信号,告诉写入线程结束。- 队列的优点:各个线程只负责处理和将结果放入队列,写入操作由单独的线程完成,避免了文件写入冲突。
3. 使用 pandas
处理并发写入(适合较大规模数据)
如果你正在处理大规模数据,并且需要更复杂的数据处理和写入逻辑,可以使用 pandas
来批量写入数据。你可以让每个线程将结果写入内存中的 DataFrame
,然后在所有任务完成后一次性写入 CSV 文件。这种方法适用于大量数据需要合并写入的场景。
示例代码:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import time
# 用于存储每个线程处理的结果
results = []
# 模拟文件处理函数,返回处理结果
def process_file(file_name):
time.sleep(1) # 模拟处理时间
return {'file_name': file_name, 'status': 'Processed'}
# 创建线程池
with ThreadPoolExecutor() as executor:
file_names = ['file1', 'file2', 'file3', 'file4']
# 提交任务并收集结果
futures = [executor.submit(process_file, file_name) for file_name in file_names]
for future in futures:
result = future.result() # 获取处理结果
results.append(result) # 将结果添加到列表中
# 使用 pandas 将结果一次性写入csv
df = pd.DataFrame(results)
df.to_csv('output.csv', index=False)
print("All results written to output.csv")
解释:
results
列表:每个线程将处理结果存入内存中的列表中,最终由pandas
统一处理写入 CSV。pandas.DataFrame
:处理大规模数据时,pandas
能高效地进行数据整合和写入。- 优点:所有线程的结果都存储在内存中,最终一次性批量写入,减少了频繁的文件 IO 操作。
总结:
- 锁机制:适合小规模的并发写入任务,直接使用
Lock
来保护写入操作。 - 队列机制:适合复杂的并发场景,使用队列让多个线程处理任务,单独一个线程安全地进行文件写入。
- 批量写入:适合大规模数据场景,使用
pandas
等工具将数据批量写入以提高效率。
你可以根据你的实际需求选择最适合的方法。