למה ה-scraper הסטנדרטי שלך זוחל?
בואו נהיה כנים. רוב ה-scrapers שנכתבים בתחילת הדרך נראים אותו דבר: לולאה שרצה על רשימת URL-ים, שולחת בקשה עם `requests`, מחכה לתשובה, מנתחת את ה-HTML, ושומרת. וזה עובד. בערך. עד שאתה צריך לגרד 100,000 עמודים ולא עשרה.
הבעיה היא שכל פעולת רשת היא בזבוז זמן טהור למעבד שלך. בזמן שהסקריפט שלך מחכה לתשובה מהשרת, המעבד יושב בחיבוק ידיים. הוא יכול היה לעשות מאות דברים אחרים בזמן הזה. זו הסיבה ש-scraping סינכרוני מרגיש כל כך איטי. הוא מבצע משימה אחת בלבד בכל רגע נתון, ומבלה 95% מהזמן בבהייה בקיר הדיגיטלי.
כאן `asyncio` נכנס לתמונה. זו לא מקביליות של threads או processes. זה משהו חכם יותר: ניהול משימות מבוסס אירועים (event loop). במקום לחכות, הקוד שלך אומר ל-event loop: "תתחיל את הבקשה הזאת, וכשהיא תחזור עם תשובה, תודיע לי. בינתיים, אני אמשיך לעבוד על משהו אחר". התוצאה היא קפיצת מדרגה בביצועים, במיוחד במשימות עתירות I/O כמו web scraping.
אבני הבניין: Queue, Semaphore, ו-gather
כדי לבנות pipeline אסינכרוני חזק, אנחנו צריכים שלושה כלים מרכזיים מהספרייה הסטנדרטית של פייתון:
asyncio.Queue: זה המסוע במפעל שלנו. הוא מאפשר ל-coroutine אחד (שלב ב-pipeline) להעביר עבודה בצורה בטוחה ל-coroutine אחר (השלב הבא). למשל, שלב ה-fetcher יכניס HTML גולמי לתור אחד, ושלב ה-parser ייקח אותו משם. זה מונע בלאגן ומפריד בין תחומי אחריות.asyncio.Semaphore: זה השומר בכניסה למועדון. הוא מגביל את כמות המשימות שיכולות לרוץ במקביל. למה זה קריטי? כי אם נשלח 500 בקשות במקביל לשרת אחד, סביר להניח שנקבל חסימה תוך שניות. סמפור מאפשר לנו לומר: "אני רוצה להריץ עד 10 בקשות רשת במקביל, ולא יותר". זה הכלי המרכזי שלנו לניהול rate limiting בצד הלקוח.asyncio.gather: זה המנהל שאוסף את כל העובדים (ה-coroutine workers שלנו) ומפעיל אותם יחד. הוא דואג שכל המשימות יתחילו לרוץ ומחכה שכולן יסתיימו לפני שהתוכנית ממשיכה.
השילוב של שלושתם מאפשר לנו לבנות מערכת שמזכירה פס ייצור: פריטים זורמים בין תחנות עבודה, כשכל תחנה עובדת בקצב שלה ובמקביל לאחרות, תוך שמירה על מגבלות המערכת.
שלב אחרי שלב: Fetch, Parse, ו-Persist
בואו נבנה pipeline קלאסי עם שלושה שלבים. נשתמש בספריית `aiohttp` לבקשות רשת אסינכרוניות ו-`lxml` לפענוח מהיר של HTML.
הקמת ה-Pipeline והתורים
דבר ראשון, נגדיר את התורים שיחברו בין השלבים שלנו ואת הסמפור שישלוט על קצב הבקשות.
import asyncio
import aiohttp
from lxml import html
async def main():
urls_to_fetch_queue = asyncio.Queue()
html_to_parse_queue = asyncio.Queue()
results_to_persist_queue = asyncio.Queue()
# נמלא את התור הראשוני עם כמה URL-ים לדוגמה
for i in range(20):
await urls_to_fetch_queue.put(f"http://example.com/page{i}")
# סמפור שיגביל אותנו ל-8 בקשות רשת במקביל
semaphore = asyncio.Semaphore(8)
# יצירת ה-workers שלנו
fetch_workers = [asyncio.create_task(fetcher(urls_to_fetch_queue, html_to_parse_queue, semaphore)) for _ in range(4)]
parse_workers = [asyncio.create_task(parser(html_to_parse_queue, results_to_persist_queue)) for _ in range(2)]
persist_worker = asyncio.create_task(persister(results_to_persist_queue))
# נחכה שהתור הראשוני יתרוקן וכל הפריטים יעובדו
await urls_to_fetch_queue.join()
await html_to_parse_queue.join()
await results_to_persist_queue.join()
# נבטל את ה-workers שרצים בלולאות אינסופיות
for worker in fetch_workers + parse_workers:
worker.cancel()
persist_worker.cancel()
print("Pipeline finished.")
if __name__ == "__main__":
asyncio.run(main())
שלב 1: ה-Fetcher
ה-fetcher לוקח URL מהתור, משתמש בסמפור כדי להגביל את עצמו, מבצע את בקשת הרשת עם `aiohttp`, ומעביר את ה-HTML הלאה.
async def fetcher(in_queue, out_queue, semaphore):
async with aiohttp.ClientSession() as session:
while True:
url = await in_queue.get()
try:
async with semaphore:
print(f"Fetching {url}")
async with session.get(url, timeout=10) as response:
if response.status == 200:
content = await response.text()
await out_queue.put((url, content))
else:
print(f"Error fetching {url}: Status {response.status}")
except Exception as e:
print(f"Exception fetching {url}: {e}")
finally:
in_queue.task_done()
שלבים 2 ו-3: ה-Parser וה-Persister
ה-parser מקבל HTML, מוציא ממנו את המידע הרלוונטי (כאן רק את הכותרת), ומעביר הלאה. ה-persister לוקח את התוצאה הסופית ושומר אותה (כאן רק מדפיס למסך, אבל בעולם האמיתי זה יהיה כתיבה למסד נתונים או לקובץ).
async def parser(in_queue, out_queue):
while True:
url, content = await in_queue.get()
try:
tree = html.fromstring(content)
title = tree.xpath('//h1/text()')
if title:
print(f"Parsed title from {url}")
await out_queue.put({"url": url, "title": title[0]})
except Exception as e:
print(f"Exception parsing {url}: {e}")
finally:
in_queue.task_done()
async def persister(in_queue):
while True:
result = await in_queue.get()
# כאן נכנסת הלוגיקה של שמירה ל-DB, קובץ CSV, וכו'.
print(f"PERSISTED: {result}")
in_queue.task_done()
היופי במודל הזה הוא הגמישות. אם ה-parsing הוא צוואר הבקבוק, אפשר פשוט להוסיף עוד `parse_workers`. אם הרשת איטית, אפשר להגדיל את מספר ה-`fetch_workers` ואת ערך הסמפור (בזהירות!). כל חלק במערכת עובד באופן עצמאי, מה שהופך את כל הארכיטקטורת scraping להרבה יותר עמידה וסקיילבילית.
איפה כל הסיפור הזה מתפוצץ בפנים
הבטחנו עולם ורוד של ביצועים, אבל יש מלכודת אחת קטלנית שיכולה להרוס הכל: קריאות חוסמות (blocking calls).
ה-event loop של `asyncio` הוא כמו אמן ג'אגלינג עם כדור אחד. הוא יכול לזרוק אותו באוויר (להתחיל פעולת I/O) ולתפוס אותו כשהוא נופל (כשהפעולה מסתיימת), ובזמן שהכדור באוויר הוא חופשי לעשות דברים אחרים. קריאה חוסמת היא כמו להכריח את אמן הג'אגלינג להחזיק את הכדור ביד ולא לעשות כלום. כל המערכת קופאת.
מהי קריאה חוסמת? כל דבר שלא משתמש ב-`await` ודורש זמן: `requests.get()`, `time.sleep()`, פעולות קריאה/כתיבה רגילות לקבצים, או חישובים כבדים שדורשים 100% CPU. אם תכניס אחת כזאת לתוך worker אסינכרוני, הרסת את כל המטרה. כל ה-workers האחרים יעצרו ויחכו שהקריאה החוסמת תסתיים. ראיתי פרויקטים שבהם `scraper` אסינכרוני עבד לאט יותר מגרסה סינכרונית פשוטה, רק בגלל קריאה חוסמת אחת שנשכחה בקוד.
תרחיש כישלון קלאסי
דמיין `persister` שכותב ל-SQLite. פעולת הכתיבה לדיסק היא חוסמת. אם 1000 תוצאות מגיעות לתור בבת אחת, ה-`persister` יתקע את ה-event loop בכל פעם שהוא כותב ל-DB. בינתיים, ה-`fetcher` וה-`parser` לא יכולים לעבוד, למרות שיש להם עוד עבודה בתורים שלהם. התורים מתמלאים, הזיכרון עולה, והביצועים צונחים. הפתרון הוא להריץ את הקוד החוסם ב-executor נפרד (thread pool) באמצעות `loop.run_in_executor`, אבל זה כבר נושא למאמר אחר.
ניהול שגיאות וחזרתיות (Retries)
בעולם האמיתי, בקשות נכשלות. שרתים מחזירים שגיאות 503, חיבורי רשת נופלים, וחשוב מכל, אנחנו נתקלים ב-rate limiting. pipeline חזק חייב לטפל בזה. במקום פשוט להדפיס שגיאה, ה-`fetcher` שלנו צריך להיות חכם יותר.
כשנתקלים בשגיאת רשת זמנית או בתשובת 429 Too Many Requests, הפתרון הנכון הוא לא לוותר. אפשר להחזיר את ה-URL לתור המקורי עם מונה ניסיונות. לדוגמה, אם בקשה נכשלת, מכניסים לתור元-tuple כמו `(url, retry_count + 1)`. ה-worker שייקח את המשימה הזו יראה שניסיון קודם נכשל ויוכל להמתין זמן קצר (exponential backoff) לפני שהוא מנסה שוב. אחרי 3-5 ניסיונות כושלים, אפשר להעביר את ה-URL לתור שגיאות נפרד לבדיקה ידנית.
מתי לא להשתמש ב-Asyncio?
למרות כל היתרונות, `asyncio` הוא לא פתרון קסם לכל בעיה. אם ה-scraper שלך מבלה את רוב זמנו בחישובים כבדים על המידע שהוא מוריד (CPU-bound) ולא בהמתנה לרשת (I/O-bound), `asyncio` לא יעזור לך הרבה. למשל, אם אתה מריץ מודלים של NLP על כל עמוד שאתה מוריד, צוואר הבקבוק הוא המעבד, לא הרשת.
במקרים כאלה, ארכיטקטורה מבוססת `multiprocessing` תהיה יעילה בהרבה, כי היא מאפשרת להריץ קוד על מספר ליבות מעבד במקביל. לפעמים הפתרון הטוב ביותר הוא שילוב: `asyncio` ל-fetching מהיר על ליבה אחת, והעברת העבודה עתירת ה-CPU ל-process pool נפרד. כלים כמו Scrapy עושים את זה מאחורי הקלעים בצורה אלגנטית.
שאלות נפוצות
ההבדל המרכזי הוא במודל המקביליות. Multi-threading משתמש במספר תהליכונים (threads) שרצים במקביל ומנוהלים על ידי מערכת ההפעלה, מה שמוסיף תקורה של החלפת הקשר (context switching). לעומת זאת, asyncio scraping פועל על תהליכון יחיד ומנהל משימות רבות על ידי החלפה ביניהן כשהן ממתינות לפעולות I/O. עבור scraping, שרובו המתנה לרשת, asyncio בדרך כלל יעיל וחסכוני יותר במשאבים מאשר ניהול של מאות threads.
קביעת גודל הסמפור היא אמנות יותר ממדע ודורשת ניסוי וטעייה. התחל עם ערך שמרני כמו 5-10 בקשות במקביל. עקוב אחר אחוזי ההצלחה של הבקשות שלך; אם אתה רואה הרבה שגיאות 429 או 503, הקטן את הערך. אם הכל יציב וה-CPU נמוך, נסה להגדיל אותו בהדרגה. הגודל האופטימלי תלוי לחלוטין בשרת היעד - יש אתרים שיחסמו אותך אחרי 5 בקשות במקביל, ויש כאלה שיעמדו גם ב-50.
טכנית כן, אבל זו טעות קריטית שהורגת את הביצועים. קריאה ל-`requests.get()` היא פעולה חוסמת שתקפיא את ה-event loop כולו, ותבטל את כל היתרון של asyncio. אם אתה חייב להשתמש בספרייה חוסמת, עליך לעטוף את הקריאה ב-`loop.run_in_executor()`, מה שיריץ אותה ב-thread pool נפרד וימנע את חסימת הלולאה הראשית. לספריות פופולריות כמו `requests` יש חלופות אסינכרוניות מצוינות כמו `aiohttp` או `httpx`.
זו בעיה קלאסית של 'backpressure' וזו בדיוק הסיבה שאנחנו משתמשים ב-Queues. אם שלב הפענוח איטי יותר, ה-`html_to_parse_queue` יתחיל להתמלא ב-HTML שממתין לעיבוד. אם התור גדל ללא הגבלה, הוא עלול לצרוך את כל הזיכרון הזמין. ניתן לפתור זאת על ידי הגבלת גודל התור (למשל `asyncio.Queue(maxsize=1000)`), מה שיגרום ל-fetchers להמתין כשהתור מלא, או על ידי הקצאת יותר workers לשלב האיטי, למשל 4 `parse_workers` מול 2 `fetch_workers`.
לא באופן ישיר. Asyncio הוא כלי לניהול יעיל של בקשות, לא כלי לעקיפת הגנות. חסימות כמו Cloudflare מנתחות מאפיינים של הבקשה עצמה - טביעת אצבע של ה-TLS, כותרות HTTP, והתנהגות JavaScript. Asyncio יכול להחמיר את המצב אם תשלח בקשות מהר מדי ותפעיל rate limiting. כדי להתמודד עם הגנות מתקדמות, תצטרך כלים וטכניקות אחרות, כמו שימוש ב-Playwright עם יכולות התחמקות או שירותי <a href="/blog/bypass-cloudflare-2025">עקיפת Cloudflare</a> ייעודיים, במקביל לארכיטקטורה האסינכרונית שלך.
