Home » Python » Python爬虫:aiohttp异步网络请求库

Python爬虫:aiohttp异步网络请求库

·

引言

在互联网时代,数据采集已成为各行各业不可或缺的技能。Python凭借其简洁的语法和丰富的生态系统,成为网络爬虫开发的首选语言。在众多Python爬虫库中,aiohttp以其强大的异步处理能力和优秀的性能表现,成为开发高效网络爬虫的重要工具。本文将深入介绍aiohttp库的使用方法,帮助初学者快速掌握这一强大的异步网络请求库。

aiohttp简介

aiohttp是一个支持异步HTTP请求的Python库,它基于asyncio构建,不仅可以用作客户端发送请求,还能作为服务器处理请求。在爬虫领域,aiohttp的异步特性使其能够同时处理多个网络请求,大大提升了数据采集的效率。

主要特点

  1. 异步处理: 基于Python的asyncio框架,支持异步IO操作
  2. 高性能: 相比传统的同步请求库,能够更高效地处理并发请求
  3. 灵活性: 支持WebSocket,支持作为客户端和服务器端使用
  4. 现代化: 完全支持HTTP/1.1,并提供了现代化的API设计

安装方法

pip install aiohttp
# 安装加速插件 [aiodns + Brotli]
pip install aiohttp[speedups]

基础使用方法

让我们从一个简单的示例开始,展示如何使用aiohttp发送GET请求:

import aiohttp
import asyncio

async def fetch_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # 确保响应状态码正常
            if response.status == 200:
                return await response.text()
            return None

async def main():
    url = "https://api.github.com/repos/aio-libs/aiohttp"
    content = await fetch_page(url)
    print(content)

# 运行异步函数
asyncio.run(main())

在上面的代码中,我们创建了一个异步函数fetch_page来获取网页内容。aiohttp.ClientSession()创建了一个会话对象,类似于requests库中的session,用于管理连接池和cookie。

进阶特性

并发请求处理

aiohttp的真正威力在于处理并发请求。以下示例展示如何同时处理多个URL:

import aiohttp
import asyncio
import time

async def fetch_page(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        return None

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(fetch_page(session, url))
            tasks.append(task)
        # 等待所有任务完成
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://api.github.com/repos/aio-libs/aiohttp",
        "https://api.github.com/repos/python/cpython",
        "https://api.github.com/repos/pallets/flask"
    ]
    
    start_time = time.time()
    results = await fetch_all(urls)
    end_time = time.time()
    
    print(f"获取{len(urls)}个页面耗时: {end_time - start_time:.2f}秒")
    
    for url, result in zip(urls, results):
        print(f"URL: {url}, 内容长度: {len(result) if result else 0}")

asyncio.run(main())

自定义请求头和超时设置

在实际爬虫开发中,经常需要设置自定义请求头和超时时间:

import aiohttp
import asyncio
from aiohttp import ClientTimeout

async def fetch_with_headers(url):
    # 设置请求头
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    }
    
    # 设置超时时间为10秒
    timeout = ClientTimeout(total=10)
    
    async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return await response.text()
        except asyncio.TimeoutError:
            print(f"请求超时: {url}")
        except Exception as e:
            print(f"发生错误: {e}")
        return None

async def main():
    url = "https://api.github.com/repos/aio-libs/aiohttp"
    content = await fetch_with_headers(url)
    if content:
        print(f"成功获取内容,长度: {len(content)}")

asyncio.run(main())

错误处理和重试机制

在实际爬虫应用中,错误处理和重试机制是非常重要的:

import aiohttp
import asyncio
import time

async def fetch_with_retry(url, max_retries=3, delay=1):
    for attempt in range(max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.text()
                    elif response.status == 429:  # Too Many Requests
                        print(f"达到请求限制,等待后重试...")
                        await asyncio.sleep(delay * (attempt + 1))
                        continue
                    else:
                        print(f"HTTP错误: {response.status}")
                        return None
        except aiohttp.ClientError as e:
            print(f"尝试 {attempt + 1}/{max_retries} 失败: {str(e)}")
            if attempt < max_retries - 1:
                await asyncio.sleep(delay * (attempt + 1))
            continue
        except Exception as e:
            print(f"未知错误: {str(e)}")
            return None
    
    print(f"达到最大重试次数,URL: {url}")
    return None

实际应用案例:GitHub仓库信息采集

下面是一个完整的实例,展示如何使用aiohttp采集GitHub仓库信息:

import aiohttp
import asyncio
import json
from datetime import datetime

async def fetch_repo_info(session, repo_name):
    url = f"https://api.github.com/repos/{repo_name}"
    headers = {
        'Accept': 'application/vnd.github.v3+json',
        'User-Agent': 'Python/aiohttp'
    }
    
    try:
        async with session.get(url, headers=headers) as response:
            if response.status == 200:
                data = await response.json()
                return {
                    'name': data['full_name'],
                    'stars': data['stargazers_count'],
                    'forks': data['forks_count'],
                    'last_update': data['updated_at']
                }
            else:
                print(f"获取仓库信息失败: {repo_name}, 状态码: {response.status}")
                return None
    except Exception as e:
        print(f"处理仓库 {repo_name} 时发生错误: {str(e)}")
        return None

async def main():
    repos = [
        "aio-libs/aiohttp",
        "pallets/flask",
        "django/django",
        "pytorch/pytorch"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_repo_info(session, repo) for repo in repos]
        results = await asyncio.gather(*tasks)
        
        # 保存结果
        valid_results = [r for r in results if r]
        with open('github_repos_info.json', 'w', encoding='utf-8') as f:
            json.dump(valid_results, f, indent=2)
        
        # 打印结果
        for repo in valid_results:
            print(f"\n仓库: {repo['name']}")
            print(f"Stars: {repo['stars']}")
            print(f"Forks: {repo['forks']}")
            print(f"最后更新: {repo['last_update']}")

if __name__ == "__main__":
    asyncio.run(main())

注意事项和最佳实践

  1. 遵守网站规则
    • 在开发爬虫时,务必阅读并遵守目标网站的robots.txt文件
    • 合理控制请求频率,避免对服务器造成压力
    • 遵守网站的服务条款和API使用限制
  2. 性能优化
    • 合理设置并发数量,避免过度占用系统资源
    • 使用连接池复用连接
    • 实现适当的重试机制和错误处理
  3. 代码维护
    • 做好日志记录
    • 实现异常处理机制
    • 模块化设计,便于维护和扩展

相关资源

总结

aiohttp作为一个强大的异步HTTP客户端/服务器框架,在Python爬虫开发中扮演着重要角色。通过本文的介绍,我们了解了aiohttp的基本用法、高级特性以及实际应用案例。在使用过程中,请记住遵守网站的使用规则,合理控制请求频率,确保爬虫程序的健康运行。随着异步编程在Python中的普及,掌握aiohttp将帮助你开发出更高效的爬虫应用。

希望本文能帮助你更好地理解和使用aiohttp库。如果你想深入学习,建议查看官方文档并动手实践,这将帮助你更快地掌握这个强大的工具。

关注我们,每天推荐一款实用的Python爬虫工具