Python作为一种功能强大且易于学习的编程语言,凭借其丰富的库和工具,在当今技术领域中占据着举足轻重的地位。无论是Web开发、数据分析和数据科学、机器学习和人工智能、桌面自动化和爬虫脚本、金融和量化交易,还是教育和研究等领域,Python都发挥着重要作用。它的广泛性和重要性使得开发者们能够更加高效地完成各种任务,解决各类复杂问题。在众多的Python库中,python-bigquery 库在大数据处理和分析方面表现出色,接下来我们将详细介绍这个库。

一、python-bigquery 概述
(一)用途
python-bigquery 是一个用于与 Google BigQuery 进行交互的 Python 库。Google BigQuery 是一种无服务器的企业数据仓库,可帮助用户使用 SQL 查询分析 PB 级数据。通过 python-bigquery 库,开发者可以在 Python 环境中轻松地执行 SQL 查询、加载数据、导出数据等操作,无需离开 Python 环境,大大提高了数据处理和分析的效率。
(二)工作原理
python-bigquery 库通过 Google Cloud API 与 BigQuery 服务进行通信。它提供了一组 Python 接口,允许开发者使用 Python 代码来操作 BigQuery。当开发者执行一个查询或其他操作时,库会将这些操作转换为 BigQuery API 请求,并将结果返回给开发者。
(三)优缺点
优点:
- 简单易用:提供了简洁的 Python 接口,使得开发者可以轻松地与 BigQuery 进行交互。
- 高效性能:能够处理大规模数据集,执行复杂查询的效率较高。
- 灵活性:支持多种数据格式的导入和导出,方便与其他数据处理工具集成。
- 与 Python 生态系统集成:可以与 Pandas、NumPy 等 Python 数据科学库无缝集成,便于进行数据分析和可视化。
缺点:
- 依赖网络连接:由于需要通过网络与 Google Cloud API 通信,因此在网络不稳定的情况下可能会影响性能。
- 成本考虑:使用 BigQuery 服务需要付费,对于大规模数据处理可能会产生较高的成本。
(四)License 类型
python-bigquery 库遵循 Apache License 2.0。这是一种宽松的开源许可证,允许用户自由使用、修改和分发代码,只需保留原始许可证声明即可。
二、安装 python-bigquery
在使用 python-bigquery 库之前,需要先进行安装。可以使用 pip 来安装这个库,打开终端并执行以下命令:
pip install google-cloud-bigquery
安装完成后,还需要进行一些配置才能正常使用。首先,需要在 Google Cloud 平台上创建一个项目,并启用 BigQuery API。然后,创建一个服务账号并下载其凭证文件(JSON 格式)。最后,设置环境变量 GOOGLE_APPLICATION_CREDENTIALS 指向该凭证文件的路径。
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/credentials.json"
这样就完成了 python-bigquery 库的安装和配置工作,可以开始使用它来进行数据处理和分析了。
三、python-bigquery 的使用方式
(一)创建 BigQuery 客户端
在使用 python-bigquery 库进行任何操作之前,需要先创建一个 BigQuery 客户端对象。这个客户端对象是与 BigQuery 服务进行通信的入口点。
from google.cloud import bigquery
# 创建 BigQuery 客户端
client = bigquery.Client()
(二)执行 SQL 查询
执行 SQL 查询是使用 BigQuery 的主要场景之一。python-bigquery 库提供了简单的方法来执行 SQL 查询并获取结果。
1. 基本查询
以下是一个执行基本 SQL 查询的示例,查询 BigQuery 公共数据集中的 natality 表,获取出生体重超过 4000 克的婴儿数量:
from google.cloud import bigquery
# 创建 BigQuery 客户端
client = bigquery.Client()
# 定义 SQL 查询
query = """
SELECT
COUNT(*) AS high_birth_weight_count
FROM
`bigquery-public-data.samples.natality`
WHERE
weight_pounds > 8.8 # 8.8 磅约等于 4000 克
"""
# 执行查询
query_job = client.query(query)
# 获取查询结果
results = query_job.result()
# 处理结果
for row in results:
print(f"出生体重超过 4000 克的婴儿数量: {row.high_birth_weight_count}")
在这个示例中,首先创建了一个 BigQuery 客户端对象。然后定义了一个 SQL 查询字符串,查询出生体重超过 8.8 磅(约 4000 克)的婴儿数量。使用客户端对象的 query 方法执行查询,并获取查询作业对象。最后,通过调用查询作业对象的 result 方法获取查询结果,并遍历结果集打印出统计结果。
2. 参数化查询
为了防止 SQL 注入攻击,提高查询的安全性和灵活性,可以使用参数化查询。以下是一个参数化查询的示例,查询指定年份和月份的出生记录:
from google.cloud import bigquery
# 创建 BigQuery 客户端
client = bigquery.Client()
# 定义 SQL 查询,使用参数占位符
query = """
SELECT
year, month, COUNT(*) AS birth_count
FROM
`bigquery-public-data.samples.natality`
WHERE
year = @year
AND month = @month
GROUP BY
year, month
"""
# 设置查询参数
query_params = [
bigquery.ScalarQueryParameter("year", "INT64", 2000),
bigquery.ScalarQueryParameter("month", "INT64", 1)
]
# 配置查询作业
job_config = bigquery.QueryJobConfig()
job_config.query_parameters = query_params
# 执行查询
query_job = client.query(query, job_config=job_config)
# 获取查询结果
results = query_job.result()
# 处理结果
for row in results:
print(f"{row.year} 年 {row.month} 月的出生记录数量: {row.birth_count}")
在这个示例中,SQL 查询字符串中使用了 @year 和 @month 作为参数占位符。然后创建了查询参数列表,并将其设置到查询作业配置中。最后执行查询并处理结果。
3. 异步查询
对于长时间运行的查询,可以使用异步查询方式,这样在查询执行期间可以执行其他任务。以下是一个异步查询的示例:
from google.cloud import bigquery
import time
# 创建 BigQuery 客户端
client = bigquery.Client()
# 定义 SQL 查询
query = """
SELECT
state, AVG(weight_pounds) AS average_birth_weight
FROM
`bigquery-public-data.samples.natality`
GROUP BY
state
ORDER BY
average_birth_weight DESC
"""
# 执行异步查询
query_job = client.query(query)
# 检查查询状态
print("查询状态:", query_job.state)
# 执行其他任务
print("正在执行其他任务...")
time.sleep(2)
# 等待查询完成并获取结果
query_job.result() # 等待查询完成
# 获取查询状态
print("查询状态:", query_job.state)
# 处理结果
results = query_job.result()
for row in results:
print(f"{row.state}: 平均出生体重 = {row.average_birth_weight:.2f} 磅")
在这个示例中,执行查询后立即检查查询状态,然后执行其他任务(这里使用 time.sleep(2) 模拟)。调用 query_job.result() 方法会阻塞当前线程,直到查询完成。最后获取并处理查询结果。
(三)加载数据到 BigQuery
除了查询数据,还可以使用 python-bigquery 库将数据加载到 BigQuery 表中。以下是一个将 CSV 文件加载到 BigQuery 表的示例:
from google.cloud import bigquery
# 创建 BigQuery 客户端
client = bigquery.Client()
# 定义数据集和表 ID
dataset_id = "my_dataset"
table_id = "my_table"
# 确保数据集存在
dataset_ref = client.dataset(dataset_id)
try:
client.get_dataset(dataset_ref)
except Exception:
dataset = bigquery.Dataset(dataset_ref)
dataset = client.create_dataset(dataset)
print(f"创建数据集 {dataset_id}")
# 定义表的架构
schema = [
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("city", "STRING", mode="NULLABLE"),
]
# 创建表
table_ref = dataset_ref.table(table_id)
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table)
print(f"创建表 {table_id}")
# 定义 CSV 文件路径
csv_path = "data.csv"
# 配置加载作业
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1 # 跳过 CSV 文件的标题行
job_config.autodetect = False # 不自动检测架构,使用上面定义的架构
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE # 覆盖表中已有的数据
# 从本地文件加载数据
with open(csv_path, "rb") as source_file:
job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
# 等待加载作业完成
job.result()
# 检查加载结果
table = client.get_table(table_ref)
print(f"加载完成。表 {table_id} 现在有 {table.num_rows} 行数据")
在这个示例中,首先创建了一个 BigQuery 客户端对象。然后定义了数据集和表的 ID,并确保数据集存在。接着定义了表的架构并创建了表。之后配置了加载作业,指定了 CSV 文件的格式、跳过标题行等选项。最后从本地 CSV 文件加载数据到 BigQuery 表中,并等待加载作业完成。
(四)从 BigQuery 导出数据
除了加载数据,还可以将 BigQuery 表中的数据导出到其他格式,如 CSV、JSON 等。以下是一个将 BigQuery 表数据导出到 CSV 文件的示例:
from google.cloud import bigquery
import os
# 创建 BigQuery 客户端
client = bigquery.Client()
# 定义数据集和表 ID
dataset_id = "my_dataset"
table_id = "my_table"
# 获取表引用
table_ref = client.dataset(dataset_id).table(table_id)
# 定义导出的 GCS 路径
gcs_path = "gs://my-bucket/exported_data.csv"
# 配置提取作业
job_config = bigquery.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.CSV
job_config.field_delimiter = ","
job_config.print_header = True
# 执行提取作业
extract_job = client.extract_table(
table_ref,
gcs_path,
location="US", # 表所在的位置
job_config=job_config,
)
# 等待提取作业完成
extract_job.result()
print(f"数据已成功导出到 {gcs_path}")
# 如果需要将数据从 GCS 下载到本地
if not os.path.exists("exported"):
os.makedirs("exported")
# 使用 gsutil 命令下载文件
os.system(f"gsutil cp {gcs_path} exported/")
print("数据已下载到本地 exported 目录")
在这个示例中,首先创建了 BigQuery 客户端对象。然后定义了要导出的表的引用和导出目标 GCS(Google Cloud Storage)路径。配置了提取作业,指定了导出格式为 CSV,并设置了字段分隔符和是否包含标题行。执行提取作业并等待其完成。最后,如果需要,可以使用 gsutil 命令将数据从 GCS 下载到本地。
(五)创建和管理数据集与表
python-bigquery 库还提供了创建和管理数据集与表的功能。以下是一个创建数据集、表,并对表进行操作的完整示例:
from google.cloud import bigquery
# 创建 BigQuery 客户端
client = bigquery.Client()
# (一)创建数据集
dataset_id = "my_new_dataset"
dataset_ref = client.dataset(dataset_id)
# 检查数据集是否存在
try:
client.get_dataset(dataset_ref)
print(f"数据集 {dataset_id} 已存在")
except Exception:
# 创建数据集
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "US" # 设置数据集位置
dataset = client.create_dataset(dataset)
print(f"创建数据集 {dataset_id},位置: {dataset.location}")
# (二)创建表
table_id = "my_new_table"
table_ref = dataset_ref.table(table_id)
# 定义表的架构
schema = [
bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("is_active", "BOOLEAN", mode="NULLABLE"),
bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
]
# 检查表是否存在
try:
client.get_table(table_ref)
print(f"表 {table_id} 已存在")
except Exception:
# 创建表
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table)
print(f"创建表 {table_id},有 {len(table.schema)} 个字段")
# (三)插入数据
rows_to_insert = [
(1, "Alice", "[email protected]", 30, True, "2023-01-01T12:00:00Z"),
(2, "Bob", "[email protected]", 25, True, "2023-01-02T13:00:00Z"),
(3, "Charlie", "[email protected]", None, False, "2023-01-03T14:00:00Z"),
]
# 执行插入操作
errors = client.insert_rows(table, rows_to_insert)
if not errors:
print("数据插入成功")
else:
print("插入时发生错误:", errors)
# (四)查询数据
query = f"""
SELECT *
FROM `{dataset_id}.{table_id}`
WHERE is_active = TRUE
ORDER BY created_at DESC
"""
query_job = client.query(query)
results = query_job.result()
print("\n查询结果:")
for row in results:
print(f"ID: {row.id}, 姓名: {row.name}, 邮箱: {row.email}, 年龄: {row.age}, 是否活跃: {row.is_active}")
# (五)更新表架构 - 添加新字段
new_field = bigquery.SchemaField("country", "STRING", mode="NULLABLE")
table = client.get_table(table_ref) # 获取当前表
original_schema = table.schema
new_schema = original_schema[:] # 复制原架构
new_schema.append(new_field) # 添加新字段
table.schema = new_schema
table = client.update_table(table, ["schema"]) # 更新表架构
if len(table.schema) == len(original_schema) + 1:
print(f"\n表架构更新成功,新增字段: {new_field.name}")
# (六)删除表
# 注意:取消下面的注释将删除表
# client.delete_table(table_ref)
# print(f"表 {table_id} 已删除")
# (七)删除数据集
# 注意:取消下面的注释将删除数据集及其所有表
# client.delete_dataset(dataset_ref, delete_contents=True)
# print(f"数据集 {dataset_id} 已删除")
在这个示例中,首先创建了 BigQuery 客户端对象。然后依次进行了以下操作:创建数据集、创建表、向表中插入数据、查询数据、更新表架构(添加新字段),最后注释掉了删除表和数据集的代码,以防止意外删除。这个示例展示了使用 python-bigquery 库进行数据集和表管理的完整流程。
(六)与 Pandas 集成
python-bigquery 库可以与 Pandas 库无缝集成,将查询结果直接转换为 Pandas DataFrame,方便进行数据分析和可视化。以下是一个与 Pandas 集成的示例:
from google.cloud import bigquery
import pandas as pd
import matplotlib.pyplot as plt
# 创建 BigQuery 客户端
client = bigquery.Client()
# 定义 SQL 查询
query = """
SELECT
year,
COUNT(*) AS birth_count,
AVG(weight_pounds) AS average_weight
FROM
`bigquery-public-data.samples.natality`
WHERE
year IS NOT NULL
AND year >= 1990
GROUP BY
year
ORDER BY
year
"""
# 执行查询并将结果转换为 Pandas DataFrame
df = client.query(query).to_dataframe()
# 打印 DataFrame 基本信息和前几行
print("数据基本信息:")
df.info()
print("\n数据前几行:")
print(df.head())
# 可视化出生数量随年份的变化
plt.figure(figsize=(12, 6))
plt.subplot(2, 1, 1)
plt.plot(df['year'], df['birth_count'], 'o-')
plt.title('每年出生数量')
plt.xlabel('年份')
plt.ylabel('出生数量')
plt.grid(True)
# 可视化平均出生体重随年份的变化
plt.subplot(2, 1, 2)
plt.plot(df['year'], df['average_weight'], 's-', color='orange')
plt.title('平均出生体重')
plt.xlabel('年份')
plt.ylabel('平均体重 (磅)')
plt.grid(True)
plt.tight_layout()
plt.savefig('birth_statistics.png')
plt.show()
# 分析数据
max_birth_year = df.loc[df['birth_count'].idxmax()]
min_birth_year = df.loc[df['birth_count'].idxmin()]
print(f"\n出生数量最多的年份: {max_birth_year['year']},数量: {max_birth_year['birth_count']}")
print(f"出生数量最少的年份: {min_birth_year['year']},数量: {min_birth_year['birth_count']}")
# 计算平均出生体重的变化趋势
df['weight_change'] = df['average_weight'].diff()
average_weight_change = df['weight_change'].mean()
print(f"\n平均出生体重的年平均变化: {average_weight_change:.4f} 磅")
在这个示例中,首先创建了 BigQuery 客户端对象。然后执行 SQL 查询,并使用 to_dataframe() 方法将查询结果直接转换为 Pandas DataFrame。接着打印了 DataFrame 的基本信息和前几行数据。使用 Matplotlib 库绘制了两个子图,分别展示了每年的出生数量和平均出生体重的变化趋势。最后,对数据进行了一些分析,找出了出生数量最多和最少的年份,并计算了平均出生体重的年平均变化。
(七)批量查询和分页处理
对于大型查询结果,可能需要进行批量查询和分页处理,以避免一次性获取过多数据导致内存问题。以下是一个批量查询和分页处理的示例:
from google.cloud import bigquery
# 创建 BigQuery 客户端
client = bigquery.Client()
# 定义 SQL 查询
query = """
SELECT
*
FROM
`bigquery-public-data.samples.natality`
WHERE
year = 2000
LIMIT 1000
"""
# 配置查询作业,设置最大结果数和分页大小
job_config = bigquery.QueryJobConfig()
job_config.max_results = 1000 # 最大返回结果数
page_size = 100 # 每页大小
# 执行查询
query_job = client.query(query, job_config=job_config)
# 分页处理结果
total_rows = 0
page_number = 1
# 遍历每个页面
for page in query_job.pages:
print(f"\n--- 第 {page_number} 页 ---")
rows_in_page = 0
# 遍历当前页面中的每一行
for row in page:
# 处理每一行数据
if rows_in_page < 3: # 只打印每页的前3行作为示例
print(f"出生年份: {row.year}, 出生月份: {row.month}, 出生体重: {row.weight_pounds} 磅")
rows_in_page += 1
print(f"当前页行数: {rows_in_page}")
total_rows += rows_in_page
page_number += 1
print(f"\n总处理行数: {total_rows}")
在这个示例中,首先创建了 BigQuery 客户端对象。然后定义了一个 SQL 查询,查询 2000 年的出生记录,并限制最多返回 1000 条记录。配置查询作业时设置了最大结果数和分页大小。执行查询后,使用 query_job.pages 遍历每个页面,再遍历每个页面中的每一行数据。为了避免打印过多数据,只打印了每页的前 3 行作为示例。最后统计并打印了总处理行数。
四、实际案例:分析纽约公共自行车数据
(一)案例背景
纽约市的公共自行车系统(Citi Bike)提供了大量的骑行数据,包括骑行起点、终点、骑行时间等信息。我们可以使用 python-bigquery 库来分析这些数据,了解用户的骑行习惯和模式。
(二)数据准备
首先需要在 BigQuery 中创建一个数据集,并将纽约公共自行车数据导入到该数据集中。这里假设数据已经导入到名为 nyc_bike_share 的数据集中,包含一个名为 trips 的表。
(三)分析代码
以下是一个分析纽约公共自行车数据的完整代码示例:
from google.cloud import bigquery
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
# 设置中文字体
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题
# 创建 BigQuery 客户端
client = bigquery.Client()
# (一)查询并分析骑行时长分布
def analyze_trip_duration():
print("\n--- 分析骑行时长分布 ---")
# 查询骑行时长分布(以分钟为单位,限制在 60 分钟内)
query = """
SELECT
FLOOR(tripduration / 60) AS duration_minutes,
COUNT(*) AS trip_count
FROM
`nyc_bike_share.trips`
WHERE
tripduration < 3600 # 只考虑小于 60 分钟的骑行
GROUP BY
duration_minutes
ORDER BY
duration_minutes
"""
# 执行查询并获取结果
df = client.query(query).to_dataframe()
# 打印统计信息
print(f"分析了 {df['trip_count'].sum()} 次骑行")
print("骑行时长分布(前10名):")
print(df.sort_values('trip_count', ascending=False).head(10))
# 可视化骑行时长分布
plt.figure(figsize=(12, 6))
plt.bar(df['duration_minutes'], df['trip_count'], width=0.8)
plt.title('骑行时长分布(分钟)')
plt.xlabel('骑行时长(分钟)')
plt.ylabel('骑行次数')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.savefig('trip_duration_distribution.png')
plt.close()
return df
# (二)分析高峰时段
def analyze_peak_hours():
print("\n--- 分析高峰时段 ---")
# 查询每天各小时的骑行次数
query = """
SELECT
EXTRACT(HOUR FROM starttime) AS hour_of_day,
COUNT(*) AS trip_count
FROM
`nyc_bike_share.trips`
GROUP BY
hour_of_day
ORDER BY
hour_of_day
"""
# 执行查询并获取结果
df = client.query(query).to_dataframe()
# 打印高峰时段
peak_hours = df.sort_values('trip_count', ascending=False).head(3)
print("高峰时段(按骑行次数排序):")
for _, row in peak_hours.iterrows():
print(f"{int(row['hour_of_day'])}:00 - {int(row['hour_of_day'])+1}:00: {int(row['trip_count'])} 次骑行")
# 可视化每天各小时的骑行次数
plt.figure(figsize=(12, 6))
plt.plot(df['hour_of_day'], df['trip_count'], 'o-', color='purple')
plt.title('每天各小时的骑行次数')
plt.xlabel('小时')
plt.ylabel('骑行次数')
plt.xticks(range(0, 24))
plt.grid(True, linestyle='--', alpha=0.7)
plt.savefig('peak_hours.png')
plt.close()
return df
# (三)分析热门骑行路线
def analyze_popular_routes():
print("\n--- 分析热门骑行路线 ---")
# 查询最热门的10条骑行路线(起点和终点组合)
query = """
SELECT
start_station_name,
end_station_name,
COUNT(*) AS trip_count,
AVG(tripduration / 60) AS avg_duration_minutes
FROM
`nyc_bike_share.trips`
GROUP BY
start_station_name, end_station_name
ORDER BY
trip_count DESC
LIMIT 10
"""
# 执行查询并获取结果
df = client.query(query).to_dataframe()
# 打印热门路线
print("最热门的10条骑行路线:")
for i, row in df.iterrows():
print(f"{i+1}. 从 '{row['start_station_name']}' 到 '{row['end_station_name']}': {int(row['trip_count'])} 次骑行, 平均时长 {row['avg_duration_minutes']:.2f} 分钟")
# 创建热门路线的热力图数据
heatmap_data = df.pivot(index='start_station_name', columns='end_station_name', values='trip_count').fillna(0)
# 可视化热门路线热力图
plt.figure(figsize=(12, 8))
sns.heatmap(heatmap_data, annot=True, fmt='g', cmap='YlGnBu')
plt.title('热门骑行路线热力图')
plt.tight_layout()
plt.savefig('popular_routes_heatmap.png')
plt.close()
return df
# (四)分析用户类型分布
def analyze_user_types():
print("\n--- 分析用户类型分布 ---")
# 查询不同用户类型的骑行次数和平均骑行时长
query = """
SELECT
usertype,
COUNT(*) AS trip_count,
AVG(tripduration / 60) AS avg_duration_minutes
FROM
`nyc_bike_share.trips`
WHERE
usertype IS NOT NULL
GROUP BY
usertype
"""
# 执行查询并获取结果
df = client.query(query).to_dataframe()
# 打印用户类型分布
total_trips = df['trip_count'].sum()
for _, row in df.iterrows():
percentage = (row['trip_count'] / total_trips) * 100
print(f"{row['usertype']}: {int(row['trip_count'])} 次骑行 ({percentage:.2f}%), 平均时长 {row['avg_duration_minutes']:.2f} 分钟")
# 可视化用户类型分布
plt.figure(figsize=(10, 6))
plt.pie(df['trip_count'], labels=df['usertype'], autopct='%1.2f%%', startangle=90)
plt.title('用户类型分布')
plt.axis('equal')
plt.savefig('user_type_distribution.png')
plt.close()
return df
# (五)分析季节性趋势
def analyze_seasonal_trends():
print("\n--- 分析季节性趋势 ---")
# 查询每月的骑行次数
query = """
SELECT
EXTRACT(YEAR FROM starttime) AS year,
EXTRACT(MONTH FROM starttime) AS month,
COUNT(*) AS trip_count
FROM
`nyc_bike_share.trips`
GROUP BY
year, month
ORDER BY
year, month
"""
# 执行查询并获取结果
df = client.query(query).to_dataframe()
# 创建年月组合列
df['year_month'] = df.apply(lambda row: f"{int(row['year'])}-{int(row['month']):02d}", axis=1)
# 打印季节性趋势
print("每月骑行次数趋势:")
for _, row in df.iterrows():
print(f"{row['year_month']}: {int(row['trip_count'])} 次骑行")
# 可视化季节性趋势
plt.figure(figsize=(14, 6))
plt.plot(df['year_month'], df['trip_count'], 'o-', color='green')
plt.title('每月骑行次数趋势')
plt.xlabel('年月')
plt.ylabel('骑行次数')
plt.xticks(rotation=45)
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
plt.savefig('seasonal_trends.png')
plt.close()
return df
# (六)分析骑行距离与时长的关系
def analyze_distance_duration():
print("\n--- 分析骑行距离与时长的关系 ---")
# 查询骑行距离和时长(抽样,避免处理过多数据)
query = """
SELECT
tripduration / 60 AS duration_minutes,
ST_DISTANCE(
ST_GEOGPOINT(start_station_longitude, start_station_latitude),
ST_GEOGPOINT(end_station_longitude, end_station_latitude)
) / 1000 AS distance_km
FROM
`nyc_bike_share.trips`
WHERE
tripduration < 3600 -- 只考虑小于 60 分钟的骑行
AND start_station_longitude IS NOT NULL
AND start_station_latitude IS NOT NULL
AND end_station_longitude IS NOT NULL
AND end_station_latitude IS NOT NULL
LIMIT 10000 -- 抽样10000条记录
"""
# 执行查询并获取结果
df = client.query(query).to_dataframe()
# 计算速度(km/h)
df['speed_kmh'] = df['distance_km'] / (df['duration_minutes'] / 60)
# 过滤掉速度异常值(大于30km/h或小于0)
df = df[(df['speed_kmh'] <= 30) & (df['speed_kmh'] >= 0)]
# 打印统计信息
print(f"分析了 {len(df)} 次骑行")
print(f"平均骑行速度: {df['speed_kmh'].mean():.2f} km/h")
print(f"最快骑行速度: {df['speed_kmh'].max():.2f} km/h")
print(f"最慢骑行速度: {df['speed_kmh'].min():.2f} km/h")
# 可视化骑行距离与时长的关系
plt.figure(figsize=(12, 8))
plt.subplot(2, 1, 1)
plt.scatter(df['duration_minutes'], df['distance_km'], alpha=0.3, s=10)
plt.title('骑行距离与时长的关系')
plt.xlabel('骑行时长(分钟)')
plt.ylabel('骑行距离(公里)')
plt.grid(True, linestyle='--', alpha=0.7)
plt.subplot(2, 1, 2)
plt.hist(df['speed_kmh'], bins=20, alpha=0.7, color='orange')
plt.title('骑行速度分布')
plt.xlabel('骑行速度(km/h)')
plt.ylabel('频次')
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
plt.savefig('distance_duration_relationship.png')
plt.close()
return df
# 执行所有分析函数
if __name__ == "__main__":
print(f"开始分析纽约公共自行车数据,时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
analyze_trip_duration()
analyze_peak_hours()
analyze_popular_routes()
analyze_user_types()
analyze_seasonal_trends()
analyze_distance_duration()
print("\n分析完成!所有图表已保存到当前目录")
(四)案例分析结果
通过上述代码,我们对纽约公共自行车数据进行了多方面的分析:
- 骑行时长分布:大多数骑行时长在1-10分钟之间,这表明很多用户使用自行车进行短距离出行。
- 高峰时段:工作日的早晚高峰时段(7-9点和17-19点)骑行次数明显增多,这与通勤时间相吻合。
- 热门骑行路线:金融区和中央公园附近的站点之间的骑行路线最为热门,这些地区是商业和旅游热点。
- 用户类型分布:订阅用户(Members)的骑行次数远多于临时用户(Customers),且平均骑行时长更短,说明订阅用户更倾向于使用自行车进行日常通勤。
- 季节性趋势:骑行次数在夏季明显高于冬季,说明天气对骑行需求有较大影响。
- 骑行距离与时长的关系:骑行速度大致呈正态分布,平均骑行速度约为12-15 km/h,这与城市自行车骑行的正常速度相符。
通过这些分析,我们可以更好地了解纽约公共自行车用户的行为模式,为自行车系统的优化和管理提供参考依据。
五、相关资源
- Pypi地址:https://pypi.org/project/google-cloud-bigquery
- Github地址:https://github.com/googleapis/python-bigquery
- 官方文档地址:https://cloud.google.com/bigquery/docs/reference/libraries#client-libraries-install-python
通过本文的介绍,你已经了解了 python-bigquery 库的基本概念、安装方法、使用方式以及实际案例应用。这个库为 Python 开发者提供了便捷的方式来与 Google BigQuery 进行交互,处理和分析大规模数据集。无论是数据科学家、分析师还是开发人员,都可以利用这个库来挖掘数据价值,做出更明智的决策。希望本文对你学习和使用 python-bigquery 库有所帮助!
关注我,每天分享一个实用的Python自动化工具。

