import sys
from colout import colout
with open('app.log', 'r') as f:
for line in f:
if 'DEBUG' in line:
print(colout(line).cyan())
elif 'INFO' in line:
print(colout(line).green())
elif 'WARNING' in line:
print(colout(line).yellow())
elif 'ERROR' in line:
print(colout(line).red().bold())
def add_numbers(a, b):
result = a + b
return result
num1 = 5
num2 = 3
sum_result = add_numbers(num1, num2)
print(sum_result)
我们可以编写一个简单的脚本来实现代码关键部分的高亮显示,创建code_review.py:
import re
from colout import colout
with open('example.py', 'r') as f:
code = f.read()
\# 匹配变量名
variables = re.findall(r'\b([a-zA-Z_][a-zA-Z0-9_]\*)\b(?!\()', code)
for var in variables:
code = code.replace(var, colout(var).blue().underline())
\# 匹配函数名
functions = re.findall(r'\b([a-zA-Z_][a-zA-Z0-9_]\*)\b\(', code)
for func in functions:
code = code.replace(func, colout(func).magenta().bold())
print(code)
from colout import colout
with open('data.txt', 'r') as f:
for line in f:
num = int(line.strip())
if num > 50:
print(colout(str(num)).red())
else:
print(num)
在这个脚本中,我们使用psutil.cpu_percent(interval=1)获取当前系统的 CPU 使用率,interval=1表示获取最近 1 秒内的 CPU 使用率平均值。然后根据 CPU 使用率的不同范围,使用colout库进行不同的颜色标记。当 CPU 使用率大于 80% 时,输出红色加粗的文本;当 CPU 使用率大于 50% 时,输出黄色文本;当 CPU 使用率小于等于 50% 时,输出绿色文本。这样通过颜色的变化,我们可以直观地了解系统 CPU 的负载情况,及时发现系统性能问题。
四、实际案例应用
4.1 案例背景
在一个 Web 应用开发项目中,我们使用 Python 的 Flask 框架搭建了一个简单的博客系统。随着业务的不断发展,系统的日志文件变得越来越庞大,每天都会记录大量的运行状态信息。在排查问题和监控系统时,我们需要快速定位到错误和关键信息,然而,普通的日志输出方式使得这些重要信息在众多的日志内容中难以被发现,这就导致了问题排查效率低下,严重影响了开发和运维的工作进度。因此,我们决定使用 Colout 库来处理日志文件,通过颜色区分不同类型的信息,提高日志的可读性和分析效率。
4.2 具体实现步骤
下面是使用 Colout 库处理 Web 应用日志文件的完整代码:
import sys
from colout import colout
def process_log(log_file_path):
try:
with open(log_file_path, 'r') as f:
for line in f:
if 'ERROR' in line:
print(colout(line).red().bold())
elif 'WARNING' in line:
print(colout(line).yellow())
elif 'INFO' in line:
print(colout(line).green())
else:
print(line.strip())
except FileNotFoundError:
print(colout(f"日志文件 {log_file_path} 未找到").red().bold())
if __name__ == "__main__":
if len(sys.argv) != 2:
print(colout("请提供日志文件路径作为参数").red().bold())
sys.exit(1)
log_file_path = sys.argv[1]
process_log(log_file_path)
逐行解释代码功能:
import sys:导入sys模块,用于处理命令行参数和与 Python 解释器交互。
from colout import colout:从colout库中导入colout对象,用于设置文本的颜色和样式。
"""
Usage:
mytool.py [options] [--] <input>...
mytool.py (-h | --help | --version)
Options:
-h --help Show this screen.
--version Show version.
-o FILE, --output=FILE Output file [default: output.txt].
-v, --verbose Increase verbosity.
-q, --quiet Decrease verbosity.
--encoding=ENCODING Encoding for input/output [default: utf-8].
--filter=FILTER Filter results by FILTER.
--limit=LIMIT Limit the number of results [default: 10].
--format=FORMAT Output format: json, csv, text [default: text].
Examples:
mytool.py file1.txt file2.txt
mytool.py -v --format=json --limit=5 data/*.txt -o results.json
mytool.py --filter="error" logs/*.log
"""
from docopt import docopt
import sys
import os
import json
import csv
def main():
arguments = docopt(__doc__, version='MyTool 1.0')
# 获取输入文件列表
input_files = arguments['<input>']
# 获取选项值
output_file = arguments['--output']
verbose = arguments['--verbose']
quiet = arguments['--quiet']
encoding = arguments['--encoding']
filter_text = arguments['--filter']
limit = int(arguments['--limit'])
format = arguments['--format']
# 检查输入文件是否存在
for filename in input_files:
if not os.path.exists(filename):
print(f"Error: File '{filename}' does not exist.", file=sys.stderr)
sys.exit(1)
# 处理输入文件
results = []
for filename in input_files:
if verbose:
print(f"Processing file: {filename}")
try:
with open(filename, 'r', encoding=encoding) as f:
lines = f.readlines()
# 应用过滤
if filter_text:
lines = [line for line in lines if filter_text in line]
# 应用限制
if limit > 0:
lines = lines[:limit]
results.extend([{
'filename': filename,
'line_number': i + 1,
'content': line.strip()
} for i, line in enumerate(lines)])
except Exception as e:
print(f"Error reading file '{filename}': {str(e)}", file=sys.stderr)
if not quiet:
sys.exit(1)
# 输出结果
if format == 'json':
with open(output_file, 'w', encoding=encoding) as f:
json.dump(results, f, indent=2)
if verbose:
print(f"Results saved to {output_file} in JSON format.")
elif format == 'csv':
with open(output_file, 'w', encoding=encoding, newline='') as f:
writer = csv.DictWriter(f, fieldnames=['filename', 'line_number', 'content'])
writer.writeheader()
writer.writerows(results)
if verbose:
print(f"Results saved to {output_file} in CSV format.")
else: # text format
with open(output_file, 'w', encoding=encoding) as f:
for result in results:
f.write(f"{result['filename']}:{result['line_number']} {result['content']}\n")
if verbose:
print(f"Results saved to {output_file} in text format.")
if __name__ == '__main__':
main()
"""
文件搜索工具
Usage:
file_search.py [options] <search_text> <directory>
file_search.py (-h | --help | --version)
Options:
-h --help Show this screen.
--version Show version.
-r, --recursive Search recursively in subdirectories.
-i, --ignore-case Ignore case when searching.
-e EXT, --extension=EXT Only search files with the given extension (e.g., .txt, .py).
-n, --no-filename Do not show filenames in results.
-m MAX, --max-results=MAX Maximum number of results to show [default: 100].
--encoding=ENCODING Encoding to use when reading files [default: utf-8].
"""
from docopt import docopt
import os
import re
def main():
arguments = docopt(__doc__, version='文件搜索工具 1.0')
search_text = arguments['<search_text>']
directory = arguments['<directory>']
recursive = arguments['--recursive']
ignore_case = arguments['--ignore-case']
extension = arguments['--extension']
no_filename = arguments['--no-filename']
max_results = int(arguments['--max-results'])
encoding = arguments['--encoding']
# 检查目录是否存在
if not os.path.isdir(directory):
print(f"错误: 目录 '{directory}' 不存在。")
return
# 编译正则表达式
flags = re.IGNORECASE if ignore_case else 0
pattern = re.compile(re.escape(search_text), flags)
# 搜索文件
results = []
count = 0
for root, dirs, files in os.walk(directory):
for filename in files:
# 检查文件扩展名
if extension and not filename.endswith(extension):
continue
file_path = os.path.join(root, filename)
# 读取文件内容并搜索
try:
with open(file_path, 'r', encoding=encoding) as f:
for line_num, line in enumerate(f, 1):
if pattern.search(line):
if no_filename:
results.append((None, line_num, line.strip()))
else:
results.append((file_path, line_num, line.strip()))
count += 1
if count >= max_results:
break
if count >= max_results:
break
except (UnicodeDecodeError, PermissionError) as e:
# 忽略无法读取的文件
continue
if count >= max_results:
break
# 如果不递归搜索,则跳过子目录
if not recursive:
break
# 显示结果
if results:
print(f"找到 {len(results)} 个匹配项:")
for file_path, line_num, content in results:
if file_path:
print(f"{file_path}:{line_num}: {content}")
else:
print(f"{line_num}: {content}")
else:
print("未找到匹配项。")
if __name__ == '__main__':
main()
git clone https://github.com/peterbrittain/asciimatics.git
cd asciimatics
python setup.py install
2.2 第一个动画:Hello, Asciimatics!
from asciimatics.screen import Screen
def demo(screen):
# 设置文本位置和样式
x = screen.width // 2 - 10
y = screen.height // 2
text = "Hello, Asciimatics!"
color = Screen.COLOUR_RED
bg_color = Screen.COLOUR_BLACK
# 循环渲染动画
while True:
# 清空屏幕
screen.clear()
# 绘制文本(居中对齐,带阴影效果)
screen.print_at(text, x, y, color=color, bg=bg_color)
screen.print_at(" ", x+1, y+1, bg=bg_color) # 阴影
# 处理用户输入(按Q退出)
ev = screen.get_key()
if ev in (ord('Q'), ord('q')):
return
# 控制帧率
screen.refresh()
# 启动屏幕上下文
Screen.wrapper(demo)
代码解析:
Screen.wrapper(demo):创建屏幕上下文,自动处理终端状态的保存与恢复。
screen.clear():清空当前屏幕内容。
screen.print_at(text, x, y, ...):在指定坐标(x,y)绘制文本,支持颜色、背景色设置。
screen.get_key():阻塞式获取用户按键输入,返回ASCII码(如Q对应113)。
screen.refresh():按帧率刷新屏幕,确保动画流畅。
运行效果:红色文本在终端中央显示,按下Q键退出程序。
三、核心功能与实例演示
3.1 动画效果(Effects)
asciimatics内置多种预定义动画效果,通过Effect子类实现,以下为典型示例。
3.1.1 文本滚动(Marquee)
from asciimatics.effects import Marquee
from asciimatics.scene import Scene
from asciimatics.screen import Screen
def marquee_demo(screen):
effects = [
Marquee(
screen,
text="Scrolling Text Demo! This is a long message that loops infinitely.",
y=screen.height//2,
start_frame=0,
stop_frame=screen.width,
speed=2,
transparent=False
)
]
screen.play([Scene(effects, 500)]) # 持续500帧
Screen.wrapper(marquee_demo)
关键参数:
start_frame/stop_frame:控制文本滚动的水平范围。
speed:滚动速度(像素/帧)。
transparent:是否透明背景(默认False,显示纯色背景)。
3.1.2 烟花效果(Fireworks)
from asciimatics.effects import Fireworks
from asciimatics.scene import Scene
from asciimatics.screen import Screen
def fireworks_demo(screen):
effects = [
Fireworks(
screen,
num_fires=3, # 同时绽放的烟花数量
start_frame=0,
stop_frame=200
)
]
screen.play([Scene(effects, 200)]) # 持续200帧
Screen.wrapper(fireworks_demo)
效果特点:
随机生成烟花爆炸位置,模拟粒子扩散效果。
支持颜色渐变和爆炸音效(需终端支持蜂鸣器)。
3.1.3 自定义动画效果
通过继承Effect类可实现个性化动画,以下为心跳呼吸效果示例:
from asciimatics.effects import Effect
from asciimatics.event import Event
from asciimatics.screen import Screen
class HeartBeat(Effect):
def __init__(self, screen, x, y, text="❤", color=Screen.COLOUR_RED):
super().__init__(screen)
self.x = x
self.y = y
self.text = text
self.color = color
self.scale = 1.0
self.direction = 1 # 1为放大,-1为缩小
def update(self, frame_no):
# 缩放动画逻辑
self.scale += 0.1 * self.direction
if self.scale >= 1.5 or self.scale <= 1.0:
self.direction *= -1
# 绘制带缩放的文本
scaled_text = self.text * int(self.scale)
self.screen.print_at(
scaled_text,
self.x - len(scaled_text)//2,
self.y,
color=self.color,
bg=Screen.COLOUR_BLACK
)
def custom_effect_demo(screen):
heart = HeartBeat(screen, screen.width//2, screen.height//2)
screen.play([Scene([heart], 100)])
Screen.wrapper(custom_effect_demo)
class GreetCommand(Command):
name = "greet"
description = "Greets a person by name"
def configure(self):
# 定义位置参数:name(必填),age(可选,默认值为 18)
self.add_argument("name", description="The person's name")
self.add_argument("age", description="The person's age", default=18, optional=True)
def handle(self):
name = self.argument("name")
age = self.argument("age")
self.line(f"Hello, <comment>{name}</comment>! You are <fg=green>{age}</fg=green> years old.")
调用方式:
# 传递必填参数和可选参数
python my_script.py greet Alice 25
# 仅传递必填参数(age 使用默认值)
python my_script.py greet Bob
输出结果:
Hello, Alice! You are 25 years old.
Hello, Bob! You are 18 years old.
self.line("<fg=magenta bg=white bold>WARNING:</bg=white></fg=magenta> This is a test message.")
self.line("<error>Operation failed! Please check the logs.</error>")
╒════╤════════╤══════════════════╕
│ ID │ Name │ Email │
╞════╪════════╪══════════════════╡
│ 1 │ Alice │ [email protected]│
├────┼────────┼──────────────────┤
│ 2 │ Bob │ [email protected] │
├────┼────────┼──────────────────┤
│ 3 │ Charlie│ [email protected]│
╘════╧════════╧══════════════════╛
3.2.3 交互式输入
通过 io.ask()、io.confirm() 等方法实现与用户的交互式问答。
代码示例:
def handle(self):
name = self.ask("What is your name?", default="Guest") # 带默认值的提问
age = self.ask("How old are you?", type=int) # 类型验证(仅允许输入整数)
confirm = self.confirm(f"Confirm user: {name} ({age} years old)?", default=True) # 确认提问
if confirm:
self.line("<info>User confirmed.</info>")
else:
self.line("<error>Operation cancelled.</error>")
交互流程:
What is your name? (Guest) Alice
How old are you? 25
Confirm user: Alice (25 years old)? [y/n] y
User confirmed.
import shutil
import glob
from cleo import Command
class FileCopyCommand(Command):
name = "copy"
description = "Copy files or directories"
def configure(self):
self.add_argument("src", description="Source file/directory or glob pattern")
self.add_argument("dest", description="Destination path")
self.add_option(
"recursive",
"r",
description="Copy directories recursively",
action="store_true"
)
self.add_option(
"force",
"f",
description="Overwrite existing files",
action="store_true"
)
def handle(self):
src = self.argument("src")
dest = self.argument("dest")
recursive = self.option("recursive")
force = self.option("force")
# 处理通配符路径
sources = glob.glob(src, recursive=recursive)
if not sources:
self.line(f"<error>No files matching pattern: {src}</error>")
return 1
for source in sources:
try:
if os.path.isdir(source):
if not recursive:
self.line(f"<warning>Skipping directory {source} (use -r to copy recursively)</warning>")
continue
# 复制目录
dest_path = os.path.join(dest, os.path.basename(source))
if os.path.exists(dest_path) and not force:
self.line(f"<warning>Directory {dest_path} exists (use -f to overwrite)</warning>")
continue
shutil.copytree(source, dest_path, dirs_exist_ok=force)
self.line(f"<info>Copied directory: {source} -> {dest_path}</info>")
else:
# 复制文件
dest_file = os.path.join(dest, os.path.basename(source)) if os.path.isdir(dest) else dest
if os.path.exists(dest_file) and not force:
self.line(f"<warning>File {dest_file} exists (use -f to overwrite)</warning>")
continue
shutil.copy2(source, dest_file) # 保留元数据
self.line(f"<info>Copied file: {source} -> {dest_file}</info>")
except Exception as e:
self.line(f"<error>Failed to copy {source}: {str(e)}</error>")
4.2.3 目录清理命令(file_tool clean <dir> [--patterns])
import os
import glob
from cleo import Command
class FileCleanCommand(Command):
name = "clean"
description = "Clean temporary files in directory"
def configure(self):
self.add_argument("dir", description="Directory to clean")
self.add_option(
"patterns",
"p",
description="File patterns to delete (comma-separated)",
default="*.tmp,*~" # 默认清理.tmp文件和~结尾文件
)
self.add_option(
"dry-run",
None,
description="Show what would be deleted without actual removal",
action="store_true"
)
self.add_option(
"confirm",
"c",
description="Ask for confirmation before deletion",
action="store_true"
)
def handle(self):
target_dir = self.argument("dir")
if not os.path.isdir(target_dir):
self.line(f"<error>{target_dir} is not a valid directory</error>")
return 1
patterns = self.option("patterns").split(",")
dry_run = self.option("dry-run")
confirm = self.option("confirm")
files_to_delete = []
# 收集匹配的文件
for pattern in patterns:
pattern_path = os.path.join(target_dir, pattern)
files_to_delete.extend(glob.glob(pattern_path))
if not files_to_delete:
self.line("<info>No files matching cleanup patterns found</info>")
return 0
# 显示待删除文件
self.line(f"<comment>Found {len(files_to_delete)} files to delete:</comment>")
for file in files_to_delete:
self.line(f"- {file}")
# 确认流程
if confirm:
if not self.confirm("Proceed with deletion?", default=False):
self.line("<info>Deletion cancelled</info>")
return 0
# 执行删除
deleted = 0
for file in files_to_delete:
try:
if dry_run:
self.line(f"<info>[Dry run] Would delete: {file}</info>")
else:
os.remove(file)
self.line(f"<info>Deleted: {file}</info>")
deleted += 1
except Exception as e:
self.line(f"<error>Failed to delete {file}: {str(e)}</error>")
self.line(f"\n<comment>Summary: {deleted}/{len(files_to_delete)} files processed</comment>")
4.3 工具集成与运行
将三个命令整合到一个应用程序中:
from cleo import Application
from commands.stats import FileStatsCommand
from commands.copy import FileCopyCommand
from commands.clean import FileCleanCommand
if __name__ == "__main__":
app = Application(name="file_tool", version="1.0.0")
app.add(FileStatsCommand())
app.add(FileCopyCommand())
app.add(FileCleanCommand())
app.run()
打包与分发
为方便使用,可通过 poetry 或 setuptools 打包为可执行工具:
# pyproject.toml 示例(使用poetry)
[tool.poetry]
name = “file-tool” version = “1.0.0” description = “A file management CLI tool”
from cleo.testing import CommandTester
def test_greet_command():
command = GreetCommand()
tester = CommandTester(command)
tester.execute("Alice 30")
assert "Hello, Alice! You are 30 years old." in tester.io.fetch_output()