批量脚本的迭代优化是一个系统性的过程,核心在于从“能跑”到“跑得快、跑得稳、资源省”,这个过程不是一次性的,而是基于数据反馈的持续循环。
下面是一个分步迭代优化的框架,以Python批量脚本为例,但原则适用于Shell、PowerShell等。
第一阶段:建立基线(Before you optimize, you must measure)
在动手优化前,必须先明确现状,否则你无法判断优化是否有效。
-
定义关键指标:
- 总耗时: 批处理全部任务的总时间。
- 吞吐量: 单位时间内处理的任务数(100条记录/秒)。
- 资源消耗: CPU使用率、内存峰值、磁盘I/O、网络带宽。
- 成功率/错误率: 处理失败的任务占比。
-
添加性能监控和日志:
-
在脚本关键节点(开始、结束、每处理N条记录后)打印时间戳和资源使用情况。
-
Python示例:
import time import psutil # pip install psutil import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') process = psutil.Process() def process_batch(items): start_time = time.time() initial_mem = process.memory_info().rss / 1024 / 1024 # MB for i, item in enumerate(items): # ... your processing logic ... if (i+1) % 1000 == 0: # 每1000条输出一次状态 elapsed = time.time() - start_time current_mem = process.memory_info().rss / 1024 / 1024 logging.info(f"Processed {i+1}/{len(items)} | Elapsed: {elapsed:.2f}s | Mem: {current_mem:.1f}MB") total_time = time.time() - start_time logging.info(f"Batch finished. Total: {total_time:.2f}s, Mem delta: {process.memory_info().rss/1024/1024 - initial_mem:.1f}MB")
-
-
执行一次全量预跑:
- 运行脚本,记录下上述所有指标,这就是你的 基线。
- 例子:基线总耗时100秒,内存峰值500MB,吞吐量50条/秒。
第二阶段:定位瓶颈(Pinpoint the bottleneck)
利用基线和性能分析工具找到最慢的部分。
-
CPU密集型(计算慢):
- 工具:
cProfile(Python标准库),py-spy(无需修改代码)。 - 信号: CPU使用率接近100%。
- 寻找: 慢函数、深层循环、复杂计算。
- 工具:
-
I/O密集型(等待慢):
- 工具:
strace(Linux),Process Monitor(Windows),简单的日志时间戳。 - 信号: CPU使用率不高(如5%-20%),但总耗时很长。
- 寻找:
- 磁盘I/O: 大量小文件的读写、频繁的数据库
commit。 - 网络I/O: 串行调用外部API、慢的数据库查询(SQL慢日志)。
- 数据库锁: 死锁或表锁导致等待。
- 磁盘I/O: 大量小文件的读写、频繁的数据库
- 工具:
-
内存瓶颈:
- 工具:
memory_profiler(Python),htop。 - 信号: 内存持续攀升,甚至触发OOM(Out Of Memory)错误或频繁进入Swap。
- 寻找: 一次性加载所有数据到内存、生成的中间结果(如DataFrame副本)未释放。
- 工具:
第三阶段:策略优化(Apply optimizations)
根据定位到的瓶颈类型,选择对应的策略。一次只改一件事,改完就跑测试比较。
针对 I/O 瓶颈(最常见)
-
批量操作(Batching):
- 场景: 逐条插入数据库、逐条写入文件。
- 方法: 累积一个批次(如1000条),然后一次性提交。
- 效果: 效果通常最显著,从
INSERT x1000变为INSERT VALUES(...),(...)...x1000。
-
并行化(Parallelism):
- 场景: 需要处理多个互相独立的文件、调用多个独立的API。
- 方法: 使用
concurrent.futures.ThreadPoolExecutor(I/O密集型)或ProcessPoolExecutor(CPU密集型)。 - 注意: 数据库连接池、API限流(加
threading.Semaphore或time.sleep)。
-
连接复用(Connection Pooling):
- 场景: 数据库或HTTP请求。
- 方法: 使用
requests.Session()(HTTP)或SQLAlchemy的连接池。
-
异步编程(Async/Await):
- 场景: 海量I/O操作,且顺序不重要。
- 方法: 使用
aiohttp、aiomysql、asyncio。
-
优化数据格式:
- 场景: 读/写CSV vs Parquet vs JSON vs 二进制。
- 方法: 对于大数据,使用列式存储格式(如Parquet)或高效的二进制格式(如Protocol Buffers)。
针对 CPU 瓶颈
-
算法优化:
- 检查是否有O(n²)的循环可以优化,能否用
set或dict替代list进行查找?能否用NumPy向量化操作替代Python循环?
- 检查是否有O(n²)的循环可以优化,能否用
-
多进程(Multiprocessing):
利用多核CPU,但要注意进程间通信和数据序列化的开销。
-
使用更快的库:
- 比如用
polars替代pandas(Rust编写,多线程),用Numpy替代纯Python数学计算。
- 比如用
-
增加缓存(Caching / Memoization):
如果同一个函数被用相同参数反复调用,缓存其结果。
针对内存瓶颈
-
使用迭代器/生成器(Iterators/Generators):
- 反例:
data = [process_line(line) for line in file](一次性加载所有)。 - 正例:
for line in file: process_line(line)(逐行处理)。 - 正例:
pandas使用read_csv(chunksize=10000)。
- 反例:
-
立即释放资源:
- 使用
with语句确保文件、数据库连接及时关闭。 - 手动
del大的、不再使用的变量,并强制垃圾回收(import gc; gc.collect())。
- 使用
-
数据下采样:
- 如果可能,只加载需要的列(
pandas: usecols=['col1', 'col2']),或对数据进行聚合后再处理。
- 如果可能,只加载需要的列(
第四阶段:验证与回归(Validate and Repeat)
- A/B 测试: 运行优化后的脚本,使用完全相同的数据集,记录新指标。
- 对比基线:
- 例子: 优化后,总耗时=30秒,内存峰值=150MB,吞吐量=167条/秒。
- 分析: I/O优化节省了60%时间,内存优化节省了70%内存。
- 检查正确性: 优化不能改变结果,运行一个小的数据集,手动核对输出。
- 代码重构: 将优化后的逻辑封装成更清晰的模块,添加注释解释优化原因。
- 重复循环: 找出新的瓶颈,再次迭代,通常经过2-3轮,脚本效率会进入平稳期。
一个典型迭代路线图(案例)
- 初始状态: 处理10万条记录,逐条插入SQLite,耗时600秒。
- Optimization 1 (I/O Batching): 改为每500条批量插入,结果:耗时降至120秒。(最大收益)
- Optimization 2 (I/O Parallelism): 使用
ThreadPoolExecutor同时处理4个不同表,结果:耗时降至45秒。(次大收益) - Optimization 3 (CPU): 数据清洗部分发现用了
for循环,改用Pandas向量化,结果:耗时降至35秒。 - Optimization 4 (Memory): 发现
pandas.read_sql会一次性加载所有结果,改为chunksize,结果:内存峰值从2GB降至200MB。 - 最终状态: 35秒完成,内存稳定,系统资源占用健康。
快速检查清单
- [ ] 基线记录了吗? 没有基线就是盲人摸象。
- [ ] 是I/O还是CPU问题? 看CPU使用率(高->CPU;低->I/O)。
- [ ] 能用批量操作吗? 最优先考虑,通常收益最大。
- [ ] 能用并行/异步吗? 加速I/O密集型任务。
- [ ] 内存会增长吗? 用生成器或分块处理。
- [ ] 有重复计算吗? 加缓存。
- [ ] 测试数据量和生产环境一样吗? 小数据集的优化可能不适用。
这个框架提供了一个“测量 -> 诊断 -> 解决 -> 验证”的闭环,可以系统性地提升任何批量脚本的性能。

