隊列集?
asyncio 隊列被設(shè)計成與 queue
模塊類似。盡管 asyncio隊列不是線程安全的,但是他們是被設(shè)計專用于 async/await 代碼。
注意asyncio 的隊列沒有 timeout 形參;請使用 asyncio.wait_for()
函數(shù)為隊列添加超時操作。
參見下面的 Examples 部分。
Queue?
- class asyncio.Queue(maxsize=0)?
先進,先出(FIFO)隊列
如果 maxsize 小于等于零,則隊列尺寸是無限的。如果是大于
0
的整數(shù),則當(dāng)隊列達到 maxsize 時,await put()
將阻塞至某個元素被get()
取出。不像標準庫中的并發(fā)型
queue
,隊列的尺寸一直是已知的,可以通過調(diào)用qsize()
方法返回。在 3.10 版更改: Removed the loop parameter.
這個類不是線程安全的(not thread safe)。
- maxsize?
隊列中可存放的元素數(shù)量。
- empty()?
如果隊列為空返回
True
,否則返回False
。
- coroutine get()?
從隊列中刪除并返回一個元素。如果隊列為空,則等待,直到隊列中有元素。
- get_nowait()?
立即返回一個隊列中的元素,如果隊列內(nèi)有值,否則引發(fā)異常
QueueEmpty
。
- coroutine join()?
阻塞至隊列中所有的元素都被接收和處理完畢。
當(dāng)條目添加到隊列的時候,未完成任務(wù)的計數(shù)就會增加。每當(dāng)消費協(xié)程調(diào)用
task_done()
表示這個條目已經(jīng)被回收,該條目所有工作已經(jīng)完成,未完成計數(shù)就會減少。當(dāng)未完成計數(shù)降到零的時候,join()
阻塞被解除。
- coroutine put(item)?
添加一個元素進隊列。如果隊列滿了,在添加元素之前,會一直等待空閑插槽可用。
- qsize()?
返回隊列用的元素數(shù)量。
- task_done()?
表明前面排隊的任務(wù)已經(jīng)完成,即get出來的元素相關(guān)操作已經(jīng)完成。
由隊列使用者控制。每個
get()
用于獲取一個任務(wù),任務(wù)最后調(diào)用task_done()
告訴隊列,這個任務(wù)已經(jīng)完成。如果
join()
當(dāng)前正在阻塞,在所有條目都被處理后,將解除阻塞(意味著每個put()
進隊列的條目的task_done()
都被收到)。如果被調(diào)用的次數(shù)多于放入隊列中的項目數(shù)量,將引發(fā)
ValueError
。
優(yōu)先級隊列?
后進先出隊列?
異常?
- exception asyncio.QueueEmpty?
當(dāng)隊列為空的時候,調(diào)用
get_nowait()
方法而引發(fā)這個異常。
- exception asyncio.QueueFull?
當(dāng)隊列中條目數(shù)量已經(jīng)達到它的 maxsize 的時候,調(diào)用
put_nowait()
方法而引發(fā)的異常。
例子?
隊列能被用于多個的并發(fā)任務(wù)的工作量分配:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())