站点图标 Park Lam's 每日分享

Python实用工具:Azure Storage Blob库使用教程

一、Python的广泛性及重要性与本文写作对象

Python作为一种高级编程语言,凭借其简洁易读的语法和强大的功能,在当今科技领域展现出了卓越的广泛性和重要性。

在Web开发领域,Python的Django、Flask等框架能够帮助开发者快速搭建高效、稳定的Web应用,许多知名网站如Instagram、Pinterest等都基于Python开发。

数据分析和数据科学方面,Python拥有Pandas、NumPy、Matplotlib等强大的库,能够轻松处理大规模数据、进行复杂的数值计算以及可视化分析,为数据驱动的决策提供有力支持。

机器学习和人工智能领域,Python更是占据了主导地位,TensorFlow、PyTorch、Scikit-learn等库使得开发者能够便捷地实现各种机器学习算法和深度学习模型,推动了AI技术的快速发展。

在桌面自动化和爬虫脚本方面,Python的PyAutoGUI、Selenium、Requests、BeautifulSoup等库可以帮助开发者实现自动化任务和数据采集,提高工作效率。

金融和量化交易领域,Python的Pandas、NumPy、TA-Lib等库可用于金融数据处理、策略开发和回测,为金融行业提供了强大的技术支持。

教育和研究领域,Python因其简单易学的特点,成为了许多高校和科研机构的首选编程语言,用于教学和科研项目的开发。

本文将聚焦于Python的一个实用工具库——azure-storage-blob,它为Python开发者提供了与Azure Blob存储服务进行交互的便捷方式,能够帮助开发者轻松实现Blob存储的各种操作。

二、azure-storage-blob库的用途、工作原理、优缺点及License类型

azure-storage-blob库是微软为Python开发者提供的用于与Azure Blob存储服务进行交互的官方库。

用途

它主要用于在Python应用程序中实现Blob存储的各种操作,包括创建存储账户、容器和Blob,上传、下载和管理Blob数据等。无论是小型应用还是大型企业级系统,都可以利用该库来实现数据的存储和管理。

工作原理

该库基于Azure Blob存储服务的REST API构建,通过封装这些API,为Python开发者提供了简洁、易用的对象和方法。开发者可以通过创建Blob服务客户端、容器客户端和Blob客户端来分别操作Blob服务、容器和Blob。在与Azure Blob存储服务通信时,库会自动处理身份验证、请求构建和响应解析等底层细节,使得开发者可以专注于业务逻辑的实现。

优缺点

优点:

缺点:

License类型

azure-storage-blob库遵循MIT License,这是一种宽松的开源许可证,允许用户自由使用、修改和分发该库,只需保留原有的版权声明和许可声明即可。

三、azure-storage-blob库的使用方式

3.1 安装azure-storage-blob库

在使用azure-storage-blob库之前,需要先进行安装。可以使用pip命令来安装:

pip install azure-storage-blob

安装完成后,就可以在Python代码中导入该库并使用了。

3.2 身份验证方式

azure-storage-blob库支持多种身份验证方式,下面分别介绍:

3.2.1 使用连接字符串进行身份验证

连接字符串是一种包含存储账户名称和密钥的字符串,使用连接字符串进行身份验证是最简单的方式。以下是一个示例代码:

from azure.storage.blob import BlobServiceClient

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 输出客户端信息,验证连接是否成功
print(f"Blob服务客户端已创建,账户名称: {blob_service_client.account_name}")

3.2.2 使用共享访问签名(SAS)进行身份验证

共享访问签名(SAS)是一种安全的身份验证方式,它允许你授予对存储资源的有限访问权限,而无需共享账户密钥。以下是一个示例代码:

from azure.storage.blob import BlobServiceClient

# 存储账户名称和SAS令牌
account_name = "your_account_name"
sas_token = "your_sas_token"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=sas_token)

# 输出客户端信息,验证连接是否成功
print(f"Blob服务客户端已创建,账户名称: {blob_service_client.account_name}")

3.2.3 使用Azure Active Directory进行身份验证

使用Azure Active Directory进行身份验证是一种更安全的方式,它允许你使用Azure AD凭据来访问存储资源。以下是一个示例代码:

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

# 存储账户名称
account_name = "your_account_name"

# 获取默认凭据
credential = DefaultAzureCredential()

# 创建Blob服务客户端
blob_service_client = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=credential)

# 输出客户端信息,验证连接是否成功
print(f"Blob服务客户端已创建,账户名称: {blob_service_client.account_name}")

3.3 容器操作

容器是Blob存储中的一个逻辑分组,类似于文件系统中的目录。下面介绍如何使用azure-storage-blob库进行容器操作。

3.3.1 创建容器

以下是创建容器的示例代码:

from azure.storage.blob import BlobServiceClient

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "my-container"

try:
    # 创建容器
    container_client = blob_service_client.create_container(container_name)
    print(f"容器 '{container_name}' 已创建")
except Exception as e:
    print(f"创建容器时出错: {e}")

3.3.2 列出所有容器

以下是列出所有容器的示例代码:

from azure.storage.blob import BlobServiceClient

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

try:
    # 获取所有容器
    containers = blob_service_client.list_containers()

    print("所有容器列表:")
    for container in containers:
        print(f"- {container.name}")
except Exception as e:
    print(f"列出容器时出错: {e}")

3.3.3 删除容器

以下是删除容器的示例代码:

from azure.storage.blob import BlobServiceClient

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "my-container"

try:
    # 删除容器
    blob_service_client.delete_container(container_name)
    print(f"容器 '{container_name}' 已删除")
except Exception as e:
    print(f"删除容器时出错: {e}")

3.4 Blob操作

Blob是Azure Blob存储中的基本存储单元,可以是文件、图像、视频等任何类型的数据。下面介绍如何使用azure-storage-blob库进行Blob操作。

3.4.1 上传Blob

以下是上传Blob的示例代码:

from azure.storage.blob import BlobServiceClient
import os

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "my-container"

# 获取容器客户端
container_client = blob_service_client.get_container_client(container_name)

# 本地文件路径
local_file_path = "path/to/your/local/file.txt"

# Blob名称
blob_name = os.path.basename(local_file_path)

try:
    # 上传Blob
    with open(local_file_path, "rb") as data:
        container_client.upload_blob(name=blob_name, data=data)

    print(f"Blob '{blob_name}' 已上传到容器 '{container_name}'")
except Exception as e:
    print(f"上传Blob时出错: {e}")

3.4.2 下载Blob

以下是下载Blob的示例代码:

from azure.storage.blob import BlobServiceClient
import os

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "my-container"

# Blob名称
blob_name = "file.txt"

# 本地保存路径
local_save_path = "path/to/save/file.txt"

try:
    # 获取容器客户端
    container_client = blob_service_client.get_container_client(container_name)

    # 获取Blob客户端
    blob_client = container_client.get_blob_client(blob_name)

    # 下载Blob
    with open(local_save_path, "wb") as download_file:
        download_file.write(blob_client.download_blob().readall())

    print(f"Blob '{blob_name}' 已下载到 '{local_save_path}'")
except Exception as e:
    print(f"下载Blob时出错: {e}")

3.4.3 列出容器中的所有Blob

以下是列出容器中所有Blob的示例代码:

from azure.storage.blob import BlobServiceClient

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "my-container"

try:
    # 获取容器客户端
    container_client = blob_service_client.get_container_client(container_name)

    # 列出所有Blob
    blobs = container_client.list_blobs()

    print(f"容器 '{container_name}' 中的所有Blob:")
    for blob in blobs:
        print(f"- {blob.name}")
except Exception as e:
    print(f"列出Blob时出错: {e}")

3.4.4 删除Blob

以下是删除Blob的示例代码:

from azure.storage.blob import BlobServiceClient

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "my-container"

# Blob名称
blob_name = "file.txt"

try:
    # 获取容器客户端
    container_client = blob_service_client.get_container_client(container_name)

    # 删除Blob
    container_client.delete_blob(blob_name)

    print(f"Blob '{blob_name}' 已从容器 '{container_name}' 中删除")
except Exception as e:
    print(f"删除Blob时出错: {e}")

3.5 高级操作

3.5.1 生成共享访问签名(SAS)

以下是生成Blob的共享访问签名(SAS)的示例代码:

from datetime import datetime, timedelta
from azure.storage.blob import BlobServiceClient, BlobSasPermissions, generate_blob_sas

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "my-container"

# Blob名称
blob_name = "file.txt"

# 生成SAS的开始时间和过期时间
start_time = datetime.utcnow()
expiry_time = start_time + timedelta(hours=1)

try:
    # 生成Blob SAS
    sas_token = generate_blob_sas(
        account_name=blob_service_client.account_name,
        container_name=container_name,
        blob_name=blob_name,
        account_key=blob_service_client.credential.account_key,
        permission=BlobSasPermissions(read=True),
        start=start_time,
        expiry=expiry_time
    )

    # 构建SAS URL
    sas_url = f"https://{blob_service_client.account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}"

    print(f"Blob SAS URL: {sas_url}")
except Exception as e:
    print(f"生成SAS时出错: {e}")

3.5.2 批量操作

以下是批量上传多个Blob的示例代码:

from azure.storage.blob import BlobServiceClient
import os

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "my-container"

# 获取容器客户端
container_client = blob_service_client.get_container_client(container_name)

# 本地目录路径
local_dir_path = "path/to/your/local/directory"

try:
    # 获取目录下的所有文件
    files = [f for f in os.listdir(local_dir_path) if os.path.isfile(os.path.join(local_dir_path, f))]

    # 批量上传文件
    for file_name in files:
        local_file_path = os.path.join(local_dir_path, file_name)
        blob_name = file_name

        # 上传Blob
        with open(local_file_path, "rb") as data:
            container_client.upload_blob(name=blob_name, data=data)

        print(f"Blob '{blob_name}' 已上传到容器 '{container_name}'")

    print(f"成功上传 {len(files)} 个文件")
except Exception as e:
    print(f"批量上传Blob时出错: {e}")

3.5.3 异步操作

以下是使用异步方式上传Blob的示例代码:

import asyncio
from azure.storage.blob.aio import BlobServiceClient
import os

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 容器名称
container_name = "my-container"

# 本地文件路径
local_file_path = "path/to/your/local/file.txt"

# Blob名称
blob_name = os.path.basename(local_file_path)

async def upload_blob_async():
    try:
        # 创建异步Blob服务客户端
        blob_service_client = BlobServiceClient.from_connection_string(connect_str)

        # 获取异步容器客户端
        container_client = blob_service_client.get_container_client(container_name)

        # 上传Blob
        async with open(local_file_path, "rb") as data:
            await container_client.upload_blob(name=blob_name, data=data)

        print(f"Blob '{blob_name}' 已异步上传到容器 '{container_name}'")
    except Exception as e:
        print(f"异步上传Blob时出错: {e}")
    finally:
        # 关闭客户端
        await blob_service_client.close()

# 运行异步函数
asyncio.run(upload_blob_async())

四、实际案例

4.1 网站静态资源存储

假设你正在开发一个网站,需要存储大量的静态资源,如图片、CSS、JavaScript文件等。你可以使用Azure Blob存储来存储这些资源,并使用azure-storage-blob库来管理这些资源。

以下是一个示例代码,展示如何使用azure-storage-blob库上传网站静态资源到Azure Blob存储:

from azure.storage.blob import BlobServiceClient
import os

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 容器名称
container_name = "website-static"

# 本地静态资源目录路径
local_static_dir = "path/to/your/website/static/files"

try:
    # 获取容器客户端
    container_client = blob_service_client.get_container_client(container_name)

    # 确保容器存在
    if not container_client.exists():
        container_client.create_container()
        print(f"容器 '{container_name}' 已创建")

    # 遍历本地静态资源目录
    for root, dirs, files in os.walk(local_static_dir):
        for file in files:
            # 本地文件路径
            local_file_path = os.path.join(root, file)

            # 计算Blob名称(相对于静态资源目录的路径)
            blob_name = os.path.relpath(local_file_path, local_static_dir)

            # 上传Blob
            with open(local_file_path, "rb") as data:
                container_client.upload_blob(name=blob_name, data=data, overwrite=True)

            print(f"Blob '{blob_name}' 已上传到容器 '{container_name}'")

    print(f"网站静态资源已成功上传到Azure Blob存储")
except Exception as e:
    print(f"上传网站静态资源时出错: {e}")

4.2 数据备份和恢复

假设你需要定期备份你的应用程序数据到Azure Blob存储,并在需要时能够恢复这些数据。你可以使用azure-storage-blob库来实现这个功能。

以下是一个示例代码,展示如何使用azure-storage-blob库实现数据备份和恢复功能:

from azure.storage.blob import BlobServiceClient
import os
import datetime
import shutil

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 备份容器名称
backup_container_name = "data-backup"

# 本地数据目录路径
local_data_dir = "path/to/your/data"

# 本地备份目录路径
local_backup_dir = "path/to/your/backup"

def backup_data():
    try:
        # 获取备份容器客户端
        backup_container_client = blob_service_client.get_container_client(backup_container_name)

        # 确保容器存在
        if not backup_container_client.exists():
            backup_container_client.create_container()
            print(f"容器 '{backup_container_name}' 已创建")

        # 创建本地临时备份目录
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        local_temp_backup_dir = os.path.join(local_backup_dir, f"backup_{timestamp}")
        os.makedirs(local_temp_backup_dir, exist_ok=True)

        # 复制数据到临时备份目录
        for item in os.listdir(local_data_dir):
            item_path = os.path.join(local_data_dir, item)
            if os.path.isfile(item_path):
                shutil.copy2(item_path, local_temp_backup_dir)

        # 上传备份文件到Azure Blob存储
        for file in os.listdir(local_temp_backup_dir):
            local_file_path = os.path.join(local_temp_backup_dir, file)
            blob_name = f"{timestamp}/{file}"

            with open(local_file_path, "rb") as data:
                backup_container_client.upload_blob(name=blob_name, data=data)

            print(f"备份文件 '{blob_name}' 已上传")

        # 删除临时备份目录
        shutil.rmtree(local_temp_backup_dir)

        print(f"数据备份完成,时间戳: {timestamp}")
    except Exception as e:
        print(f"备份数据时出错: {e}")

def restore_data(timestamp):
    try:
        # 获取备份容器客户端
        backup_container_client = blob_service_client.get_container_client(backup_container_name)

        # 创建本地临时恢复目录
        local_temp_restore_dir = os.path.join(local_backup_dir, f"restore_{timestamp}")
        os.makedirs(local_temp_restore_dir, exist_ok=True)

        # 下载指定时间戳的备份文件
        blobs = backup_container_client.list_blobs(name_starts_with=f"{timestamp}/")
        for blob in blobs:
            blob_name = blob.name
            file_name = os.path.basename(blob_name)
            local_file_path = os.path.join(local_temp_restore_dir, file_name)

            blob_client = backup_container_client.get_blob_client(blob_name)
            with open(local_file_path, "wb") as download_file:
                download_file.write(blob_client.download_blob().readall())

            print(f"恢复文件 '{blob_name}' 已下载")

        # 复制恢复文件到数据目录
        for item in os.listdir(local_temp_restore_dir):
            item_path = os.path.join(local_temp_restore_dir, item)
            if os.path.isfile(item_path):
                shutil.copy2(item_path, os.path.join(local_data_dir, item))

        # 删除临时恢复目录
        shutil.rmtree(local_temp_restore_dir)

        print(f"数据恢复完成,使用时间戳: {timestamp} 的备份")
    except Exception as e:
        print(f"恢复数据时出错: {e}")

# 示例使用
if __name__ == "__main__":
    # 备份数据
    backup_data()

    # 恢复数据(需要指定时间戳)
    # restore_data("20230101120000")

4.3 日志文件存储和分析

假设你需要收集和存储应用程序的日志文件,并进行分析。你可以使用Azure Blob存储来存储这些日志文件,并使用azure-storage-blob库来管理这些日志文件。

以下是一个示例代码,展示如何使用azure-storage-blob库实现日志文件的存储和分析:

from azure.storage.blob import BlobServiceClient
import os
import datetime
import logging
from io import StringIO

# 存储账户连接字符串
connect_str = "DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net"

# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# 日志容器名称
logs_container_name = "application-logs"

# 配置本地日志记录
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("application.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def upload_logs():
    try:
        # 获取日志容器客户端
        logs_container_client = blob_service_client.get_container_client(logs_container_name)

        # 确保容器存在
        if not logs_container_client.exists():
            logs_container_client.create_container()
            print(f"容器 '{logs_container_name}' 已创建")

        # 读取本地日志文件
        log_file_path = "application.log"
        if os.path.exists(log_file_path):
            # 生成Blob名称
            timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            blob_name = f"logs_{timestamp}.log"

            # 上传日志文件
            with open(log_file_path, "rb") as data:
                logs_container_client.upload_blob(name=blob_name, data=data)

            print(f"日志文件 '{blob_name}' 已上传")

            # 清空本地日志文件
            open(log_file_path, 'w').close()
        else:
            print("本地日志文件不存在")
    except Exception as e:
        print(f"上传日志时出错: {e}")

def analyze_logs(days=7):
    try:
        # 获取日志容器客户端
        logs_container_client = blob_service_client.get_container_client(logs_container_name)

        # 获取最近指定天数的日志文件
        now = datetime.datetime.now()
        logs_to_analyze = []

        for blob in logs_container_client.list_blobs():
            # 从Blob名称中提取时间戳
            blob_name = blob.name
            if "logs_" in blob_name and ".log" in blob_name:
                try:
                    timestamp_str = blob_name.split("_")[1].split(".")[0]
                    blob_timestamp = datetime.datetime.strptime(timestamp_str, "%Y%m%d%H%M%S")

                    # 只分析最近指定天数的日志
                    if (now - blob_timestamp).days <= days:
                        logs_to_analyze.append(blob_name)
                except:
                    continue

        # 分析日志
        error_count = 0
        warning_count = 0
        info_count = 0

        for blob_name in logs_to_analyze:
            blob_client = logs_container_client.get_blob_client(blob_name)
            blob_data = blob_client.download_blob().readall().decode('utf-8')

            # 统计不同级别的日志数量
            error_count += blob_data.count("ERROR")
            warning_count += blob_data.count("WARNING")
            info_count += blob_data.count("INFO")

        # 输出分析结果
        print(f"最近 {days} 天的日志分析结果:")
        print(f"INFO 日志数量: {info_count}")
        print(f"WARNING 日志数量: {warning_count}")
        print(f"ERROR 日志数量: {error_count}")

        return {
            "info_count": info_count,
            "warning_count": warning_count,
            "error_count": error_count
        }
    except Exception as e:
        print(f"分析日志时出错: {e}")
        return None

# 示例使用
if __name__ == "__main__":
    # 记录一些示例日志
    logger.info("这是一条信息日志")
    logger.warning("这是一条警告日志")
    logger.error("这是一条错误日志")

    # 上传日志
    upload_logs()

    # 分析日志
    analyze_logs(days=1)

五、相关资源

关注我,每天分享一个实用的Python自动化工具。

退出移动版