一、s3transfer 库核心概述
s3transfer 是 AWS 官方推出的一款 Python 库,专门用于高效、可靠地处理与 Amazon S3 存储服务之间的文件传输操作。其工作原理是基于分块上传/下载、并发处理和重试机制,将大文件拆分为多个小块并行传输,同时支持断点续传,极大提升了传输效率和稳定性。

该库的优点十分突出:支持大文件分块传输、并发任务调度、自动重试失败请求、与 AWS SDK for Python(boto3)深度兼容;缺点则是功能高度聚焦于 S3 传输,不支持其他云存储服务,且需要依赖 boto3 配置 AWS 凭证。s3transfer 的开源协议为 Apache License 2.0,允许商业和非商业用途的自由使用、修改和分发。
二、s3transfer 安装与环境准备
2.1 安装方式
s3transfer 通常与 boto3 配套使用,因为它依赖 boto3 提供的 AWS 客户端和凭证管理功能。我们可以通过 Python 包管理工具 pip 直接安装,安装命令如下:
pip install s3transfer boto3
执行上述命令后,pip 会自动下载并安装 s3transfer 及其依赖的 boto3、botocore 等库,满足后续开发的环境需求。
2.2 AWS 凭证配置
要使用 s3transfer 操作 S3 存储桶,必须先配置 AWS 访问凭证,这是与 AWS 服务建立连接的前提。常见的配置方式有两种:
- 环境变量配置
在系统环境变量中设置AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY,这两个值可以从 AWS 控制台的 IAM 服务中获取。以 Linux/macOS 系统为例,配置命令如下:bash export AWS_ACCESS_KEY_ID="your-access-key-id" export AWS_SECRET_ACCESS_KEY="your-secret-access-key"
Windows 系统则可以通过“系统属性-高级-环境变量”界面添加对应的环境变量。 - 配置文件配置
在用户主目录下创建.aws文件夹,并在其中新建credentials文件,文件内容格式如下:ini
[default]
aws_access_key_id = your-access-key-id aws_secret_access_key = your-secret-access-key
同时,还可以在 .aws 文件夹下创建 config 文件,设置默认的 AWS 区域:ini
[default]
region = us-east-1
两种配置方式任选其一即可,配置完成后,s3transfer 会自动读取凭证信息,无需在代码中硬编码,保证了凭证的安全性。
三、s3transfer 核心功能与代码实例
s3transfer 的核心功能围绕 S3 的文件上传、下载、批量操作展开,其 API 设计简洁易懂,即使是 Python 新手也能快速上手。下面我们结合具体的代码实例,详细讲解每个功能的使用方法。
3.1 基本文件上传
基本文件上传适用于小文件的传输场景,s3transfer 会直接将文件内容发送到 S3 存储桶。在代码实现中,我们需要先通过 boto3 创建 S3 客户端,再利用 s3transfer 的 TransferManager 类来管理传输任务。
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError
# 创建 boto3 S3 客户端
s3_client = boto3.client('s3')
# 初始化 TransferManager
transfer_manager = TransferManager(s3_client)
# 定义本地文件路径和 S3 存储桶及目标路径
local_file_path = 'test_file.txt'
bucket_name = 'your-s3-bucket-name'
s3_key = 'upload/test_file.txt'
try:
# 执行文件上传任务
future = transfer_manager.upload(local_file_path, bucket_name, s3_key)
# 等待上传任务完成
future.result()
print(f"文件 {local_file_path} 成功上传到 S3: s3://{bucket_name}/{s3_key}")
except TransferFailedError as e:
print(f"文件上传失败: {str(e)}")
finally:
# 关闭 TransferManager,释放资源
transfer_manager.shutdown()
代码说明:
- 首先导入所需的库和异常类,
TransferManager是 s3transfer 的核心类,负责任务的调度和执行;TransferFailedError用于捕获传输过程中可能出现的异常。 - 通过
boto3.client('s3')创建 S3 客户端,客户端会自动读取我们之前配置的 AWS 凭证。 - 初始化
TransferManager后,调用upload方法,传入本地文件路径、S3 存储桶名称和目标键(即文件在 S3 中的路径),该方法会返回一个Future对象。 - 调用
future.result()会阻塞当前线程,直到上传任务完成,这样可以确保我们能获取到上传的最终状态。 - 最后在
finally块中调用transfer_manager.shutdown(),关闭TransferManager,释放占用的系统资源,这是一个良好的编程习惯,避免资源泄露。
3.2 大文件分块上传
当传输的文件体积较大(比如超过 100MB)时,使用基本上传方式效率较低,且容易因为网络波动导致传输失败。此时,我们可以利用 s3transfer 的分块上传功能,将大文件拆分为多个小块(默认块大小为 8MB),并行上传到 S3,同时支持断点续传。
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError
# 创建 S3 客户端
s3_client = boto3.client('s3')
# 配置 TransferManager 的分块上传参数
transfer_config = {
'multipart_threshold': 10 * 1024 * 1024, # 超过 10MB 的文件自动分块
'multipart_chunksize': 5 * 1024 * 1024 # 每个分块的大小为 5MB
}
# 初始化 TransferManager 并传入配置参数
transfer_manager = TransferManager(s3_client, config=transfer_config)
# 定义大文件路径和 S3 目标路径
local_large_file = 'large_data.zip'
bucket_name = 'your-s3-bucket-name'
s3_large_key = 'upload/large_data.zip'
try:
future = transfer_manager.upload(local_large_file, bucket_name, s3_large_key)
future.result()
print(f"大文件 {local_large_file} 成功分块上传到 S3")
except TransferFailedError as e:
print(f"大文件上传失败: {str(e)}")
finally:
transfer_manager.shutdown()
代码说明:
- 我们通过一个字典
transfer_config来配置分块传输的参数,multipart_threshold表示当文件大小超过该值时,自动启用分块上传;multipart_chunksize定义了每个分块的大小。 - 将配置参数传入
TransferManager的构造函数,这样TransferManager就会按照我们的配置来处理大文件传输。 - 分块上传的 API 调用方式与基本上传完全一致,
TransferManager会自动判断文件大小,选择合适的传输方式,对开发者来说是透明的,极大降低了使用门槛。
3.3 文件下载
文件下载的使用方法与上传类似,TransferManager 提供了 download 方法,支持从 S3 存储桶下载文件到本地。同样支持小文件直接下载和大文件分块下载,无需额外配置,TransferManager 会自动处理。
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError
s3_client = boto3.client('s3')
transfer_manager = TransferManager(s3_client)
# 定义 S3 源文件和本地目标路径
bucket_name = 'your-s3-bucket-name'
s3_source_key = 'upload/test_file.txt'
local_download_path = 'downloaded_test_file.txt'
try:
future = transfer_manager.download(bucket_name, s3_source_key, local_download_path)
future.result()
print(f"文件成功从 S3 下载到本地: {local_download_path}")
except TransferFailedError as e:
print(f"文件下载失败: {str(e)}")
finally:
transfer_manager.shutdown()
代码说明:
download方法的参数顺序与upload相反,第一个参数是 S3 存储桶名称,第二个参数是文件在 S3 中的键,第三个参数是本地目标路径。- 其他代码逻辑与上传功能一致,通过
future.result()等待下载完成,捕获TransferFailedError处理异常,最后关闭TransferManager。
3.4 批量文件传输
在实际开发中,我们经常需要批量上传或下载多个文件,s3transfer 支持通过循环调用 upload 或 download 方法来实现批量操作,结合 concurrent.futures 模块,还可以进一步提升批量操作的效率。
import os
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError
# 创建 S3 客户端
s3_client = boto3.client('s3')
transfer_manager = TransferManager(s3_client)
# 定义批量上传的本地文件夹和 S3 目标存储桶
local_folder = 'batch_upload_files'
bucket_name = 'your-s3-bucket-name'
s3_prefix = 'batch_upload/'
# 遍历本地文件夹中的所有文件
try:
futures = []
for filename in os.listdir(local_folder):
local_file_path = os.path.join(local_folder, filename)
# 跳过文件夹,只处理文件
if os.path.isfile(local_file_path):
s3_key = os.path.join(s3_prefix, filename)
future = transfer_manager.upload(local_file_path, bucket_name, s3_key)
futures.append(future)
# 等待所有上传任务完成
for future in futures:
future.result()
print("所有文件批量上传完成!")
except TransferFailedError as e:
print(f"批量上传过程中出现错误: {str(e)}")
except Exception as e:
print(f"未知错误: {str(e)}")
finally:
transfer_manager.shutdown()
代码说明:
- 首先通过
os.listdir遍历本地文件夹中的所有文件,使用os.path.isfile判断当前路径是否为文件,避免处理文件夹。 - 对于每个文件,构造其本地路径和 S3 目标键,调用
upload方法并将返回的Future对象添加到列表中。 - 循环遍历
Future对象列表,调用result()方法等待所有任务完成,这样可以实现多个文件的并行上传,提升批量操作的效率。 - 除了批量上传,批量下载的实现逻辑类似,只需要将
upload方法替换为download方法,遍历 S3 存储桶中的文件列表即可。
3.5 传输进度监控
在传输大文件时,我们往往需要了解实时的传输进度,s3transfer 支持通过回调函数来实现进度监控。我们可以自定义一个回调函数,在每次传输完一个分块后,更新并打印传输进度。
import os
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError
# 自定义进度回调函数
class ProgressCallback:
def __init__(self, file_size):
self.file_size = file_size
self.transferred = 0
def __call__(self, bytes_transferred):
self.transferred += bytes_transferred
progress = (self.transferred / self.file_size) * 100
print(f"传输进度: {progress:.2f}% ({self.transferred}/{self.file_size} bytes)", end='\r')
# 创建 S3 客户端
s3_client = boto3.client('s3')
transfer_manager = TransferManager(s3_client)
# 定义文件路径
local_file = 'large_data.zip'
bucket_name = 'your-s3-bucket-name'
s3_key = 'upload/large_data.zip'
# 获取本地文件大小
file_size = os.path.getsize(local_file)
# 初始化进度回调对象
progress_callback = ProgressCallback(file_size)
try:
future = transfer_manager.upload(
local_file,
bucket_name,
s3_key,
callback=progress_callback
)
future.result()
print("\n文件上传完成!")
except TransferFailedError as e:
print(f"\n文件上传失败: {str(e)}")
finally:
transfer_manager.shutdown()
代码说明:
- 我们定义了一个
ProgressCallback类,其构造函数接收文件的总大小,__call__方法是回调函数的核心,每次被调用时会接收已传输的字节数,并计算当前的传输进度。 end='\r'用于实现进度条的单行刷新,避免打印过多的换行符,提升用户体验。- 在调用
upload方法时,通过callback参数传入进度回调对象,这样 s3transfer 会在传输过程中定期调用该回调函数,实时更新传输进度。 - 进度监控功能同样适用于下载操作,只需要在
download方法中传入回调函数即可。
四、s3transfer 高级配置与优化
为了进一步提升 s3transfer 的传输性能,我们可以对其进行高级配置,比如调整并发数、设置超时时间、修改分块大小等。下面我们介绍几种常见的优化方式。
4.1 调整并发数
s3transfer 的 TransferManager 支持通过 max_request_concurrency 参数调整并发请求数,并发数越高,传输速度越快,但同时也会占用更多的系统资源和网络带宽。我们可以根据实际的网络环境和硬件配置,合理调整该参数。
import boto3
from s3transfer import TransferManager
s3_client = boto3.client('s3')
# 配置最大并发请求数为 10
transfer_config = {
'max_request_concurrency': 10
}
transfer_manager = TransferManager(s3_client, config=transfer_config)
# 后续传输逻辑与之前一致
transfer_manager.shutdown()
4.2 设置超时时间
在网络不稳定的环境下,我们可以通过设置超时时间,避免传输任务长时间阻塞。超时时间可以通过 boto3 客户端的配置来实现。
import boto3
from s3transfer import TransferManager
# 创建 S3 客户端时设置超时时间
config = boto3.session.Config(
connect_timeout=30, # 连接超时时间 30 秒
read_timeout=60 # 读取超时时间 60 秒
)
s3_client = boto3.client('s3', config=config)
transfer_manager = TransferManager(s3_client)
# 后续传输逻辑与之前一致
transfer_manager.shutdown()
4.3 自定义重试策略
s3transfer 内置了重试机制,当传输请求失败时,会自动重试。我们可以通过修改 botocore 的重试配置,来自定义重试的次数和间隔时间。
import boto3
from botocore.config import Config
from s3transfer import TransferManager
# 自定义重试配置
retry_config = Config(
retries={
'max_attempts': 5, # 最大重试次数
'mode': 'standard' # 重试模式,standard 表示标准重试
}
)
s3_client = boto3.client('s3', config=retry_config)
transfer_manager = TransferManager(s3_client)
# 后续传输逻辑与之前一致
transfer_manager.shutdown()
五、s3transfer 实际应用案例:S3 文件备份工具
结合前面所学的知识,我们可以开发一个简单的 S3 文件备份工具,该工具能够将指定本地文件夹中的所有文件备份到 S3 存储桶,并支持进度监控和异常处理。
import os
import argparse
import boto3
from s3transfer import TransferManager
from s3transfer.exceptions import TransferFailedError
class S3BackupTool:
def __init__(self, bucket_name, aws_region=None):
self.bucket_name = bucket_name
# 创建 S3 客户端
client_config = {}
if aws_region:
client_config['region_name'] = aws_region
self.s3_client = boto3.client('s3',** client_config)
self.transfer_manager = TransferManager(self.s3_client)
class ProgressMonitor:
def __init__(self, total_size):
self.total_size = total_size
self.transferred = 0
def __call__(self, bytes_trans):
self.transferred += bytes_trans
progress = (self.transferred / self.total_size) * 100
print(f"备份进度: {progress:.2f}% ({self.transferred}/{self.total_size} bytes)", end='\r')
def backup_folder(self, local_folder, s3_prefix='backup/'):
"""备份本地文件夹到 S3 存储桶"""
if not os.path.isdir(local_folder):
raise ValueError(f"本地文件夹不存在: {local_folder}")
# 计算本地文件夹总大小
total_size = 0
for root, dirs, files in os.walk(local_folder):
for file in files:
file_path = os.path.join(root, file)
total_size += os.path.getsize(file_path)
progress_monitor = self.ProgressMonitor(total_size)
futures = []
try:
# 遍历文件夹,上传所有文件
for root, dirs, files in os.walk(local_folder):
for file in files:
local_file_path = os.path.join(root, file)
# 构造 S3 键,保留本地文件夹结构
relative_path = os.path.relpath(local_file_path, local_folder)
s3_key = os.path.join(s3_prefix, relative_path)
future = self.transfer_manager.upload(
local_file_path,
self.bucket_name,
s3_key,
callback=progress_monitor
)
futures.append(future)
# 等待所有任务完成
for future in futures:
future.result()
print("\n文件夹备份完成!")
except TransferFailedError as e:
print(f"\n备份过程中出现错误: {str(e)}")
raise
finally:
self.transfer_manager.shutdown()
if __name__ == '__main__':
# 使用 argparse 解析命令行参数
parser = argparse.ArgumentParser(description='本地文件夹备份到 AWS S3 工具')
parser.add_argument('--local-folder', required=True, help='需要备份的本地文件夹路径')
parser.add_argument('--bucket-name', required=True, help='目标 S3 存储桶名称')
parser.add_argument('--region', help='AWS 区域名称,如 us-east-1')
args = parser.parse_args()
# 初始化备份工具并执行备份
backup_tool = S3BackupTool(args.bucket_name, args.region)
backup_tool.backup_folder(args.local_folder)
案例说明:
- 该工具封装为
S3BackupTool类,通过命令行参数接收本地文件夹路径、S3 存储桶名称和 AWS 区域,使用argparse模块解析命令行参数,提升工具的易用性。 backup_folder方法是工具的核心,首先计算本地文件夹的总大小,用于进度监控;然后通过os.walk遍历文件夹中的所有文件,保留文件的相对路径结构,确保备份到 S3 后的文件结构与本地一致。- 集成了进度监控功能,实时显示备份进度;同时捕获
TransferFailedError异常,处理传输过程中可能出现的错误。 - 运行该工具时,可以在命令行中输入如下命令:
python s3_backup_tool.py --local-folder ./my_files --bucket-name my-backup-bucket --region us-east-1
六、相关资源
- Pypi地址:https://pypi.org/project/s3transfer
- Github地址:https://github.com/boto/s3transfer
- 官方文档地址:https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-transfer.html
关注我,每天分享一个实用的Python自动化工具。

