Java开发者转战Python:进阶篇
  📖 系列文章导航
本文是"Java开发者转战Python"系列的第二篇,深入学习Python的进阶特性。
📚 系列:
# 一、Python进阶特性
# 1、异常处理
Python的异常处理机制与Java类似,但语法更简洁,并提供了一些独特的特性。
# 1.1、try-except基本语法
基本用法
# 基本异常捕获
try:
    number = int(input("请输入数字: "))
    result = 10 / number
    print(f"结果: {result}")
except ValueError:
    print("输入的不是有效数字")
except ZeroDivisionError:
    print("不能除以零")
对比Java:
// Java异常处理
try {
    int number = Integer.parseInt(scanner.nextLine());
    int result = 10 / number;
    System.out.println("结果: " + result);
} catch (NumberFormatException e) {
    System.out.println("输入的不是有效数字");
} catch (ArithmeticException e) {
    System.out.println("不能除以零");
}
捕获多个异常
# 方式1:分别捕获
try:
    data = json.loads(json_string)
except ValueError as e:
    print(f"JSON解析错误: {e}")
except KeyError as e:
    print(f"键不存在: {e}")
# 方式2:合并捕获(异常处理逻辑相同时)
try:
    file = open("data.txt")
    data = process(file)
except (FileNotFoundError, PermissionError) as e:
    print(f"文件访问错误: {e}")
# 方式3:捕获所有异常(不推荐用于生产)
try:
    risky_operation()
except Exception as e:
    print(f"发生错误: {e}")
# 1.2、try-except-else-finally
Python的异常处理提供了else和finally子句。
else子句
# Sentinel: lets the finally block know whether open() actually succeeded,
# without inspecting locals() (which also sees a stale `file` left behind by
# earlier code).
file = None
try:
    file = open("data.txt", "r")
    data = file.read()
except FileNotFoundError:
    print("文件不存在")
else:
    # Runs only when the try block completed without raising
    print(f"读取了{len(data)}个字符")
    process_data(data)
finally:
    # Always runs, whether or not an exception occurred
    if file is not None:
        file.close()
        print("文件已关闭")
完整示例
def divide(a, b):
    """Divide ``a`` by ``b``, reporting the outcome on stdout.

    Returns:
        The quotient ``a / b``, or ``None`` when the division raised
        ``ZeroDivisionError`` or ``TypeError``.
    """
    quotient = None
    try:
        quotient = a / b
    except ZeroDivisionError:
        print("错误:除数不能为零")
    except TypeError:
        print("错误:参数类型错误")
    else:
        # Success path: only reached when no exception was raised above
        print(f"计算成功: {a} / {b} = {quotient}")
    finally:
        # Printed on every path, before the function returns
        print("除法操作完成")
    return quotient
# 使用
divide(10, 2)
# 输出:
# 计算成功: 10 / 2 = 5.0
# 除法操作完成
divide(10, 0)
# 输出:
# 错误:除数不能为零
# 除法操作完成
对比Java:
// Java没有else,但有finally
try {
    result = a / b;
    System.out.println("计算成功");  // 需要手动判断
} catch (ArithmeticException e) {
    System.out.println("错误:除数不能为零");
} finally {
    System.out.println("除法操作完成");
}
# 1.3、异常类型与继承体系
Python的异常都继承自BaseException。
异常层次结构
BaseException
├── SystemExit           # 系统退出
├── KeyboardInterrupt    # 用户中断(Ctrl+C)
├── GeneratorExit        # 生成器退出
└── Exception            # 常规异常的基类
    ├── StopIteration
    ├── ArithmeticError
    │   ├── ZeroDivisionError
    │   ├── OverflowError
    │   └── FloatingPointError
    ├── AssertionError
    ├── AttributeError
    ├── EOFError
    ├── ImportError
    │   └── ModuleNotFoundError
    ├── LookupError
    │   ├── IndexError
    │   └── KeyError
    ├── MemoryError
    ├── NameError
    │   └── UnboundLocalError
    ├── OSError
    │   ├── FileNotFoundError
    │   ├── PermissionError
    │   └── TimeoutError
    ├── RuntimeError
    │   ├── NotImplementedError
    │   └── RecursionError
    ├── TypeError
    ├── ValueError
    │   └── UnicodeError
    └── Warning
常见异常
# ValueError:值错误
int("abc")  # ValueError: invalid literal
# TypeError:类型错误
"string" + 123  # TypeError: can only concatenate str
# KeyError:字典键不存在
{"a": 1}["b"]  # KeyError: 'b'
# IndexError:索引越界
[1, 2, 3][5]  # IndexError: list index out of range
# AttributeError:属性不存在
"string".non_existent  # AttributeError: 'str' object has no attribute
# FileNotFoundError:文件不存在
open("non_existent.txt")  # FileNotFoundError
# ImportError:模块导入失败
import non_existent_module  # ModuleNotFoundError
# 1.4、raise语句
抛出异常
def withdraw(amount, balance):
    if amount <= 0:
        raise ValueError("取款金额必须大于0")
    if amount > balance:
        raise ValueError(f"余额不足:需要{amount},当前{balance}")
    return balance - amount
# 使用
try:
    new_balance = withdraw(1000, 500)
except ValueError as e:
    print(f"操作失败: {e}")
# 输出: 操作失败: 余额不足:需要1000,当前500
重新抛出异常
def process_data(data):
    try:
        result = risky_operation(data)
    except ValueError as e:
        print(f"警告: {e}")
        raise  # 重新抛出原异常
# 或者抛出新异常
def process_file(filename):
    try:
        # 提示:文件操作的详细内容请见第2章"文件操作与I/O"
        with open(filename) as f:
            return f.read()
    except FileNotFoundError:
        raise ValueError(f"配置文件{filename}不存在")
# 1.5、自定义异常
# 基本自定义异常
class ValidationError(Exception):
    """数据验证错误"""
    pass
# 带额外信息的异常
class InsufficientFundsError(Exception):
    """Raised when a requested amount exceeds the available balance."""

    def __init__(self, balance, amount):
        # How much is missing to cover the requested amount
        shortage = amount - balance
        message = f"余额不足:需要{amount},当前{balance},缺少{shortage}"
        super().__init__(message)
        # Keep the raw figures so handlers can inspect them programmatically
        self.balance = balance
        self.amount = amount
        self.shortage = shortage
# 使用
def withdraw(balance, amount):
    if amount > balance:
        raise InsufficientFundsError(balance, amount)
    return balance - amount
try:
    new_balance = withdraw(100, 150)
except InsufficientFundsError as e:
    print(e)
    print(f"缺少金额: {e.shortage}")
# 输出:
# 余额不足:需要150,当前100,缺少50
# 缺少金额: 50
异常基类最佳实践
class AppError(Exception):
    """应用程序基础异常"""
    pass
class DatabaseError(AppError):
    """数据库相关错误"""
    pass
class NetworkError(AppError):
    """网络相关错误"""
    pass
class APIError(AppError):
    """API调用错误"""
    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API错误 {status_code}: {message}")
# 使用层次化捕获
try:
    call_api()
except APIError as e:
    print(f"API调用失败: {e}")
except NetworkError as e:
    print(f"网络错误: {e}")
except AppError as e:
    print(f"应用错误: {e}")
对比Java:
// Java自定义异常
public class InsufficientFundsException extends Exception {
    private double balance;
    private double amount;
    public InsufficientFundsException(double balance, double amount) {
        super("余额不足:需要" + amount + ",当前" + balance);
        this.balance = balance;
        this.amount = amount;
    }
    public double getShortage() {
        return amount - balance;
    }
}
# 1.6、异常链(Exception Chaining)
Python 3支持异常链,保留原始异常信息。
使用from关键字
def parse_config(filename):
    """Load a JSON configuration file and return the parsed content.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The object produced by ``json.load`` for the file's content.

    Raises:
        ValueError: If the file does not exist or does not contain valid
            JSON. The underlying exception is chained as ``__cause__``.
    """
    try:
        # JSON text is UTF-8 by specification; do not depend on the
        # platform's default encoding.
        with open(filename, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError as e:
        raise ValueError(f"配置文件{filename}不存在") from e
    except json.JSONDecodeError as e:
        raise ValueError("配置文件格式错误") from e
try:
    config = parse_config("config.json")
except ValueError as e:
    print(f"错误: {e}")
    print(f"原因: {e.__cause__}")
    # 可以追溯到原始异常
隐式异常链
try:
    try:
        result = 10 / 0
    except ZeroDivisionError:
        # 在异常处理中又发生了异常
        undefined_variable  # NameError
except NameError as e:
    print(f"当前异常: {e}")
    print(f"之前的异常: {e.__context__}")
抑制异常链
try:
    result = 10 / 0
except ZeroDivisionError:
    # 使用from None抑制异常链
    raise ValueError("计算错误") from None
# 1.7、实用异常处理模式
资源清理
# ❌ 不推荐
file = open("data.txt")
try:
    data = file.read()
    process(data)
finally:
    file.close()
# ✅ 推荐:使用with语句
with open("data.txt") as file:
    data = file.read()
    process(data)
# 自动关闭文件
多个资源
# 管理多个资源
with open("input.txt") as infile, open("output.txt", "w") as outfile:
    data = infile.read()
    outfile.write(data.upper())
EAFP vs LBYL
Python推崇EAFP(Easier to Ask for Forgiveness than Permission,请求原谅比请求许可更容易)而非LBYL(Look Before You Leap,先检查再行动)。
# LBYL(Java风格)- 不推荐
if key in dictionary:
    value = dictionary[key]
else:
    value = default_value
# EAFP(Python风格)- 推荐
try:
    value = dictionary[key]
except KeyError:
    value = default_value
# 或者使用get方法
value = dictionary.get(key, default_value)
异常传播与日志
import logging
def process_user_data(user_id):
    try:
        user = fetch_user(user_id)
        data = transform_data(user)
        save_data(data)
    except DatabaseError as e:
        logging.error(f"数据库错误处理用户{user_id}: {e}")
        raise  # 重新抛出,让上层处理
    except ValidationError as e:
        logging.warning(f"验证失败用户{user_id}: {e}")
        return None  # 吞掉异常,返回默认值
    except Exception as e:
        logging.critical(f"未知错误处理用户{user_id}: {e}", exc_info=True)
        raise  # 致命错误,必须抛出
小结对比表
| 特性 | Python | Java | 
|---|---|---|
| 基本语法 | try-except |  try-catch | 
| 多异常 | except (E1, E2) |  多个catch块或\| | 
| else子句 | 支持 | 不支持 | 
| finally | 支持 | 支持 | 
| 异常链 | raise ... from |  Throwable.initCause() | 
| 检查异常 | 无 | 有(编译时检查) | 
| 自定义异常 | 继承Exception |  继承Exception | 
Python的异常处理更简洁,但缺少Java的检查异常机制。这让Python更灵活,但也需要开发者更自律地处理异常!
# 2、文件操作与I/O
Python的文件操作比Java简单直观得多,无需处理繁琐的流和缓冲区。
前置知识:异常处理已在第1章"异常处理"中介绍,本章会大量使用with语句。
# 2.1、文件打开与关闭
基本操作
# 打开文件
file = open("data.txt", "r")  # 读模式
content = file.read()
file.close()  # 必须手动关闭
# ✅ 推荐:使用with语句自动关闭
with open("data.txt", "r") as file:
    content = file.read()
# 自动关闭,即使发生异常也会关闭
文件模式
| 模式 | 说明 | Java对应 | 
|---|---|---|
| 'r' | 只读(默认) | FileReader | 
| 'w' | 写入(覆盖) | FileWriter | 
| 'a' | 追加 | FileWriter(file, true) | 
| 'x' | 独占创建(文件已存在则失败) | - | 
| 'b' | 二进制模式 | FileInputStream | 
| 't' | 文本模式(默认) | - | 
| '+' | 读写模式 | RandomAccessFile | 
常用组合
# 文本文件
open("file.txt", "r")      # 读
open("file.txt", "w")      # 写(覆盖)
open("file.txt", "a")      # 追加
open("file.txt", "r+")     # 读写
# 二进制文件
open("image.png", "rb")    # 读二进制
open("image.png", "wb")    # 写二进制
对比Java:
// Java需要更多代码
try (BufferedReader reader = new BufferedReader(
        new FileReader("data.txt"))) {
    String content = reader.readLine();
} catch (IOException e) {
    e.printStackTrace();
}
# 2.2、文本文件读写
读取文件
# 方式1:一次性读取全部内容
with open("data.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(content)
# 方式2:按行读取(返回列表)
with open("data.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
    for line in lines:
        print(line.strip())  # 去除换行符
# 方式3:逐行迭代(推荐,内存友好)
with open("data.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.strip())
# 方式4:读取指定字节数
with open("data.txt", "r") as f:
    chunk = f.read(100)  # 读取前100个字符
写入文件
# 写入字符串
with open("output.txt", "w", encoding="utf-8") as f:
    f.write("Hello, World!\n")
    f.write("第二行\n")
# 写入多行
lines = ["第一行\n", "第二行\n", "第三行\n"]
with open("output.txt", "w", encoding="utf-8") as f:
    f.writelines(lines)
# 追加内容
with open("output.txt", "a", encoding="utf-8") as f:
    f.write("追加的内容\n")
实用示例
# 复制文件
with open("source.txt", "r") as src, open("dest.txt", "w") as dst:
    dst.write(src.read())
# 处理CSV
with open("data.csv", "r") as f:
    for line in f:
        fields = line.strip().split(",")
        print(fields)
# 统计行数
with open("data.txt", "r") as f:
    line_count = sum(1 for line in f)
    print(f"总行数: {line_count}")
# 查找并替换
with open("input.txt", "r") as f:
    content = f.read()
content = content.replace("old", "new")
with open("output.txt", "w") as f:
    f.write(content)
# 2.3、二进制文件处理
读写二进制文件
# 读取二进制文件
with open("image.png", "rb") as f:
    data = f.read()
    print(f"文件大小: {len(data)} 字节")
# 写入二进制文件
with open("output.bin", "wb") as f:
    f.write(b"\x00\x01\x02\x03")
# 复制二进制文件
with open("source.png", "rb") as src, open("dest.png", "wb") as dst:
    dst.write(src.read())
# 分块读取大文件(内存友好)
with open("large_file.bin", "rb") as f:
    while chunk := f.read(8192):  # 每次读8KB
        process(chunk)
使用struct模块处理二进制数据
import struct
# 写入结构化二进制数据
with open("data.bin", "wb") as f:
    # 写入:1个整数,1个浮点数,5个字符
    data = struct.pack("if5s", 42, 3.14, b"hello")
    f.write(data)
# 读取结构化二进制数据
with open("data.bin", "rb") as f:
    data = f.read()
    num, pi, text = struct.unpack("if5s", data)
    print(f"整数: {num}, 浮点: {pi}, 文本: {text.decode()}")
# 2.4、pathlib模块(现代路径操作)
pathlib是Python 3.4+推荐的路径操作方式,面向对象,比os.path更直观。
基本用法
from pathlib import Path
# 创建Path对象
file_path = Path("data/file.txt")
absolute_path = Path("/usr/local/bin/python")
# 获取当前目录
current_dir = Path.cwd()
print(current_dir)
# 获取用户主目录
home_dir = Path.home()
print(home_dir)
# 路径拼接
config_file = Path.home() / "config" / "settings.json"
# 等同于:Path.home().joinpath("config", "settings.json")
路径属性
file_path = Path("/home/user/documents/report.pdf")
print(file_path.name)        # report.pdf(文件名)
print(file_path.stem)        # report(文件名不含扩展名)
print(file_path.suffix)      # .pdf(扩展名)
print(file_path.parent)      # /home/user/documents(父目录)
print(file_path.parents[0])  # /home/user/documents
print(file_path.parents[1])  # /home/user
print(file_path.anchor)      # /(根目录)
print(file_path.parts)       # ('/', 'home', 'user', 'documents', 'report.pdf')
文件系统操作
from pathlib import Path
file_path = Path("data.txt")
# 检查存在性
if file_path.exists():
    print("文件存在")
# 检查类型
if file_path.is_file():
    print("是文件")
if file_path.is_dir():
    print("是目录")
# 读写文件
file_path.write_text("Hello, World!", encoding="utf-8")
content = file_path.read_text(encoding="utf-8")
# 二进制读写
file_path.write_bytes(b"\x00\x01\x02")
data = file_path.read_bytes()
# 创建目录
Path("new_dir").mkdir(exist_ok=True)           # 创建单层目录
Path("parent/child").mkdir(parents=True, exist_ok=True)  # 创建多层
# 删除文件
file_path.unlink(missing_ok=True)  # Python 3.8+
# 遍历目录
for item in Path(".").iterdir():
    print(item)
# 模式匹配查找文件
for py_file in Path(".").glob("*.py"):
    print(py_file)
for py_file in Path(".").rglob("*.py"):  # 递归查找
    print(py_file)
实用示例
from pathlib import Path
# 查找项目中所有Python文件
project_root = Path(".")
python_files = list(project_root.rglob("*.py"))
print(f"找到{len(python_files)}个Python文件")
# 统计代码行数
total_lines = 0
for py_file in python_files:
    lines = py_file.read_text().count("\n")
    total_lines += lines
print(f"总行数: {total_lines}")
# 安全地处理配置文件
config_file = Path.home() / ".myapp" / "config.json"
if not config_file.exists():
    config_file.parent.mkdir(parents=True, exist_ok=True)
    config_file.write_text('{"default": true}')
对比Java:
// Java使用Path和Files(NIO.2自Java 7起提供;readString/writeString需要Java 11+)
import java.nio.file.*;
Path filePath = Paths.get("/home/user/data.txt");
String content = Files.readString(filePath);
Files.writeString(filePath, "Hello");
// 遍历目录
try (var stream = Files.list(Paths.get("."))) {
    stream.forEach(System.out::println);
}
# 2.5、标准输入输出
输入
# input():读取一行输入
name = input("请输入姓名: ")
print(f"你好, {name}!")
# 读取数字
age = int(input("请输入年龄: "))
# 读取多个值
x, y = map(int, input("输入两个数字(空格分隔): ").split())
输出
# print():标准输出
print("Hello, World!")
# 多个参数
print("姓名:", name, "年龄:", age)
# 自定义分隔符和结束符
print("a", "b", "c", sep="-")      # a-b-c
print("loading", end="...")        # loading...(不换行)
print(" done!")                    # loading... done!
# 写入文件
with open("output.txt", "w") as f:
    print("写入文件", file=f)
格式化输出
name = "张三"
age = 25
score = 95.678
# f-string(推荐)
print(f"姓名: {name}, 年龄: {age}, 成绩: {score:.2f}")
# format方法
print("姓名: {}, 年龄: {}, 成绩: {:.2f}".format(name, age, score))
# %格式化(老式)
print("姓名: %s, 年龄: %d, 成绩: %.2f" % (name, age, score))
小结对比表
| 操作 | Python | Java | 
|---|---|---|
| 打开文件 | open() |  FileReader/BufferedReader | 
| 自动关闭 | with语句 |  try-with-resources | 
| 读取全部 | read() |  Files.readString()(Java 11+)或循环读取 | 
| 逐行读取 | for line in f |  BufferedReader.readLine() | 
| 路径操作 | pathlib.Path |  java.nio.file.Path | 
| 文件存在 | path.exists() |  Files.exists() | 
| 创建目录 | path.mkdir() |  Files.createDirectories() | 
Python的文件I/O操作比Java简洁优雅得多,pathlib模块提供了现代化的路径处理方式!
# 3、迭代器与生成器
迭代器和生成器是Python的强大特性,能够高效处理大数据集和实现惰性求值。
# 3.1、迭代器协议
Python的迭代器基于两个魔术方法:__iter__和__next__。
基本概念
# 可迭代对象(Iterable):实现了__iter__方法
numbers = [1, 2, 3, 4, 5]
iterator = iter(numbers)  # 获取迭代器
# 迭代器(Iterator):实现了__iter__和__next__方法
print(next(iterator))  # 1
print(next(iterator))  # 2
print(next(iterator))  # 3
# 此时迭代器中还剩4和5两个元素;全部取完后再调用next(iterator)才会抛出StopIteration
# 如需避免异常,可用 next(iterator, None) 提供默认值
自定义迭代器
class Countdown:
    """Iterator that counts down from ``start`` to 1."""

    def __init__(self, start):
        # Next value to hand out; iteration ends once it is no longer positive
        self.current = start

    def __iter__(self):
        # The object is its own iterator
        return self

    def __next__(self):
        if self.current > 0:
            self.current -= 1
            # Return the value held before the decrement
            return self.current + 1
        raise StopIteration
# 使用
for num in Countdown(5):
    print(num)  # 5, 4, 3, 2, 1
实用迭代器示例
class FileReader:
    """Iterator that yields the lines of a text file one at a time.

    Each line is returned with surrounding whitespace (including the
    trailing newline) stripped. The file is closed automatically when the
    end is reached; call ``close()`` or use the object in a ``with``
    statement to release the handle when iteration stops early.
    """

    def __init__(self, filename):
        self.file = open(filename, 'r')

    def __iter__(self):
        return self

    def __next__(self):
        # The iterator protocol requires an exhausted iterator to keep
        # raising StopIteration; reading from the closed file would raise
        # ValueError instead.
        if self.file.closed:
            raise StopIteration
        line = self.file.readline()
        if not line:
            # readline() returns '' only at end of file
            self.file.close()
            raise StopIteration
        return line.strip()

    def close(self):
        """Close the underlying file; safe to call more than once."""
        self.file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
# 使用
for line in FileReader("data.txt"):
    print(line)
对比Java:
// Java使用Iterator接口
Iterator<Integer> iterator = numbers.iterator();
while (iterator.hasNext()) {
    System.out.println(iterator.next());
}
# 3.2、生成器函数(yield)
生成器是创建迭代器的最简单方式,使用yield关键字。
基本用法
def countdown(n):
    """倒计时生成器"""
    while n > 0:
        yield n
        n -= 1
# 使用
for num in countdown(5):
    print(num)  # 5, 4, 3, 2, 1
# 生成器是迭代器
gen = countdown(3)
print(next(gen))  # 3
print(next(gen))  # 2
print(next(gen))  # 1
# print(next(gen))  # StopIteration
工作原理
def simple_generator():
    print("开始")
    yield 1
    print("继续")
    yield 2
    print("结束")
    yield 3
gen = simple_generator()
print("创建生成器")
print(next(gen))  # 开始 -> 1
print(next(gen))  # 继续 -> 2
print(next(gen))  # 结束 -> 3
实用示例
# 1. 斐波那契数列
def fibonacci(n):
    """Yield the first ``n`` Fibonacci numbers, starting from 0."""
    previous, current = 0, 1
    produced = 0
    while produced < n:
        yield previous
        produced += 1
        # Advance the pair one step along the sequence
        previous, current = current, previous + current
print(list(fibonacci(10)))
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
# 2. 逐行读取大文件(内存友好)
def read_large_file(file_path):
    """逐行读取文件"""
    with open(file_path) as f:
        for line in f:
            yield line.strip()
for line in read_large_file("huge_file.txt"):
    process(line)  # 一次只加载一行到内存
# 3. 批量处理数据
def batch(items, size):
    """Yield successive lists of up to ``size`` items taken from ``items``.

    Args:
        items: Any iterable.
        size: Number of items per batch.

    Yields:
        Lists of ``size`` items; the final list may be shorter.
    """
    current = []
    for item in items:
        current.append(item)
        if len(current) == size:
            yield current
            current = []
    if current:  # final, possibly shorter, batch
        yield current
# Usage: the loop variable must not be called `batch`, otherwise it would
# rebind the function name and make the function unusable afterwards.
data = range(100)
for chunk in batch(data, 10):
    print(f"处理批次: {chunk}")
# 4. 无限序列
def infinite_sequence():
    """无限递增序列"""
    num = 0
    while True:
        yield num
        num += 1
gen = infinite_sequence()
print(next(gen))  # 0
print(next(gen))  # 1
print(next(gen))  # 2
# 3.3、生成器表达式
生成器表达式是列表推导式的生成器版本,使用圆括号。
注意:列表推导式和生成器表达式的完整用法,请参考第5章"数据结构与推导式"。
基本语法
# 列表推导式:立即计算,占用内存
squares_list = [x**2 for x in range(1000000)]
# 生成器表达式:惰性计算,节省内存
squares_gen = (x**2 for x in range(1000000))
# 使用
for square in squares_gen:
    if square > 100:
        break
    print(square)
对比
import sys
# 列表推导式
list_comp = [x for x in range(10000)]
print(f"列表大小: {sys.getsizeof(list_comp)} bytes")  # ~87616 bytes
# 生成器表达式
gen_expr = (x for x in range(10000))
print(f"生成器大小: {sys.getsizeof(gen_expr)} bytes")  # ~112 bytes
实用示例
# 1. 过滤和转换
numbers = range(1, 11)
even_squares = (x**2 for x in numbers if x % 2 == 0)
print(list(even_squares))  # [4, 16, 36, 64, 100]
# 2. 链式处理
lines = (line.strip() for line in open("data.txt"))
non_empty = (line for line in lines if line)
uppercase = (line.upper() for line in non_empty)
for line in uppercase:
    print(line)
# 3. 作为函数参数
sum_of_squares = sum(x**2 for x in range(10))
print(sum_of_squares)  # 285
max_value = max((x**2 for x in range(10)))
print(max_value)  # 81
# 4. 内存友好的数据处理
total = sum(int(line) for line in open("numbers.txt"))
# 3.4、itertools模块
itertools提供了高效的迭代器工具。
无限迭代器
from itertools import count, cycle, repeat
# count:无限计数
for i in count(10, 2):  # 从10开始,步长2
    if i > 20:
        break
    print(i)  # 10, 12, 14, 16, 18, 20
# cycle:无限循环
counter = 0
for item in cycle(['A', 'B', 'C']):
    if counter >= 5:
        break
    print(item, end=" ")  # A B C A B
    counter += 1
# repeat:重复元素
for item in repeat('Hello', 3):
    print(item)  # Hello(3次)
组合迭代器
from itertools import chain, zip_longest, product, combinations, permutations
# chain:连接多个迭代器
for item in chain([1, 2], [3, 4], [5, 6]):
    print(item, end=" ")  # 1 2 3 4 5 6
# zip_longest:配对(补齐)
for pair in zip_longest([1, 2, 3], ['a', 'b'], fillvalue='?'):
    print(pair)  # (1, 'a'), (2, 'b'), (3, '?')
# product:笛卡尔积
for pair in product([1, 2], ['a', 'b']):
    print(pair)  # (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')
# combinations:组合
for combo in combinations([1, 2, 3, 4], 2):
    print(combo)  # (1,2), (1,3), (1,4), (2,3), (2,4), (3,4)
# permutations:排列
for perm in permutations([1, 2, 3], 2):
    print(perm)  # (1,2), (1,3), (2,1), (2,3), (3,1), (3,2)
过滤和分组
from itertools import filterfalse, dropwhile, takewhile, groupby
# filterfalse:保留使谓词返回假的元素(与filter相反)
data = [1, 0, 2, 0, 3]
non_zero = filterfalse(lambda x: x == 0, data)
print(list(non_zero))  # [1, 2, 3]
# dropwhile:丢弃直到条件为假
numbers = [1, 3, 5, 6, 7, 8, 9]
result = dropwhile(lambda x: x < 6, numbers)
print(list(result))  # [6, 7, 8, 9]
# takewhile:获取直到条件为假
result = takewhile(lambda x: x < 6, numbers)
print(list(result))  # [1, 3, 5]
# groupby:分组
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]
for key, group in groupby(data, lambda x: x[0]):
    print(f"{key}: {list(group)}")
# A: [('A', 1), ('A', 2)]
# B: [('B', 3), ('B', 4)]
# A: [('A', 5)]
实用组合
from itertools import islice, tee, accumulate
# islice:切片迭代器
gen = (x**2 for x in range(100))
first_10 = list(islice(gen, 10))
print(first_10)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# tee:复制迭代器
gen1, gen2 = tee(range(5), 2)
print(list(gen1))  # [0, 1, 2, 3, 4]
print(list(gen2))  # [0, 1, 2, 3, 4]
# accumulate:累积
from operator import mul
numbers = [1, 2, 3, 4, 5]
print(list(accumulate(numbers)))        # [1, 3, 6, 10, 15](累加)
print(list(accumulate(numbers, mul)))   # [1, 2, 6, 24, 120](累乘)
# 3.5、惰性求值优势
生成器的惰性求值特性带来巨大优势。
内存效率
# ❌ 列表:立即计算,占用大量内存
def process_data_list():
    data = [expensive_operation(x) for x in range(1000000)]
    return sum(data)
# ✅ 生成器:惰性计算,内存占用小
def process_data_generator():
    data = (expensive_operation(x) for x in range(1000000))
    return sum(data)
无限序列
# 生成器可以表示无限序列
def primes():
    """Yield the prime numbers in increasing order, without end.

    Candidates are tested by trial division against the primes already
    found. Stacking generator expressions that filter on a loop variable
    is deliberately avoided: a generator expression looks up such a
    variable lazily (late binding), so every stacked filter would test
    against the most recent prime only and composites such as 9 would be
    yielded.
    """
    from itertools import count  # local import keeps the snippet self-contained

    yield 2
    odd_primes = []  # primes found so far, excluding 2 (all candidates are odd)
    for candidate in count(3, 2):
        is_prime = True
        for p in odd_primes:
            if p * p > candidate:
                # No divisor up to sqrt(candidate) was found
                break
            if candidate % p == 0:
                is_prime = False
                break
        if is_prime:
            odd_primes.append(candidate)
            yield candidate
# 取前10个素数
prime_gen = primes()
first_10_primes = [next(prime_gen) for _ in range(10)]
print(first_10_primes)  # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
管道式处理
# 数据处理管道
def read_log(filename):
    """读取日志"""
    with open(filename) as f:
        for line in f:
            yield line.strip()
def filter_errors(lines):
    """Lazily yield only the lines that contain the text 'ERROR'."""
    yield from (entry for entry in lines if 'ERROR' in entry)
def parse_timestamp(lines):
    """Lazily yield the text between the first character and the first ']'.

    Intended for lines shaped like ``[2024-01-01 10:00:00] ERROR: message``,
    for which the yielded value is the timestamp inside the brackets.
    """
    for entry in lines:
        # Text before the first ']' (the whole line if there is none)
        bracketed, _, _ = entry.partition(']')
        # Drop the leading character, expected to be '['
        yield bracketed[1:]
# 管道组合
lines = read_log("app.log")
errors = filter_errors(lines)
timestamps = parse_timestamp(errors)
# 惰性执行,只在需要时处理
for ts in timestamps:
    print(ts)
小结对比表
| 特性 | 列表 | 生成器 | Java Stream | 
|---|---|---|---|
| 求值方式 | 立即 | 惰性 | 惰性 | 
| 内存占用 | 大 | 小 | 小 | 
| 可重复使用 | ✅ | ❌ | ❌ | 
| 索引访问 | ✅ | ❌ | ❌ | 
| 长度获取 | ✅ | ❌ | ❌ | 
| 无限序列 | ❌ | ✅ | ✅ | 
# 4、装饰器
装饰器是Python的强大特性,允许在不修改原函数/类的情况下增强其功能。这类似于Java的注解+AOP,但更灵活强大。
前置知识:装饰器基于闭包概念,闭包已在第6章"函数定义与使用"中介绍。
# 4.1、函数装饰器基础
基本概念
装饰器本质是一个接受函数作为参数并返回新函数的高阶函数。
# 最简单的装饰器
def my_decorator(func):
    """装饰器函数"""
    def wrapper():
        print("函数执行前")
        func()
        print("函数执行后")
    return wrapper
# 使用装饰器(方式1:手动包装)
def say_hello():
    print("Hello!")
say_hello = my_decorator(say_hello)
say_hello()
# 输出:
# 函数执行前
# Hello!
# 函数执行后
# 使用装饰器(方式2:@语法糖)
@my_decorator
def say_world():
    print("World!")
say_world()
# 输出:
# 函数执行前
# World!
# 函数执行后
带参数的函数装饰
def my_decorator(func):
    def wrapper(*args, **kwargs):
        """接受任意参数"""
        print(f"调用 {func.__name__},参数: {args}, {kwargs}")
        result = func(*args, **kwargs)
        print(f"返回值: {result}")
        return result
    return wrapper
@my_decorator
def add(a, b):
    return a + b
@my_decorator
def greet(name, greeting="Hello"):
    return f"{greeting}, {name}!"
# 使用
result = add(3, 5)
# 调用 add,参数: (3, 5), {}
# 返回值: 8
message = greet("张三", greeting="你好")
# 调用 greet,参数: ('张三',), {'greeting': '你好'}
# 返回值: 你好, 张三!
对比Java:
// Java需要使用注解+AOP或代理模式
@Around("@annotation(LogExecution)")
public Object logExecution(ProceedingJoinPoint joinPoint) throws Throwable {
    System.out.println("方法执行前");
    Object result = joinPoint.proceed();
    System.out.println("方法执行后");
    return result;
}
@LogExecution
public void sayHello() {
    System.out.println("Hello!");
}
# 4.2、functools.wraps保留元信息
装饰器会改变函数的元信息,functools.wraps用于保留原函数的元数据。
from functools import wraps
# ❌ 不使用wraps
def bad_decorator(func):
    def wrapper(*args, **kwargs):
        """这是wrapper的文档"""
        return func(*args, **kwargs)
    return wrapper
@bad_decorator
def my_function():
    """这是my_function的文档"""
    pass
print(my_function.__name__)  # wrapper(错误!)
print(my_function.__doc__)   # 这是wrapper的文档(错误!)
# ✅ 使用wraps
def good_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        """这是wrapper的文档"""
        return func(*args, **kwargs)
    return wrapper
@good_decorator
def my_function2():
    """这是my_function2的文档"""
    pass
print(my_function2.__name__)  # my_function2(正确!)
print(my_function2.__doc__)   # 这是my_function2的文档(正确!)
标准装饰器模板
from functools import wraps
def my_decorator(func):
    """标准装饰器模板"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 执行前的逻辑
        print(f"Before calling {func.__name__}")
        # 调用原函数
        result = func(*args, **kwargs)
        # 执行后的逻辑
        print(f"After calling {func.__name__}")
        return result
    return wrapper
# 4.3、带参数的装饰器
装饰器本身也可以接受参数。
from functools import wraps
def repeat(times):
    """Decorator factory: call the decorated function ``times`` times.

    Args:
        times: How many times each call of the wrapper invokes the function.

    Returns:
        A decorator. The wrapped function returns the result of the last
        invocation, or ``None`` when ``times`` is zero or negative (the
        function is then not invoked at all).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Pre-bind so that `return result` is valid even when the loop
            # body never runs.
            result = None
            for i in range(times):
                print(f"第{i+1}次执行:")
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator
@repeat(3)
def greet(name):
    print(f"Hello, {name}!")
greet("张三")
# 输出:
# 第1次执行:
# Hello, 张三!
# 第2次执行:
# Hello, 张三!
# 第3次执行:
# Hello, 张三!
工作原理
# @repeat(3) 等价于:
# greet = repeat(3)(greet)
# 分步理解:
decorator = repeat(3)      # 调用repeat(3),返回decorator函数
greet = decorator(greet)   # 调用decorator(greet),返回wrapper
实用带参数装饰器示例
from functools import wraps
import time
def retry(max_attempts=3, delay=1):
    """Decorator factory: retry the decorated function when it raises.

    Args:
        max_attempts: Total number of attempts, including the first one.
        delay: Seconds to sleep between attempts.

    Returns:
        A decorator. The wrapped function returns the first successful
        result; if every attempt raises, the last exception propagates.

    Raises:
        ValueError: If ``max_attempts`` is less than 1. Without this check
            the function would never be called and ``None`` would be
            returned silently.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts必须至少为1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        # Bare raise re-raises with the original traceback
                        raise
                    print(f"第{attempt}次尝试失败,{delay}秒后重试...")
                    time.sleep(delay)
        return wrapper
    return decorator
@retry(max_attempts=5, delay=2)
def unstable_api_call():
    """不稳定的API调用"""
    import random
    if random.random() < 0.7:
        raise ConnectionError("API调用失败")
    return "成功"
# 使用
result = unstable_api_call()
# 4.4、类装饰器
类也可以作为装饰器,通过实现__call__方法。
from functools import wraps
class CountCalls:
    """统计函数调用次数"""
    def __init__(self, func):
        wraps(func)(self)
        self.func = func
        self.count = 0
    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"{self.func.__name__} 已被调用 {self.count} 次")
        return self.func(*args, **kwargs)
@CountCalls
def say_hello():
    print("Hello!")
say_hello()  # say_hello 已被调用 1 次 -> Hello!
say_hello()  # say_hello 已被调用 2 次 -> Hello!
say_hello()  # say_hello 已被调用 3 次 -> Hello!
print(say_hello.count)  # 3
带参数的类装饰器
from functools import wraps
class LogWith:
    """带日志级别的装饰器"""
    def __init__(self, level="INFO"):
        self.level = level
    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print(f"[{self.level}] 调用 {func.__name__}")
            result = func(*args, **kwargs)
            print(f"[{self.level}] {func.__name__} 返回 {result}")
            return result
        return wrapper
@LogWith(level="DEBUG")
def add(a, b):
    return a + b
result = add(3, 5)
# [DEBUG] 调用 add
# [DEBUG] add 返回 8
# 4.5、装饰器叠加
多个装饰器可以叠加使用。
def make_bold(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return "<b>" + func(*args, **kwargs) + "</b>"
    return wrapper
def make_italic(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return "<i>" + func(*args, **kwargs) + "</i>"
    return wrapper
@make_bold
@make_italic
def say_hello():
    return "Hello!"
print(say_hello())  # <b><i>Hello!</i></b>
# 等价于:
# say_hello = make_bold(make_italic(say_hello))
# 执行顺序:从下到上装饰,从上到下执行
顺序理解
def decorator1(func):
    print(f"装饰器1 装饰 {func.__name__}")
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("装饰器1: 执行前")
        result = func(*args, **kwargs)
        print("装饰器1: 执行后")
        return result
    return wrapper
def decorator2(func):
    print(f"装饰器2 装饰 {func.__name__}")
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("装饰器2: 执行前")
        result = func(*args, **kwargs)
        print("装饰器2: 执行后")
        return result
    return wrapper
@decorator1
@decorator2
def test():
    print("执行test函数")
# 装饰阶段输出:
# 装饰器2 装饰 test
# 装饰器1 装饰 test(decorator2使用了@wraps,返回的wrapper的__name__已是test)
test()
# 执行阶段输出:
# 装饰器1: 执行前
# 装饰器2: 执行前
# 执行test函数
# 装饰器2: 执行后
# 装饰器1: 执行后
# 4.6、常用内置装饰器
Python提供了一些常用的内置装饰器。
@property(属性装饰器)
详细内容:property装饰器的完整用法已在第3章"面向对象编程"中详细介绍。
import math


class Circle:
    """Circle whose area is derived from, and can be used to set, its radius."""

    def __init__(self, radius):
        self._radius = radius

    @property
    def radius(self):
        """Radius of the circle (read-only: no setter is defined)."""
        return self._radius

    @property
    def area(self):
        """Area computed from the current radius."""
        # math.pi instead of a truncated literal keeps full float precision
        return math.pi * self._radius ** 2

    @area.setter
    def area(self, value):
        """Set the radius to the one that produces an area of ``value``."""
        self._radius = (value / math.pi) ** 0.5


circle = Circle(5)
print(circle.area)    # 78.53981633974483
circle.area = 100
print(circle.radius)  # 5.641895835477563
@staticmethod(静态方法)
class MathUtils:
    @staticmethod
    def add(a, b):
        """静态方法,不需要访问类或实例"""
        return a + b
    @staticmethod
    def is_even(n):
        return n % 2 == 0
# 使用
print(MathUtils.add(3, 5))      # 8
print(MathUtils.is_even(4))     # True
@classmethod(类方法)
class Date:
    """Plain calendar date holding a year, a month and a day."""

    def __init__(self, year, month, day):
        self.year, self.month, self.day = year, month, day

    @classmethod
    def from_string(cls, date_string):
        """Alternate constructor: build an instance from 'YEAR-MONTH-DAY'."""
        # Exactly three '-'-separated integer fields are expected
        year, month, day = (int(part) for part in date_string.split('-'))
        return cls(year, month, day)

    @classmethod
    def today(cls):
        """Alternate constructor: build an instance for the current date."""
        import datetime
        current = datetime.date.today()
        return cls(current.year, current.month, current.day)
# 使用
date1 = Date.from_string("2024-10-26")
date2 = Date.today()
@cached_property(缓存属性)
from functools import cached_property
class DataProcessor:
    def __init__(self, data):
        self.data = data
    @cached_property
    def processed_data(self):
        """计算量大的属性,只计算一次"""
        print("处理数据中...")
        import time
        time.sleep(2)  # 模拟耗时操作
        return [x * 2 for x in self.data]
processor = DataProcessor([1, 2, 3, 4, 5])
print(processor.processed_data)  # 处理数据中... [2, 4, 6, 8, 10]
print(processor.processed_data)  # [2, 4, 6, 8, 10](直接返回缓存)
# 4.7、装饰器实战案例
案例1:性能计时器
from functools import wraps
import time
def timer(func):
    """Decorator that prints how long each call of ``func`` took.

    The wrapped function returns whatever ``func`` returns. Nothing is
    printed when ``func`` raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is a monotonic, high-resolution clock meant for
        # measuring intervals; time.time() follows the wall clock and can
        # jump when the system time is adjusted.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} 执行时间: {elapsed:.4f}秒")
        return result
    return wrapper
@timer
def slow_function():
    time.sleep(1)
    return "完成"
result = slow_function()
# slow_function 执行时间: 1.0012秒
案例2:权限检查
from functools import wraps
def require_auth(func):
    """检查用户是否已登录"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 假设有一个全局的当前用户对象
        if not hasattr(wrapper, 'current_user') or not wrapper.current_user:
            raise PermissionError("需要登录")
        return func(*args, **kwargs)
    return wrapper
def require_role(role):
    """Decorator factory: allow the call only for users holding ``role``.

    The current user is read from the ``current_user`` attribute of the
    wrapper function and is expected to be a mapping with a ``'role'`` key.

    Raises (from the wrapped function):
        PermissionError: If no user is set (attribute missing or falsy), or
            if the user's role differs from ``role``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # A missing attribute and a falsy value (e.g. None after logout)
            # both mean "not logged in"; calling .get() on None would raise
            # AttributeError instead of PermissionError.
            user = getattr(wrapper, 'current_user', None)
            if not user:
                raise PermissionError("需要登录")
            if user.get('role') != role:
                raise PermissionError(f"需要{role}权限")
            return func(*args, **kwargs)
        return wrapper
    return decorator
@require_auth
def view_profile():
    return "个人资料"
@require_role('admin')
def delete_user(user_id):
    return f"删除用户 {user_id}"
# 设置当前用户
view_profile.current_user = {"username": "张三", "role": "user"}
delete_user.current_user = {"username": "管理员", "role": "admin"}
print(view_profile())      # 个人资料
print(delete_user(123))    # 删除用户 123
案例3:缓存/记忆化
from functools import wraps
def memoize(func):
    """Decorator that caches results keyed by the positional-argument tuple.

    The returned wrapper exposes the backing dict as ``wrapper.cache`` and a
    ``wrapper.clear_cache()`` callable that empties it.
    """
    results = {}
    @wraps(func)
    def wrapper(*args):
        if args in results:
            print(f"从缓存获取 {args}")
        else:
            print(f"计算 {func.__name__}{args}")
            results[args] = func(*args)
        return results[args]
    # Expose the cache and a way to reset it
    wrapper.cache = results
    wrapper.clear_cache = results.clear
    return wrapper
@memoize
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
print(fibonacci(10))
# Output (args is a 1-tuple, so it is rendered with a trailing comma):
# 计算 fibonacci(10,)
# 计算 fibonacci(9,)
# ...
# 55
print(fibonacci(10))  # prints "从缓存获取 (10,)" and then 55
print(fibonacci.cache)  # inspect the cached results
fibonacci.clear_cache()  # empty the cache
案例4:输入验证
from functools import wraps
def validate_types(**type_hints):
    """Decorator factory that checks argument types at call time.

    Args:
        **type_hints: maps a parameter name to the type (or tuple of types,
            as accepted by ``isinstance``) its argument must be an instance of.
            Names that are not parameters of the decorated function are ignored.

    Raises:
        TypeError: from the wrapper, when an argument is not an instance of
            its expected type, or when the call does not match the signature.
    """
    def describe(expected_type):
        # A tuple of types has no __name__, so render each member instead.
        if isinstance(expected_type, tuple):
            return " | ".join(t.__name__ for t in expected_type)
        return expected_type.__name__
    def decorator(func):
        import inspect
        # The signature never changes, so resolve it once at decoration time
        # instead of on every call.
        sig = inspect.signature(func)
        @wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            # Validate every hinted parameter that is present in the call
            for param_name, expected_type in type_hints.items():
                if param_name not in bound.arguments:
                    continue
                value = bound.arguments[param_name]
                if not isinstance(value, expected_type):
                    raise TypeError(
                        f"{param_name} 应为 {describe(expected_type)},"
                        f"实际为 {type(value).__name__}"
                    )
            return func(*args, **kwargs)
        return wrapper
    return decorator
@validate_types(name=str, age=int, salary=float)
def create_employee(name, age, salary):
    return f"{name}, {age}岁, 薪资{salary}"
# 正确调用
print(create_employee("张三", 25, 5000.0))
# 错误调用
try:
    create_employee("李四", "30", 6000.0)  # age类型错误
except TypeError as e:
    print(e)  # age 应为 int,实际为 str
案例5:日志记录
from functools import wraps
import logging
logging.basicConfig(level=logging.INFO)
def log_calls(log_args=True, log_result=True):
    """Decorator factory that logs each call of the decorated function.

    Args:
        log_args: include positional and keyword arguments in the call record.
        log_result: emit an extra INFO record with the return value.

    Exceptions derived from ``Exception`` are logged at ERROR level and re-raised.
    Records go to the logger named after the decorated function's module.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            logger = logging.getLogger(func.__module__)
            # Record the call itself
            call_line = (
                f"调用 {func.__name__},参数: {args}, {kwargs}"
                if log_args
                else f"调用 {func.__name__}"
            )
            logger.info(call_line)
            try:
                outcome = func(*args, **kwargs)
                # Record the result
                if log_result:
                    logger.info(f"{func.__name__} 返回: {outcome}")
                return outcome
            except Exception as err:
                logger.error(f"{func.__name__} 抛出异常: {err}")
                raise
        return wrapper
    return decorator
@log_calls(log_args=True, log_result=True)
def divide(a, b):
    return a / b
result = divide(10, 2)
# INFO:__main__:调用 divide,参数: (10, 2), {}
# INFO:__main__:divide 返回: 5.0
try:
    divide(10, 0)
except ZeroDivisionError:
    pass
# INFO:__main__:调用 divide,参数: (10, 0), {}
# ERROR:__main__:divide 抛出异常: division by zero
案例6:单例模式
from functools import wraps
def singleton(cls):
    """Class decorator that builds ``cls`` once and returns that instance afterwards.

    The arguments of the first call construct the instance; arguments of
    later calls are ignored.
    """
    instances = {}
    @wraps(cls)
    def get_instance(*args, **kwargs):
        if cls in instances:
            return instances[cls]
        created = cls(*args, **kwargs)
        instances[cls] = created
        return created
    return get_instance
@singleton
class DatabaseConnection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        print(f"连接到数据库 {host}:{port}")
# 使用
db1 = DatabaseConnection("localhost", 5432)
# 连接到数据库 localhost:5432
db2 = DatabaseConnection("localhost", 5432)
# (不会打印,因为返回的是同一个实例)
print(db1 is db2)  # True
小结对比表
| 特性 | Python装饰器 | Java注解+AOP | 
|---|---|---|
| 语法 | @decorator |  @Annotation | 
| 灵活性 | 高(运行时修改行为) | 中(需要额外配置) | 
| 嵌套 | 支持多层装饰 | 支持 | 
| 带参数 | 支持 | 支持 | 
| 类装饰 | 支持装饰类 | 支持(注解可标注类、方法、字段等) | 
| 内省 | 容易(__wrapped__) |  需要反射 | 
Python的装饰器比Java的注解更灵活强大,可以在运行时动态修改函数/类的行为,而且语法更简洁!
# 5、上下文管理器
上下文管理器让资源管理变得优雅安全,自动处理资源的获取和释放。这类似于Java的try-with-resources,但更灵活。
# 5.1、with语句原理
基本概念
with语句确保资源在使用后被正确清理,即使发生异常也能保证。
# 传统方式:需要手动管理
file = open("data.txt", "r")
try:
    content = file.read()
    process(content)
finally:
    file.close()  # 必须手动关闭
# with语句:自动管理
with open("data.txt", "r") as file:
    content = file.read()
    process(content)
# 自动关闭文件,即使发生异常
对比Java:
// Java try-with-resources(Java 7+)
try (BufferedReader reader = new BufferedReader(new FileReader("data.txt"))) {
    String content = reader.readLine();
    process(content);
} // 自动关闭资源
上下文管理器协议
上下文管理器需要实现__enter__和__exit__方法:
class MyContextManager:
    """Minimal context manager that prints when the block is entered and left."""
    def __enter__(self):
        """Called on entering the with block; the return value is bound by ``as``."""
        print("进入上下文")
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        """Called on leaving the with block; reports any exception, never suppresses it."""
        print("退出上下文")
        failed = exc_type is not None
        if failed:
            print(f"发生异常: {exc_type.__name__}: {exc_value}")
        return False  # False: let the exception propagate
# 使用
with MyContextManager() as cm:
    print("执行代码")
    # raise ValueError("测试异常")
# 输出:
# 进入上下文
# 执行代码
# 退出上下文
__exit__方法参数说明
def __exit__(self, exc_type, exc_value, traceback):
    """
    exc_type: the exception class (None when no exception occurred)
    exc_value: the exception instance (None when no exception occurred)
    traceback: the traceback object (None when no exception occurred)
    Return value:
    - False or None: the exception keeps propagating
    - True: the exception is suppressed (swallowed)
    """
    pass
# 5.2、自定义上下文管理器
文件操作管理器
class FileManager:
    """Context manager that opens a file on entry and closes it on exit."""
    def __init__(self, filename, mode):
        self.filename, self.mode = filename, mode
        self.file = None
    def __enter__(self):
        """Open the file and return the file object."""
        print(f"打开文件: {self.filename}")
        handle = open(self.filename, self.mode)
        self.file = handle
        return handle
    def __exit__(self, exc_type, exc_value, traceback):
        """Close the file if it was opened; exceptions are never suppressed."""
        if not self.file:
            return False
        print(f"关闭文件: {self.filename}")
        self.file.close()
        return False
# 使用
with FileManager("test.txt", "w") as f:
    f.write("Hello, World!")
# 输出:
# 打开文件: test.txt
# 关闭文件: test.txt
数据库连接管理器
class DatabaseConnection:
    """Context manager that simulates a database connection with a transaction.

    On exit the transaction is finished first (commit on success, rollback on
    error) and only then is the connection closed, because neither operation
    is possible on a connection that has already been released.
    """
    def __init__(self, host, database):
        self.host = host
        self.database = database
        self.connection = None
    def __enter__(self):
        """Establish the (simulated) connection and return it."""
        print(f"连接数据库: {self.host}/{self.database}")
        # The connection is simulated with a plain string
        self.connection = f"Connection({self.host}, {self.database})"
        return self.connection
    def __exit__(self, exc_type, exc_value, traceback):
        """Finish the transaction, then close the connection; never suppresses exceptions."""
        try:
            if exc_type is not None:
                print("发生异常,回滚事务")
            else:
                print("提交事务")
        finally:
            # Always release the connection, even if commit/rollback fails
            print("关闭数据库连接")
            self.connection = None
        return False
# 使用
with DatabaseConnection("localhost", "mydb") as conn:
    print(f"使用连接: {conn}")
    # 执行数据库操作
计时器上下文管理器
import time
class Timer:
    """Context manager that measures and prints how long its block takes.

    Timing uses ``time.perf_counter()`` (monotonic, high resolution), so
    ``start_time`` is a perf_counter reading rather than a wall-clock timestamp.
    After the block finishes, the measured duration in seconds is available
    as ``elapsed``.
    """
    def __init__(self, name="代码块"):
        self.name = name
        self.start_time = None
        self.elapsed = None  # set in __exit__
    def __enter__(self):
        self.start_time = time.perf_counter()
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.elapsed = time.perf_counter() - self.start_time
        print(f"{self.name} 执行时间: {self.elapsed:.4f}秒")
        return False  # never suppress exceptions
# 使用
with Timer("数据处理"):
    # 模拟耗时操作
    time.sleep(1)
    result = sum(range(1000000))
# 输出: 数据处理 执行时间: 1.xxxx秒
异常处理上下文管理器
class IgnoreException:
    """Context manager that swallows the given exception types (and their subclasses)."""
    def __init__(self, *exceptions):
        self.exceptions = exceptions
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing was raised, or it is not one of the ignored types: do not suppress
        if exc_type is None or not issubclass(exc_type, self.exceptions):
            return False
        print(f"忽略异常: {exc_type.__name__}: {exc_value}")
        return True  # suppress the exception
# 使用
with IgnoreException(ValueError, KeyError):
    data = {"a": 1}
    value = data["b"]  # KeyError,但会被忽略
    print("继续执行")  # 这行不会执行
print("程序继续")  # 这行会执行
# 输出:
# 忽略异常: KeyError: 'b'
# 程序继续
# 5.3、contextlib模块
contextlib模块提供了创建上下文管理器的便捷工具。
@contextmanager装饰器
使用生成器函数创建上下文管理器:
from contextlib import contextmanager
@contextmanager
def file_manager(filename, mode):
    """Generator-based context manager that opens ``filename`` and closes it on exit."""
    print(f"打开文件: {filename}")
    with open(filename, mode) as handle:
        try:
            # Code before the yield plays the role of __enter__, code after it __exit__
            yield handle
        finally:
            print(f"关闭文件: {filename}")
# 使用
with file_manager("test.txt", "w") as f:
    f.write("Hello!")
工作原理
@contextmanager
def my_context():
    """Show which part of a generator-based context manager runs at which moment."""
    # Enter phase
    print("进入")
    resource = "资源对象"
    try:
        yield resource  # handed to the ``as`` variable
    except Exception as err:
        # Runs when the with block raised
        print(f"异常退出: {err}")
        raise  # re-raise to the caller
    else:
        # Runs when the with block finished normally
        print("正常退出")
    finally:
        # Exit phase: runs in every case
        print("清理资源")
# 使用
with my_context() as res:
    print(f"使用: {res}")
实用示例
from contextlib import contextmanager
import time
@contextmanager
def timing(label):
    """Context manager that prints ``label`` and the duration of its block.

    The duration is measured with ``time.perf_counter()`` (monotonic), and is
    printed even when the block raises.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{label}: {elapsed:.4f}秒")
# 使用
with timing("数据库查询"):
    time.sleep(0.5)
    # 执行查询
临时修改环境
from contextlib import contextmanager
import os
@contextmanager
def temporary_env(**env_vars):
    """Temporarily set environment variables and restore them on exit.

    Args:
        **env_vars: variable names mapped to their temporary values
            (``os.environ`` only accepts ``str`` values).

    The set-up loop runs inside the ``try`` block, so if setting one variable
    fails (for example a non-str value raises ``TypeError``), the variables
    that were already changed are still restored.
    """
    old_env = {}
    try:
        # Remember the previous value, then install the new one
        for key, value in env_vars.items():
            old_env[key] = os.environ.get(key)
            os.environ[key] = value
        yield
    finally:
        # Restore previous values; variables that did not exist are removed
        for key, old_value in old_env.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value
# 使用
with temporary_env(DEBUG="true", LOG_LEVEL="debug"):
    print(os.environ.get("DEBUG"))  # true
    # 执行需要特定环境的代码
print(os.environ.get("DEBUG"))  # None(已恢复)
suppress(抑制异常)
from contextlib import suppress
# 传统写法
try:
    os.remove("somefile.txt")
except FileNotFoundError:
    pass
# 使用suppress
with suppress(FileNotFoundError):
    os.remove("somefile.txt")
# 抑制多个异常
with suppress(FileNotFoundError, PermissionError):
    os.remove("protected_file.txt")
closing(自动关闭)
from contextlib import closing
from urllib.request import urlopen
# 确保对象的close()方法被调用
with closing(urlopen("http://example.com")) as page:
    content = page.read()
# 自动调用page.close()
redirect_stdout/redirect_stderr(重定向输出)
from contextlib import redirect_stdout, redirect_stderr
import io
# 重定向标准输出
f = io.StringIO()
with redirect_stdout(f):
    print("这些内容会被捕获")
    print("不会打印到控制台")
output = f.getvalue()
print(f"捕获的输出: {output}")
# 重定向到文件
with open("output.txt", "w") as f:
    with redirect_stdout(f):
        print("写入文件")
        help(str.upper)
ExitStack(管理多个上下文)
from contextlib import ExitStack
# 动态管理多个上下文
with ExitStack() as stack:
    # 动态添加上下文管理器
    files = [
        stack.enter_context(open(f"file{i}.txt", "w"))
        for i in range(5)
    ]
    # 使用所有文件
    for i, f in enumerate(files):
        f.write(f"内容 {i}\n")
# 所有文件自动关闭
# 条件性添加上下文
def process_files(filenames, use_backup=False):
    """Open every file (plus an optional ``.bak`` sibling for writing) and pass each handle to ``process``.

    All handles are registered on one ExitStack, so they are closed together
    when the function returns or raises.
    """
    with ExitStack() as stack:
        opened = []
        for path in filenames:
            opened.append(stack.enter_context(open(path, "r")))
            if not use_backup:
                continue
            opened.append(stack.enter_context(open(f"{path}.bak", "w")))
        # Process the files
        for handle in opened:
            process(handle)
# 5.4、嵌套上下文管理器
多个with语句
# 方式1:嵌套with
with open("input.txt", "r") as infile:
    with open("output.txt", "w") as outfile:
        data = infile.read()
        outfile.write(data.upper())
# 方式2:单行with(Python 2.7+)
with open("input.txt", "r") as infile, open("output.txt", "w") as outfile:
    data = infile.read()
    outfile.write(data.upper())
组合使用
from contextlib import contextmanager
import threading
@contextmanager
def acquire_lock(lock):
    """获取锁"""
    print("获取锁")
    lock.acquire()
    try:
        yield
    finally:
        print("释放锁")
        lock.release()
# 使用
lock = threading.Lock()
with acquire_lock(lock):
    # 临界区代码
    print("执行临界区代码")
# 5.5、资源管理最佳实践
案例1:数据库事务管理
from contextlib import contextmanager
class Database:
    def __init__(self):
        self.connection = None
    @contextmanager
    def transaction(self):
        """事务上下文管理器"""
        print("开始事务")
        try:
            yield self
            print("提交事务")
            # self.connection.commit()
        except Exception as e:
            print(f"回滚事务: {e}")
            # self.connection.rollback()
            raise
# 使用
db = Database()
with db.transaction():
    # 执行数据库操作
    pass
案例2:临时修改配置
from contextlib import contextmanager
class Config:
    """Application settings stored as class attributes."""
    debug = False
    log_level = "INFO"
@contextmanager
def temporary_config(**changes):
    """Temporarily override attributes of ``Config`` and restore them on exit.

    Args:
        **changes: existing ``Config`` attribute names mapped to temporary values.

    Raises:
        AttributeError: if a name is not an attribute of ``Config``. Overrides
            applied before the failing name are rolled back, because the
            set-up loop runs inside the ``try`` block.
    """
    original = {}
    try:
        # Save the current value, then apply the override
        for key, value in changes.items():
            original[key] = getattr(Config, key)
            setattr(Config, key, value)
        yield Config
    finally:
        # Restore whatever was saved
        for key, value in original.items():
            setattr(Config, key, value)
# 使用
print(Config.debug)  # False
with temporary_config(debug=True, log_level="DEBUG"):
    print(Config.debug)  # True
    print(Config.log_level)  # DEBUG
print(Config.debug)  # False(已恢复)
案例3:批量操作
from contextlib import contextmanager
@contextmanager
def batch_operation(batch_size=100):
    """Context manager yielding an ``add`` callable that processes items in batches.

    Whenever ``batch_size`` items have accumulated they are passed to
    ``process_batch``; items still pending at exit are processed as a final batch.
    """
    pending = []
    def add_item(item):
        """Queue one item and flush once the batch is full."""
        pending.append(item)
        if len(pending) < batch_size:
            return
        process_batch(pending)
        pending.clear()
    try:
        yield add_item
    finally:
        # Flush whatever is left over
        if pending:
            process_batch(pending)
def process_batch(items):
    """Report the size of the batch being processed."""
    print(f"处理批次: {len(items)} 项")
# 使用
with batch_operation(batch_size=3) as add:
    for i in range(10):
        add(f"项目{i}")
# 输出:
# 处理批次: 3 项
# 处理批次: 3 项
# 处理批次: 3 项
# 处理批次: 1 项
案例4:性能分析
from contextlib import contextmanager
import time
import functools
@contextmanager
def profile_section(name):
    """Context manager that prints ``name`` and how long its block took.

    Uses ``time.perf_counter()`` (monotonic) for the measurement; the report
    is printed even when the block raises.
    """
    start_time = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start_time
        print(f"{name}:")
        print(f"  时间: {elapsed:.4f}秒")
# 使用
with profile_section("数据处理"):
    data = [i ** 2 for i in range(1000000)]
with profile_section("数据保存"):
    time.sleep(0.1)
案例5:资源池管理
from contextlib import contextmanager
from queue import Queue
class ConnectionPool:
    """Fixed-size pool of (simulated) connections backed by a queue."""
    def __init__(self, size=5):
        self.pool = Queue(maxsize=size)
        for name in (f"Connection_{i}" for i in range(size)):
            self.pool.put(name)
    @contextmanager
    def get_connection(self):
        """Borrow a connection for the duration of the with block, then return it to the pool."""
        borrowed = self.pool.get()
        print(f"获取连接: {borrowed}")
        try:
            yield borrowed
        finally:
            print(f"归还连接: {borrowed}")
            self.pool.put(borrowed)
# 使用
pool = ConnectionPool(size=2)
with pool.get_connection() as conn1:
    print(f"使用 {conn1}")
    with pool.get_connection() as conn2:
        print(f"使用 {conn2}")
小结对比表
| 特性 | Python上下文管理器 | Java try-with-resources | 
|---|---|---|
| 语法 | with obj as var: |  try (Type var = ...) {} | 
| 协议 | __enter__/__exit__ |  AutoCloseable.close() | 
| 自定义 | 类或@contextmanager | 实现AutoCloseable | 
| 嵌套 | 支持 | 支持 | 
| 异常抑制 | 支持(__exit__返回True) | 不支持 | 
| 多资源 | with a, b: |  try (A a = ...; B b = ...) {} | 
# 6、函数式编程
说明:列表推导式和生成器表达式已在第5章"数据结构与推导式"中详细讲解,这里不再重复。
Python虽然不是纯函数式语言,但提供了丰富的函数式编程工具。对于熟悉Java 8+ Stream API的开发者来说,Python的函数式特性会感觉很亲切。
# 6.1、map()、filter()、reduce()
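这三个函数是函数式编程的基石:map() 对每个元素应用函数,filter() 按条件筛选元素,reduce()(Python 3 中位于 functools 模块)把序列归约为单个值。下面先看 Java Stream API 的对应写法,再看 Python 中等价的推导式写法。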
// Java Stream API方式
List<Integer> squares = IntStream.range(0, 10)
    .map(x -> x * x)
    .boxed()
    .collect(Collectors.toList());
List<Integer> evenSquares = IntStream.range(0, 10)
    .filter(x -> x % 2 == 0)
    .map(x -> x * x)
    .boxed()
    .collect(Collectors.toList());
// 多重循环 - Java需要嵌套Stream或flatMap
List<String> pairs = IntStream.rangeClosed(1, 3)
    .boxed()
    .flatMap(x -> Stream.of("a", "b", "c")
        .map(y -> "(" + x + ", " + y + ")"))
    .collect(Collectors.toList());
字符串处理示例:
# 提取所有单词的首字母
sentence = "Hello World Python Programming"
initials = [word[0] for word in sentence.split()]
print(initials)  # ['H', 'W', 'P', 'P']
# 过滤并转换
words = ["apple", "banana", "cherry", "date"]
upper_long_words = [w.upper() for w in words if len(w) > 5]
print(upper_long_words)  # ['BANANA', 'CHERRY']
# 嵌套列表展平
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flat = [num for row in matrix for num in row]
print(flat)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
# 6.2、高阶函数
高阶函数是指接受函数作为参数或返回函数的函数。
函数作为参数:
def apply_operation(numbers, operation):
    """Return a new list with ``operation`` applied to every element of ``numbers``."""
    return list(map(operation, numbers))
# 定义不同的操作
def square(x):
    return x ** 2
def double(x):
    return x * 2
def negate(x):
    return -x
numbers = [1, 2, 3, 4, 5]
print(apply_operation(numbers, square))   # [1, 4, 9, 16, 25]
print(apply_operation(numbers, double))   # [2, 4, 6, 8, 10]
print(apply_operation(numbers, negate))   # [-1, -2, -3, -4, -5]
# 使用lambda
print(apply_operation(numbers, lambda x: x**3))  # [1, 8, 27, 64, 125]
函数作为返回值:
def make_multiplier(n):
    """返回一个乘以n的函数"""
    def multiplier(x):
        return x * n
    return multiplier
times2 = make_multiplier(2)
times10 = make_multiplier(10)
print(times2(5))   # 10
print(times10(5))  # 50
# 实际应用:创建验证器
def make_range_validator(min_val, max_val):
    """创建范围验证函数"""
    def validator(value):
        return min_val <= value <= max_val
    return validator
age_validator = make_range_validator(0, 120)
percentage_validator = make_range_validator(0, 100)
print(age_validator(25))    # True
print(age_validator(150))   # False
print(percentage_validator(85))  # True
内置高阶函数:
# sorted() - 自定义排序
students = [
    {'name': 'Alice', 'age': 25, 'score': 85},
    {'name': 'Bob', 'age': 22, 'score': 92},
    {'name': 'Charlie', 'age': 23, 'score': 78}
]
# 按年龄排序
by_age = sorted(students, key=lambda s: s['age'])
print([s['name'] for s in by_age])  # ['Bob', 'Charlie', 'Alice']
# 按分数降序
by_score = sorted(students, key=lambda s: s['score'], reverse=True)
print([s['name'] for s in by_score])  # ['Bob', 'Alice', 'Charlie']
# 多级排序:先按分数降序,再按年龄升序
multi_sort = sorted(students, key=lambda s: (-s['score'], s['age']))
# max()/min() - 自定义比较
oldest = max(students, key=lambda s: s['age'])
print(oldest['name'])  # Alice
highest_score = max(students, key=lambda s: s['score'])
print(highest_score['name'])  # Bob
# 6.3、偏函数(functools.partial)
偏函数用于固定函数的某些参数,创建新函数。
from functools import partial
# 基本示例
def power(base, exponent):
    return base ** exponent
# 创建偏函数 - 固定exponent=2
square = partial(power, exponent=2)
print(square(5))  # 25
print(square(10))  # 100
# 固定exponent=3
cube = partial(power, exponent=3)
print(cube(5))  # 125
# 实际应用1:日志记录
def log_message(message, level='INFO', timestamp=True):
    import datetime
    prefix = f"[{datetime.datetime.now()}] " if timestamp else ""
    print(f"{prefix}{level}: {message}")
# 创建专用日志函数
log_error = partial(log_message, level='ERROR')
log_warning = partial(log_message, level='WARNING')
log_debug = partial(log_message, level='DEBUG', timestamp=False)
log_error("Database connection failed")
log_warning("Low memory")
log_debug("Variable value: 42")
# 实际应用2:数据转换
def convert_value(value, multiplier=1, offset=0, round_digits=2):
    """通用数值转换函数"""
    result = value * multiplier + offset
    return round(result, round_digits)
# 摄氏度转华氏度: F = C * 9/5 + 32
celsius_to_fahrenheit = partial(convert_value, multiplier=9/5, offset=32)
print(celsius_to_fahrenheit(0))    # 32.0
print(celsius_to_fahrenheit(100))  # 212.0
# 米转英尺: feet = meter * 3.28084
meter_to_feet = partial(convert_value, multiplier=3.28084)
print(meter_to_feet(10))  # 32.81
# 实际应用3:配置HTTP请求
import functools
def make_request(url, method='GET', headers=None, timeout=30):
    """模拟HTTP请求"""
    headers = headers or {}
    print(f"{method} {url}")
    print(f"Headers: {headers}")
    print(f"Timeout: {timeout}s")
# API特定配置
api_headers = {'Authorization': 'Bearer token123', 'Content-Type': 'application/json'}
api_request = partial(make_request, headers=api_headers, timeout=60)
# 使用
api_request('https://api.example.com/users')
api_request('https://api.example.com/posts', method='POST')
与Java对比:
// Java没有直接的偏函数,需要手动包装
BiFunction<Integer, Integer, Integer> power = (base, exp) -> (int) Math.pow(base, exp);
// 创建"偏函数"
Function<Integer, Integer> square = base -> power.apply(base, 2);
Function<Integer, Integer> cube = base -> power.apply(base, 3);
# 6.4、函数组合
函数组合是将多个函数组合成一个新函数。
# 手动实现函数组合
def compose(*functions):
    """Compose functions right to left: ``compose(f, g, h)(x) == f(g(h(x)))``.

    With no functions the result is the identity function.
    """
    def inner(arg):
        value = arg
        for step in functions[::-1]:
            value = step(value)
        return value
    return inner
# 定义基础函数
def add_one(x):
    return x + 1
def double(x):
    return x * 2
def square(x):
    return x ** 2
# 组合函数:(x+1) * 2 然后平方
combined = compose(square, double, add_one)
print(combined(3))  # ((3+1)*2)^2 = 64
# 更优雅的实现
from functools import reduce
def compose2(*functions):
    """Compose functions right to left using ``reduce``.

    ``compose2(f, g, h)(x) == f(g(h(x)))``. The identity function is passed as
    the initial value, so ``compose2()`` returns identity instead of raising
    ``TypeError`` on the empty sequence (matching ``compose``).
    """
    return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)
combined2 = compose2(square, double, add_one)
print(combined2(3))  # 64
# 实际应用:数据处理管道
def remove_spaces(text):
    return text.replace(' ', '')
def to_lowercase(text):
    return text.lower()
def remove_punctuation(text):
    import string
    return text.translate(str.maketrans('', '', string.punctuation))
# Compose the text-cleaning functions (applied right to left: spaces, lowercase, punctuation)
clean_text = compose(remove_punctuation, to_lowercase, remove_spaces)
text = "Hello, World! Python is AWESOME."
print(clean_text(text))  # helloworldpythonisawesome
# 更Pythonic的方式:使用管道
class Pipeline:
    """Wraps a value so that functions can be applied to it in a left-to-right chain."""
    def __init__(self, value):
        self.value = value
    def pipe(self, func):
        """Apply ``func`` to the wrapped value and wrap the result in a new Pipeline."""
        transformed = func(self.value)
        return Pipeline(transformed)
    def get(self):
        """Return the wrapped value."""
        return self.value
# Using the pipeline
result = (Pipeline("Hello, World!")
         .pipe(str.lower)
         .pipe(lambda s: s.replace(' ', ''))
         .pipe(lambda s: s.replace('!', ''))
         .get())
print(result)  # hello,world (the comma is kept: only spaces and '!' are removed)
实战:数据转换管道
# 处理用户数据
users = [
    {'name': '  ALICE  ', 'age': '25', 'email': 'ALICE@EXAMPLE.COM'},
    {'name': 'bob', 'age': '30', 'email': 'bob@example.com  '},
    {'name': '  Charlie  ', 'age': '22', 'email': '  charlie@EXAMPLE.com'}
]
# 定义转换函数
def clean_name(user):
    """Strip and capitalize ``user['name']`` in place; returns the same dict."""
    trimmed = user['name'].strip()
    user['name'] = trimmed.capitalize()
    return user
def convert_age(user):
    """Convert ``user['age']`` to ``int`` in place; returns the same dict."""
    age_text = user['age']
    user['age'] = int(age_text)
    return user
def normalize_email(user):
    """Strip and lowercase ``user['email']`` in place; returns the same dict."""
    trimmed = user['email'].strip()
    user['email'] = trimmed.lower()
    return user
# Chain all transformations
from functools import reduce
def process_user(user):
    """Run the three clean-up steps on ``user`` in order and return the result."""
    def apply_step(current, transform):
        return transform(current)
    steps = (clean_name, convert_age, normalize_email)
    return reduce(apply_step, steps, user)
# 应用到所有用户
processed = list(map(process_user, users))
for user in processed:
    print(user)
# {'name': 'Alice', 'age': 25, 'email': 'alice@example.com'}
# {'name': 'Bob', 'age': 30, 'email': 'bob@example.com'}
# {'name': 'Charlie', 'age': 22, 'email': 'charlie@example.com'}
# 6.5、纯函数与副作用
纯函数是函数式编程的核心概念。
纯函数的特征:
# 纯函数 - 相同输入总是产生相同输出,无副作用
def add(a, b):
    return a + b
def multiply(a, b):
    return a * b
# 纯函数 - 不修改输入
def append_item_pure(lst, item):
    """返回新列表,不修改原列表"""
    return lst + [item]
original = [1, 2, 3]
new_list = append_item_pure(original, 4)
print(original)  # [1, 2, 3] - 未被修改
print(new_list)  # [1, 2, 3, 4]
# 非纯函数 - 有副作用
def append_item_impure(lst, item):
    """直接修改列表"""
    lst.append(item)
    return lst
original = [1, 2, 3]
new_list = append_item_impure(original, 4)
print(original)  # [1, 2, 3, 4] - 被修改了!
print(new_list)  # [1, 2, 3, 4]
# 非纯函数 - 依赖外部状态
counter = 0
def increment_impure():
    global counter
    counter += 1
    return counter
print(increment_impure())  # 1
print(increment_impure())  # 2 - 相同调用,不同结果!
# 纯函数替代方案
def increment_pure(value):
    return value + 1
counter = 0
counter = increment_pure(counter)  # 1
counter = increment_pure(counter)  # 2
纯函数的优势:
# 1. 可测试性
def calculate_total(items, tax_rate):
    """Pure function: sum ``price * quantity`` over ``items`` and add tax at ``tax_rate``."""
    subtotal = 0
    for item in items:
        subtotal += item['price'] * item['quantity']
    return subtotal * (1 + tax_rate)
# 测试简单
items = [
    {'price': 10, 'quantity': 2},
    {'price': 5, 'quantity': 3}
]
assert calculate_total(items, 0.1) == 38.5
# 2. 可组合性
def filter_active(users):
    return [u for u in users if u.get('active', False)]
def sort_by_name(users):
    return sorted(users, key=lambda u: u['name'])
def get_emails(users):
    return [u['email'] for u in users]
# 轻松组合
users = [
    {'name': 'Alice', 'active': True, 'email': 'alice@example.com'},
    {'name': 'Bob', 'active': False, 'email': 'bob@example.com'},
    {'name': 'Charlie', 'active': True, 'email': 'charlie@example.com'}
]
active_emails = get_emails(sort_by_name(filter_active(users)))
print(active_emails)  # ['alice@example.com', 'charlie@example.com']
# 3. 并发安全
from concurrent.futures import ThreadPoolExecutor
def square_pure(x):
    """纯函数 - 线程安全"""
    return x ** 2
numbers = range(100)
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(square_pure, numbers))
避免副作用的技巧:
# 1. 使用copy避免修改原始数据
import copy
def update_user_pure(user, **updates):
    """返回更新后的新用户对象"""
    new_user = copy.deepcopy(user)
    new_user.update(updates)
    return new_user
user = {'name': 'Alice', 'age': 25}
updated = update_user_pure(user, age=26)
print(user)     # {'name': 'Alice', 'age': 25}
print(updated)  # {'name': 'Alice', 'age': 26}
# 2. 使用不可变数据结构
from collections import namedtuple
User = namedtuple('User', ['name', 'age', 'email'])
user = User('Alice', 25, 'alice@example.com')
# 创建新对象而不是修改
updated = user._replace(age=26)
print(user)     # User(name='Alice', age=25, ...)
print(updated)  # User(name='Alice', age=26, ...)
# 3. 使用dataclasses(Python 3.7+)
from dataclasses import dataclass, replace
@dataclass(frozen=True)  # frozen=True使其不可变
class Product:
    name: str
    price: float
    quantity: int
product = Product('Book', 29.99, 10)
# product.price = 19.99  # 错误!不可修改
# 使用replace创建新对象
discounted = replace(product, price=19.99)
print(product)     # Product(name='Book', price=29.99, quantity=10)
print(discounted)  # Product(name='Book', price=19.99, quantity=10)
# 7、类型提示(Type Hints)
作为Java开发者,你习惯了强类型系统。Python从3.5版本开始引入了类型提示,虽然不强制检查,但能让代码更清晰、IDE提示更好。
# 7.1、基本类型注解
# 变量类型注解
name: str = "Alice"
age: int = 25
salary: float = 5000.50
is_active: bool = True
# 函数参数和返回值注解
def greet(name: str, age: int) -> str:
    return f"Hello {name}, you are {age} years old"
def add(a: int, b: int) -> int:
    return a + b
def get_user_info() -> dict:
    return {"name": "Alice", "age": 25}
# 无返回值
def log_message(message: str) -> None:
    print(message)
# Java对比
"""
// Java强制类型
String name = "Alice";
int age = 25;
public String greet(String name, int age) {
    return "Hello " + name;
}
"""
# 7.2、复合类型注解
from typing import Any, Dict, List, Set, Tuple
# List types
numbers: List[int] = [1, 2, 3, 4]
names: List[str] = ["Alice", "Bob"]
# Dict types
user: Dict[str, int] = {"age": 25, "score": 90}
# Values of mixed type use typing.Any; lowercase ``any`` is the builtin function, not a type
config: Dict[str, Any] = {"host": "localhost", "port": 8080}
# Tuple types
point: Tuple[int, int] = (10, 20)
person: Tuple[str, int, bool] = ("Alice", 25, True)
# Set types
unique_ids: Set[int] = {1, 2, 3}
# Nested types
users: List[Dict[str, Any]] = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30}
]
# 实际应用
def get_user_scores() -> Dict[str, List[int]]:
    """返回用户和他们的分数列表"""
    return {
        "Alice": [85, 90, 88],
        "Bob": [92, 87, 95]
    }
def process_data(items: List[Tuple[str, int]]) -> Dict[str, int]:
    """处理数据列表"""
    return {name: score for name, score in items}
# 7.3、Optional与Union
from typing import Optional, Union
# Optional - 可能为None
def find_user(user_id: int) -> Optional[dict]:
    """返回用户或None"""
    if user_id > 0:
        return {"id": user_id, "name": "Alice"}
    return None
# Optional[X] 等价于 Union[X, None]
def get_config(key: str) -> Optional[str]:
    return None
# Union - 多种可能类型
def process_value(value: Union[int, str, float]) -> str:
    """处理int、str或float类型"""
    return str(value)
# 实际应用
def parse_input(data: Union[str, bytes]) -> str:
    """处理字符串或字节"""
    if isinstance(data, bytes):
        return data.decode('utf-8')
    return data
# Java对比
"""
// Java使用泛型和null
Optional<User> findUser(int id) {
    if (id > 0) {
        return Optional.of(new User(id));
    }
    return Optional.empty();
}
"""
# 7.4、泛型类型
from typing import TypeVar, Generic, List
# 定义类型变量
T = TypeVar('T')
# 泛型函数
def first_element(items: List[T]) -> Optional[T]:
    """返回列表第一个元素"""
    return items[0] if items else None
# 泛型类
class Stack(Generic[T]):
    """通用栈实现"""
    def __init__(self) -> None:
        self._items: List[T] = []
    def push(self, item: T) -> None:
        self._items.append(item)
    def pop(self) -> Optional[T]:
        return self._items.pop() if self._items else None
# 使用
int_stack: Stack[int] = Stack()
int_stack.push(1)
int_stack.push(2)
str_stack: Stack[str] = Stack()
str_stack.push("hello")
# 7.5、typing模块常用类型
from typing import Callable, Any, Sequence, Mapping, Iterable
# Callable - 可调用对象
def apply_func(func: Callable[[int, int], int], a: int, b: int) -> int:
    return func(a, b)
result = apply_func(lambda x, y: x + y, 3, 5)  # 8
# Any - 任意类型
def process_data(data: Any) -> str:
    return str(data)
# Sequence - 序列类型
def sum_values(numbers: Sequence[int]) -> int:
    return sum(numbers)
sum_values([1, 2, 3])  # List
sum_values((1, 2, 3))  # Tuple
# Mapping - 映射类型
def print_config(config: Mapping[str, Any]) -> None:
    """Print every ``key: value`` pair of ``config`` on its own line.

    The value type is ``typing.Any``; lowercase ``any`` is the builtin
    function and is not a valid type annotation.
    """
    for key, value in config.items():
        print(f"{key}: {value}")
# Iterable - 可迭代对象
def process_items(items: Iterable[str]) -> List[str]:
    return [item.upper() for item in items]
# 7.6、类型别名
from typing import Any, Dict, List, Tuple
# Simple type aliases
UserId = int
UserName = str
Score = float
# Composite type aliases (typing.Any, not the builtin function ``any``)
User = Dict[str, Any]
UserList = List[User]
Coordinate = Tuple[float, float]
ScoreBoard = Dict[UserName, List[Score]]
# 使用类型别名
def get_user(user_id: UserId) -> User:
    return {"id": user_id, "name": "Alice"}
def calculate_average(scores: ScoreBoard) -> Dict[UserName, Score]:
    return {
        name: sum(score_list) / len(score_list)
        for name, score_list in scores.items()
    }
# 7.7、mypy静态类型检查
# 安装mypy: pip install mypy
# 示例代码 example.py
def add_numbers(a: int, b: int) -> int:
    return a + b
result: str = add_numbers(1, 2)  # 类型错误!
# 运行检查
# mypy example.py
# error: Incompatible types in assignment (expression has type "int", variable has type "str")
# 配置mypy: mypy.ini
"""
[mypy]
python_version = 3.9
warn_return_any = True
warn_unused_configs = True
disallow_untyped_defs = True
"""
# 实际项目示例
from typing import Any, Dict, List, Optional
class UserService:
    """User service (stub implementation with fully annotated signatures)."""
    def __init__(self, db_connection: Any) -> None:
        # Any: the connection type is left open; ``any`` would be the builtin function
        self.db = db_connection
    def find_user(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Look up a user by id (stub: always returns a user named "Alice")."""
        return {"id": user_id, "name": "Alice"}
    def get_all_users(self) -> List[Dict[str, Any]]:
        """Return all users (stub: a single hard-coded user)."""
        return [{"id": 1, "name": "Alice"}]
对比总结:
| 特性 | Python类型提示 | Java类型系统 | 
|---|---|---|
| 强制性 | 可选,运行时不检查 | 强制,编译时检查 | 
| 基本语法 | name: str |  String name | 
| 泛型 | List[int] |  List<Integer> | 
| 可选类型 | Optional[int] |  Optional<Integer> | 
| 联合类型 | Union[int, str] |  不直接支持 | 
| 类型别名 | UserId = int |  不直接支持(Java没有类型别名) | 
| 检查工具 | mypy, pyright | javac内置 | 
| IDE支持 | VSCode, PyCharm | IntelliJ IDEA | 
# 8、反射与内省
Python的反射能力比Java更强大。作为Java开发者,你会发现Python的反射API更简单直接。
# 8.1、inspect模块
import inspect
# 定义示例类
class User:
    """User with a name and an age."""
    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age
    def greet(self):
        """Return a greeting that contains the user's name."""
        return f"Hello, I'm {self.name}"
    @staticmethod
    def create_guest():
        """Create a placeholder user named "Guest" with age 0."""
        return User("Guest", 0)
# Check what kind of object something is
print(inspect.isclass(User))          # True
print(inspect.isfunction(User.greet)) # True
# A bound method needs an instance, and User requires both name and age
print(inspect.ismethod(User("Alice", 25).greet)) # True
# Retrieve source code
print(inspect.getsource(User))        # prints the source of the User class
print(inspect.getfile(User))          # path of the file that defines User
# 获取函数签名
def example_func(name: str, age: int = 18) -> str:
    return f"{name}: {age}"
sig = inspect.signature(example_func)
print(sig)  # (name: str, age: int = 18) -> str
for param_name, param in sig.parameters.items():
    print(f"{param_name}: {param.annotation}, default={param.default}")
# 获取类成员
members = inspect.getmembers(User)
for name, value in members:
    print(f"{name}: {type(value)}")
# 8.2、动态属性操作
class Config:
    host = "localhost"
    port = 8080
# getattr - 获取属性
print(getattr(Config, 'host'))  # localhost
print(getattr(Config, 'timeout', 30))  # 30 (默认值)
# setattr - 设置属性
setattr(Config, 'host', '192.168.1.1')
print(Config.host)  # 192.168.1.1
# hasattr - 检查属性是否存在
print(hasattr(Config, 'host'))    # True
print(hasattr(Config, 'timeout')) # False
# delattr - 删除属性
delattr(Config, 'port')
print(hasattr(Config, 'port'))  # False
# 实际应用:动态配置加载
config_data = {
    'database_host': 'localhost',
    'database_port': 3306,
    'cache_enabled': True
}
class AppConfig:
    pass
for key, value in config_data.items():
    setattr(AppConfig, key, value)
print(AppConfig.database_host)  # localhost
# 8.3、动态导入模块
import importlib
# 动态导入模块
math_module = importlib.import_module('math')
print(math_module.sqrt(16))  # 4.0
# 动态导入并获取属性
module = importlib.import_module('json')
dumps_func = getattr(module, 'dumps')
print(dumps_func({'name': 'Alice'}))  # {"name": "Alice"}
# __import__() 函数
os_module = __import__('os')
print(os_module.getcwd())
# 实际应用:插件系统
def load_plugin(plugin_name: str):
    """Import ``plugins.<plugin_name>`` and return an instance of its ``Plugin``.

    Returns:
        A new ``Plugin()`` instance, or ``None`` when an ``ImportError`` is
        raised while loading (the error is printed).

    Raises:
        AttributeError: if the imported module has no ``Plugin`` attribute.
    """
    try:
        plugin_module = importlib.import_module(f'plugins.{plugin_name}')
        # Guard clause: a module without a Plugin class is a hard error.
        if not hasattr(plugin_module, 'Plugin'):
            raise AttributeError(f"Plugin class not found in {plugin_name}")
        return plugin_module.Plugin()
    except ImportError as exc:
        print(f"Failed to load plugin: {exc}")
        return None
# 8.4、类型检查
# type() vs isinstance()
value = [1, 2, 3]
print(type(value))  # <class 'list'>
print(type(value) == list)  # True
# isinstance() - 推荐使用,支持继承
print(isinstance(value, list))  # True
print(isinstance(value, (list, tuple)))  # True - 检查多个类型
# issubclass() - 判断继承关系
class Animal:
    pass
class Dog(Animal):
    pass
print(issubclass(Dog, Animal))  # True
print(issubclass(Dog, object))  # True - 所有类都继承自object
# 鸭子类型应用
def process_iterable(obj):
    """Print every item of ``obj``, or "Not iterable" if it cannot be iterated.

    Iterability is decided by calling ``iter()`` rather than by looking for
    ``__iter__``, so objects that only implement the ``__getitem__`` sequence
    protocol are handled as well.
    """
    try:
        items = iter(obj)
    except TypeError:
        # iter() raises TypeError for objects that support neither protocol.
        print("Not iterable")
        return
    for item in items:
        print(item)
process_iterable([1, 2, 3])      # 处理列表
process_iterable("hello")        # 处理字符串
process_iterable(range(5))       # 处理range对象
# 8.5、动态创建类
# 使用type()创建类
# type(name, bases, dict)
User = type('User', (object,), {
    'name': 'Alice',
    'greet': lambda self: f"Hello, {self.name}"
})
user = User()
print(user.name)     # Alice
print(user.greet())  # Hello, Alice
# 更复杂的示例
def __init__(self, name, age):
    self.name = name
    self.age = age
def get_info(self):
    return f"{self.name}, {self.age} years old"
Person = type('Person', (object,), {
    '__init__': __init__,
    'get_info': get_info
})
p = Person("Bob", 25)
print(p.get_info())  # Bob, 25 years old
# 实际应用:ORM模型动态生成
def create_model(table_name: str, fields: dict):
    """Build and return a model class for a database table at runtime.

    Args:
        table_name: used as the class name and stored as ``_table_name``.
        fields: mapping of column name to column type, stored as ``_fields``.

    Returns:
        A new class whose constructor accepts arbitrary keyword arguments and
        stores each one as an instance attribute.
    """
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
    def __repr__(self):
        # Fall back to None so repr() never raises for columns that were not
        # passed to the constructor.
        field_strs = [f"{k}={getattr(self, k, None)}" for k in fields]
        return f"{table_name}({', '.join(field_strs)})"
    attrs = {
        '__init__': __init__,
        '__repr__': __repr__,
        '_table_name': table_name,
        '_fields': fields
    }
    return type(table_name, (object,), attrs)
# 创建User模型
UserModel = create_model('users', {
    'id': 'INTEGER',
    'name': 'VARCHAR(100)',
    'email': 'VARCHAR(100)'
})
user = UserModel(id=1, name='Alice', email='alice@example.com')
print(user)  # users(id=1, name=Alice, email=alice@example.com)
# 8.6、元类(Metaclass)基础
# 元类是创建类的类
class Meta(type):
    """Custom metaclass that announces and tags every class it creates."""
    def __new__(mcs, name, bases, attrs):
        # Runs once, at the moment the class statement is executed.
        print(f"Creating class: {name}")
        # Inject a class attribute before the class object is built.
        attrs.update(created_by='Meta')
        new_cls = super().__new__(mcs, name, bases, attrs)
        return new_cls
# 使用元类
class MyClass(metaclass=Meta):
    pass
# 输出: Creating class: MyClass
print(MyClass.created_by)  # Meta
# 实际应用:自动注册类
# Classes created through RegisterMeta, keyed by class name
_registry = {}
class RegisterMeta(type):
    """Metaclass that records each class it creates in ``_registry``."""
    def __new__(mcs, name, bases, attrs):
        new_cls = super().__new__(mcs, name, bases, attrs)
        # The base class (named 'Base') is skipped and never registered.
        if name == 'Base':
            return new_cls
        _registry[name] = new_cls
        return new_cls
class Base(metaclass=RegisterMeta):
    pass
class PluginA(Base):
    pass
class PluginB(Base):
    pass
print(_registry)  # {'PluginA': <class '__main__.PluginA'>, 'PluginB': <class '__main__.PluginB'>}
# 8.7、实战案例
案例1:简易依赖注入
class Container:
    """Minimal dependency-injection container.

    Maps an interface (used as a dictionary key) to the implementation that
    is called to create the service.
    """
    def __init__(self):
        self._services = {}
    def register(self, interface, implementation):
        """Associate ``implementation`` with ``interface``."""
        self._services[interface] = implementation
    def resolve(self, interface):
        """Create and return the service registered for ``interface``.

        Every annotated parameter of the implementation's ``__init__`` is
        resolved recursively through this container and passed by keyword.

        Raises:
            ValueError: if nothing is registered for ``interface``.
        """
        impl_cls = self._services.get(interface)
        if impl_cls is None:
            raise ValueError(f"Service {interface} not registered")
        init_params = inspect.signature(impl_cls.__init__).parameters
        # Only annotated parameters are injected; the annotation is the lookup key.
        injected = {
            pname: self.resolve(p.annotation)
            for pname, p in init_params.items()
            if pname != 'self' and p.annotation != inspect.Parameter.empty
        }
        return impl_cls(**injected)
# 使用示例
class Database:
    def query(self, sql):
        return f"Executing: {sql}"
class UserRepository:
    def __init__(self, db: Database):
        self.db = db
    def find_all(self):
        return self.db.query("SELECT * FROM users")
# 注册服务
container = Container()
container.register(Database, Database)
container.register(UserRepository, UserRepository)
# 解析服务(自动注入依赖)
repo = container.resolve(UserRepository)
print(repo.find_all())  # Executing: SELECT * FROM users
案例2:序列化/反序列化
import json
from typing import get_type_hints
class Serializable:
    """Base class that converts objects to and from plain dictionaries."""
    def to_dict(self):
        """Return the instance attributes as a dict.

        Attribute values that are themselves ``Serializable`` are converted
        recursively; every other value is copied as is.
        """
        return {
            attr: (val.to_dict() if isinstance(val, Serializable) else val)
            for attr, val in self.__dict__.items()
        }
    @classmethod
    def from_dict(cls, data: dict):
        """Create an instance of ``cls`` from ``data``.

        Keys of ``data`` become keyword arguments of the constructor. When
        the type hint of the matching ``__init__`` parameter is a
        ``Serializable`` subclass, the value is rebuilt recursively.
        """
        hints = get_type_hints(cls.__init__)
        kwargs = {}
        for key, value in data.items():
            hint = hints.get(key)
            is_nested = (
                key in hints
                and isinstance(hint, type)
                and issubclass(hint, Serializable)
            )
            kwargs[key] = hint.from_dict(value) if is_nested else value
        return cls(**kwargs)
class Address(Serializable):
    def __init__(self, city: str, street: str):
        self.city = city
        self.street = street
class User(Serializable):
    def __init__(self, name: str, age: int, address: Address):
        self.name = name
        self.age = age
        self.address = address
# 使用
user = User("Alice", 25, Address("Beijing", "Main St"))
user_dict = user.to_dict()
print(json.dumps(user_dict, indent=2))
# 反序列化
restored = User.from_dict(user_dict)
print(f"{restored.name} lives in {restored.address.city}")
对比总结:
| 特性 | Python | Java | 
|---|---|---|
| 反射API | inspect, getattr, setattr | java.lang.reflect | 
| 获取类信息 | inspect.getmembers() |  Class.getDeclaredMethods() | 
| 动态调用 | getattr(obj, 'method')() |  method.invoke(obj) | 
| 动态创建类 | type(name, bases, dict) |  Proxy.newProxyInstance() | 
| 类型检查 | isinstance(), issubclass() |  instanceof, isAssignableFrom() | 
| 元类 | class Meta(type) |  不支持(使用注解处理器) | 
| 灵活性 | 极高 | 中等 | 
| 性能 | 较慢 | 较快 | 
# 二、Python标准库
Python的标准库非常丰富,开箱即用。作为Java开发者,你会发现很多功能在Python标准库中已经实现,无需引入第三方依赖。
# 1、常用内置模块
# 1.1、os与sys模块
os模块 - 操作系统交互
import os
# 获取当前工作目录
print(os.getcwd())  # D:\workspace\project
# 改变工作目录
os.chdir('/tmp')
# 列出目录内容
files = os.listdir('.')
print(files)
# 创建目录
os.mkdir('new_folder')
os.makedirs('path/to/folder', exist_ok=True)  # 递归创建
# 删除文件和目录
os.remove('file.txt')  # 删除文件
os.rmdir('folder')     # 删除空目录
import shutil
shutil.rmtree('folder')  # 删除非空目录
# 文件和目录判断
print(os.path.exists('file.txt'))  # 是否存在
print(os.path.isfile('file.txt'))  # 是否是文件
print(os.path.isdir('folder'))     # 是否是目录
# 路径操作
full_path = os.path.join('path', 'to', 'file.txt')  # path/to/file.txt
dirname = os.path.dirname('/path/to/file.txt')      # /path/to
basename = os.path.basename('/path/to/file.txt')    # file.txt
name, ext = os.path.splitext('file.txt')            # ('file', '.txt')
# 环境变量
print(os.environ.get('PATH'))
os.environ['MY_VAR'] = 'value'
# 执行系统命令
os.system('ls -l')  # 不推荐,使用subprocess模块
sys模块 - Python解释器交互
import sys
# 命令行参数
print(sys.argv)  # ['script.py', 'arg1', 'arg2']
# Python版本信息
print(sys.version)
print(sys.version_info)  # sys.version_info(major=3, minor=9, ...)
# 模块搜索路径
print(sys.path)
sys.path.append('/custom/path')
# 退出程序
sys.exit(0)  # 正常退出
sys.exit(1)  # 异常退出
# 标准输入输出
sys.stdout.write("Hello\n")
line = sys.stdin.readline()
# 获取对象大小
numbers = [1, 2, 3, 4, 5]
print(sys.getsizeof(numbers))  # 字节数
Java对比:
// Java获取当前目录
String currentDir = System.getProperty("user.dir");
// Java环境变量
String path = System.getenv("PATH");
// Java命令行参数
public static void main(String[] args) {
    // args数组
}
# 1.2、datetime模块
from datetime import datetime, date, time, timedelta
# 获取当前时间
now = datetime.now()
print(now)  # 2025-01-26 15:30:45.123456
today = date.today()
print(today)  # 2025-01-26
# 创建日期时间
dt = datetime(2025, 1, 26, 15, 30, 45)
d = date(2025, 1, 26)
t = time(15, 30, 45)
# 格式化输出
print(now.strftime('%Y-%m-%d %H:%M:%S'))  # 2025-01-26 15:30:45
print(now.strftime('%Y年%m月%d日'))        # 2025年01月26日
# 解析字符串
dt = datetime.strptime('2025-01-26 15:30:45', '%Y-%m-%d %H:%M:%S')
# 日期运算
tomorrow = today + timedelta(days=1)
next_week = today + timedelta(weeks=1)
one_hour_later = now + timedelta(hours=1)
# 时间差
diff = datetime(2025, 12, 31) - now
print(diff.days)  # 剩余天数
print(diff.total_seconds())  # 总秒数
# 时间戳
timestamp = now.timestamp()  # 转为时间戳
dt = datetime.fromtimestamp(timestamp)  # 从时间戳创建
# Java对比
"""
// Java 8+
LocalDateTime now = LocalDateTime.now();
LocalDate today = LocalDate.now();
// 格式化
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formatted = now.format(formatter);
// 日期运算
LocalDate tomorrow = today.plusDays(1);
"""
# 1.3、json模块
import json
# Python对象转JSON字符串
data = {
    'name': 'Alice',
    'age': 25,
    'skills': ['Python', 'Java'],
    'active': True
}
json_str = json.dumps(data)
print(json_str)  # {"name": "Alice", "age": 25, ...}
# 美化输出
json_str = json.dumps(data, indent=2, ensure_ascii=False)
print(json_str)
# JSON字符串转Python对象
data = json.loads(json_str)
print(data['name'])  # Alice
# 读写JSON文件
# 写入
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)
# 读取
with open('data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)
# 自定义JSON编码
from datetime import datetime
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that writes ``datetime`` objects as ISO-format strings."""
    def default(self, obj):
        """Return a serializable form of ``obj``; defer unknown types to the base class."""
        if not isinstance(obj, datetime):
            return super().default(obj)
        return obj.isoformat()
data = {'created': datetime.now()}
json_str = json.dumps(data, cls=DateTimeEncoder)
# Java对比
"""
// Java使用Jackson或Gson
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(data);
Data data = mapper.readValue(json, Data.class);
"""
# 1.4、re模块(正则表达式)
import re
# 基本匹配
text = "My email is alice@example.com"
match = re.search(r'\w+@\w+\.\w+', text)
if match:
    print(match.group())  # alice@example.com
# 查找所有匹配
text = "Phone: 123-456-7890, Mobile: 098-765-4321"
phones = re.findall(r'\d{3}-\d{3}-\d{4}', text)
print(phones)  # ['123-456-7890', '098-765-4321']
# 替换
text = "Hello World"
result = re.sub(r'World', 'Python', text)
print(result)  # Hello Python
# 分割
text = "apple,banana;orange:grape"
fruits = re.split(r'[,;:]', text)
print(fruits)  # ['apple', 'banana', 'orange', 'grape']
# 编译正则(提高性能)
pattern = re.compile(r'\d+')
numbers = pattern.findall("I have 3 apples and 5 oranges")
print(numbers)  # ['3', '5']
# 捕获组
text = "Name: Alice, Age: 25"
match = re.search(r'Name: (\w+), Age: (\d+)', text)
if match:
    print(match.group(1))  # Alice
    print(match.group(2))  # 25
    print(match.groups())  # ('Alice', '25')
# 常用正则表达式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
url_pattern = r'^https?://(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b'
phone_pattern = r'^\d{3}-\d{3}-\d{4}$'
# 1.5、math与random模块
import math
import random
# math模块
print(math.pi)      # 3.141592653589793
print(math.e)       # 2.718281828459045
print(math.sqrt(16))    # 4.0
print(math.pow(2, 3))   # 8.0
print(math.ceil(4.3))   # 5
print(math.floor(4.7))  # 4
print(math.fabs(-5))    # 5.0
# 三角函数
print(math.sin(math.pi / 2))  # 1.0
print(math.cos(0))             # 1.0
# random模块
print(random.random())  # 0.0到1.0的随机浮点数
# 随机整数
print(random.randint(1, 10))  # 1到10的随机整数(包含10)
print(random.randrange(1, 10))  # 1到9的随机整数(不含10)
# 随机选择
colors = ['red', 'green', 'blue']
print(random.choice(colors))  # 随机选择一个
# 随机多个(有放回)
print(random.choices(colors, k=3))  # ['red', 'blue', 'red']
# 随机多个(无放回)
print(random.sample(colors, k=2))  # ['green', 'red']
# 打乱列表
numbers = [1, 2, 3, 4, 5]
random.shuffle(numbers)
print(numbers)  # [3, 1, 5, 2, 4]
# 设置随机种子(可复现)
random.seed(42)
print(random.random())  # 相同种子产生相同序列
# 1.6、collections模块
from collections import Counter, defaultdict, deque, namedtuple, OrderedDict
# Counter - 计数器
words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple']
counter = Counter(words)
print(counter)  # Counter({'apple': 3, 'banana': 2, 'orange': 1})
print(counter.most_common(2))  # [('apple', 3), ('banana', 2)]
# defaultdict - 默认值字典
dd = defaultdict(list)
dd['fruits'].append('apple')  # 不需要检查key是否存在
print(dd)  # defaultdict(<class 'list'>, {'fruits': ['apple']})
# 按类别分组
data = [('fruit', 'apple'), ('veg', 'carrot'), ('fruit', 'banana')]
grouped = defaultdict(list)
for category, item in data:
    grouped[category].append(item)
print(dict(grouped))  # {'fruit': ['apple', 'banana'], 'veg': ['carrot']}
# deque - 双端队列
dq = deque([1, 2, 3])
dq.append(4)      # 右端添加
dq.appendleft(0)  # 左端添加
dq.pop()          # 右端移除
dq.popleft()      # 左端移除
print(dq)  # deque([1, 2, 3])
# 限制长度的deque(用作循环缓冲区)
buffer = deque(maxlen=3)
for i in range(5):
    buffer.append(i)
print(buffer)  # deque([2, 3, 4], maxlen=3)
# namedtuple - 命名元组
Point = namedtuple('Point', ['x', 'y'])
p = Point(10, 20)
print(p.x, p.y)  # 10 20
print(p[0], p[1])  # 也可以用索引
# OrderedDict - 有序字典(Python 3.7+普通dict也保序)
od = OrderedDict()
od['a'] = 1
od['b'] = 2
od['c'] = 3
print(list(od.keys()))  # ['a', 'b', 'c']
# Java对比
"""
// Java Counter类似
Map<String, Integer> counter = new HashMap<>();
// 需要手动计数
// Java deque
Deque<Integer> deque = new ArrayDeque<>();
deque.addFirst(1);
deque.addLast(2);
"""
# 1.7、pathlib模块
from pathlib import Path
# 创建路径对象
p = Path('/usr/local/bin')
p = Path.home()  # 用户主目录
p = Path.cwd()   # 当前工作目录
# 路径拼接
config_path = Path.home() / '.config' / 'app' / 'settings.json'
print(config_path)  # /home/user/.config/app/settings.json
# 路径属性
p = Path('/path/to/file.txt')
print(p.name)       # file.txt
print(p.stem)       # file
print(p.suffix)     # .txt
print(p.parent)     # /path/to
print(p.parts)      # ('/', 'path', 'to', 'file.txt')
# 文件操作
p = Path('test.txt')
p.write_text('Hello World', encoding='utf-8')  # 写入
content = p.read_text(encoding='utf-8')        # 读取
p.unlink()  # 删除文件
# 目录操作
dir_path = Path('new_folder')
dir_path.mkdir(exist_ok=True)  # 创建目录
dir_path.mkdir(parents=True, exist_ok=True)  # 递归创建
# 判断
p = Path('file.txt')
print(p.exists())   # 是否存在
print(p.is_file())  # 是否是文件
print(p.is_dir())   # 是否是目录
# 遍历目录
for file in Path('.').iterdir():
    print(file)
# 递归查找
for py_file in Path('.').rglob('*.py'):
    print(py_file)
# 对比os.path
"""
# 旧方式
import os
path = os.path.join(os.path.expanduser('~'), '.config', 'app')
# 新方式(pathlib)
path = Path.home() / '.config' / 'app'
"""
模块对比总结:
| 功能 | Python模块 | Java对应 | 
|---|---|---|
| 操作系统交互 | os |  System, Runtime | 
| 文件路径 | pathlib, os.path |  java.nio.file.Path | 
| 日期时间 | datetime |  java.time.* | 
| JSON处理 | json |  Jackson, Gson | 
| 正则表达式 | re |  java.util.regex.Pattern | 
| 随机数 | random |  java.util.Random | 
| 高级集合 | collections |  java.util.* | 
# 2、数据处理
# 2.1、csv模块
import csv
# 读取CSV文件
with open('data.csv', 'r', encoding='utf-8') as f:
    reader = csv.reader(f)
    headers = next(reader)  # 读取表头
    for row in reader:
        print(row)  # ['Alice', '25', 'Beijing']
# 使用DictReader(推荐)
with open('data.csv', 'r', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['name'], row['age'])  # 按列名访问
# 写入CSV文件
data = [
    ['name', 'age', 'city'],
    ['Alice', 25, 'Beijing'],
    ['Bob', 30, 'Shanghai']
]
with open('output.csv', 'w', encoding='utf-8', newline='') as f:
    writer = csv.writer(f)
    writer.writerows(data)
# 使用DictWriter
with open('output.csv', 'w', encoding='utf-8', newline='') as f:
    fieldnames = ['name', 'age', 'city']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'name': 'Alice', 'age': 25, 'city': 'Beijing'})
# 2.2、XML处理
import xml.etree.ElementTree as ET
# 解析XML
xml_str = '''
<users>
    <user id="1">
        <name>Alice</name>
        <age>25</age>
    </user>
    <user id="2">
        <name>Bob</name>
        <age>30</age>
    </user>
</users>
'''
root = ET.fromstring(xml_str)
# 遍历元素
for user in root.findall('user'):
    user_id = user.get('id')
    name = user.find('name').text
    age = user.find('age').text
    print(f"ID: {user_id}, Name: {name}, Age: {age}")
# 创建XML
root = ET.Element('users')
user = ET.SubElement(root, 'user', id='1')
ET.SubElement(user, 'name').text = 'Alice'
ET.SubElement(user, 'age').text = '25'
# 保存XML
tree = ET.ElementTree(root)
tree.write('users.xml', encoding='utf-8', xml_declaration=True)
# 2.3、pickle模块(序列化)
import pickle
# Python对象序列化
data = {
    'name': 'Alice',
    'scores': [85, 90, 88],
    'metadata': {'created': '2025-01-26'}
}
# 保存到文件
with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)
# 从文件加载
with open('data.pkl', 'rb') as f:
    loaded_data = pickle.load(f)
    print(loaded_data)
# 序列化为字节串
bytes_data = pickle.dumps(data)
restored = pickle.loads(bytes_data)
# 2.4、struct模块(二进制数据)
import struct
# 打包二进制数据
# 格式: i=int, f=float, s=string
packed = struct.pack('i f 10s', 42, 3.14, b'Hello')
print(packed)  # b'*\x00\x00\x00\xc3\xf5H@Hello\x00\x00\x00\x00\x00'
# 解包二进制数据
unpacked = struct.unpack('i f 10s', packed)
print(unpacked)  # (42, 3.140000104904175, b'Hello\x00\x00\x00\x00\x00')
# 实际应用:读取二进制文件
with open('data.bin', 'rb') as f:
    data = f.read(struct.calcsize('i f'))
    values = struct.unpack('i f', data)
# 3、网络编程
# 3.1、socket模块
import socket
# TCP服务器
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 8080))
server.listen(5)
print("Server listening on port 8080...")
while True:
    client, addr = server.accept()
    print(f"Connection from {addr}")
    data = client.recv(1024)
    client.send(b"Hello from server")
    client.close()
# TCP客户端
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 8080))
client.send(b"Hello server")
response = client.recv(1024)
print(response.decode())
client.close()
# 3.2、urllib模块
from urllib import request, parse
# GET请求
response = request.urlopen('https://api.github.com')
html = response.read().decode('utf-8')
print(html)
# POST请求
data = parse.urlencode({'key': 'value'}).encode()
req = request.Request('https://httpbin.org/post', data=data)
response = request.urlopen(req)
print(response.read().decode())
# 设置请求头
req = request.Request('https://api.github.com')
req.add_header('User-Agent', 'Python App')
response = request.urlopen(req)
# 3.3、http.server
# 命令行启动简单HTTP服务器
# python -m http.server 8000
# 自定义HTTP服务器
from http.server import HTTPServer, BaseHTTPRequestHandler
class MyHandler(BaseHTTPRequestHandler):
    """Request handler that answers every GET with a fixed HTML heading."""
    def do_GET(self):
        """Send a 200 response with a ``text/html`` body."""
        page = b'<h1>Hello World</h1>'
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(page)
server = HTTPServer(('localhost', 8000), MyHandler)
server.serve_forever()
# 4、多线程与多进程
# 4.1、threading模块
import threading
import time
# 创建线程
def worker(name):
    print(f"Thread {name} starting")
    time.sleep(2)
    print(f"Thread {name} done")
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
# 等待所有线程完成
for t in threads:
    t.join()
# 线程类
class MyThread(threading.Thread):
    """Thread subclass that prints its own name when it runs."""
    def __init__(self, name):
        super().__init__()
        # Assigned after the base initializer, replacing the name it set.
        self.name = name
    def run(self):
        """Print a message containing this thread's name."""
        announcement = f"{self.name} is running"
        print(announcement)
# 线程锁
# Shared state: the counter must exist before safe_increment() runs,
# otherwise `counter += 1` raises NameError.
counter = 0
lock = threading.Lock()
def safe_increment():
    """Add 1 to the module-level ``counter`` while holding ``lock``."""
    global counter
    # The read-modify-write of `+=` is performed with the lock held.
    with lock:
        counter += 1
# 4.2、multiprocessing模块
from multiprocessing import Process, Pool, Queue
# 创建进程
def worker(name):
    print(f"Process {name} starting")
if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = Process(target=worker, args=(i,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
# 进程池
def square(x):
    return x ** 2
if __name__ == '__main__':
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
        print(results)
# 4.3、concurrent.futures模块
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def task(n):
    """Sleep for one second, then return the square of ``n``."""
    time.sleep(1)
    return n ** 2
# The guard is required for ProcessPoolExecutor: worker processes import this
# module, and without the guard they would start pools of their own under the
# "spawn" start method.
if __name__ == '__main__':
    # Thread pool
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(task, i) for i in range(10)]
        for future in futures:
            print(future.result())
    # Process pool
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(task, range(10))
        print(list(results))
# 4.4、GIL详解
GIL(全局解释器锁)是CPython的实现细节:
影响:
- 同一时刻只有一个线程执行Python字节码
 - 多线程无法利用多核CPU进行CPU密集型任务
 - I/O密集型任务受影响较小(线程在等待I/O时会释放GIL)
 
解决方案:
- CPU密集型: 使用multiprocessing
 - I/O密集型: 使用threading或asyncio
 - 混合型: 根据具体情况选择
 
Java对比:
- Java没有GIL,多线程可以真正并行
 - Python的多线程更适合I/O操作
 - 在利用多核并行方面,Python的多进程起到类似Java多线程的作用(但进程之间默认不共享内存)
 
# 5、日志与调试
# 5.1、logging模块
import logging
# 基本配置
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    filename='app.log'
)
# 使用日志
logging.debug("Debug message")
logging.info("Info message")
logging.warning("Warning message")
logging.error("Error message")
logging.critical("Critical message")
# 创建logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# 添加处理器
file_handler = logging.FileHandler('app.log')
console_handler = logging.StreamHandler()
# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# 使用logger
logger.info("Application started")
# 5.2、pdb调试器
import pdb
def buggy_function(x, y):
    result = x + y
    pdb.set_trace()  # 设置断点
    result = result * 2
    return result
# 调试命令:
# n - 下一行
# s - 进入函数
# c - 继续执行
# p variable - 打印变量
# l - 显示当前代码
# q - 退出调试
# 5.3、traceback模块
import traceback
try:
    result = 1 / 0
except Exception as e:
    # 打印完整堆栈
    traceback.print_exc()
    # 获取堆栈信息
    tb_str = traceback.format_exc()
    print(tb_str)
    # 记录到日志
    logging.error("Error occurred", exc_info=True)
# 三、异步编程
异步编程是Python的重要特性,特别适合处理I/O密集型任务。对于Java开发者来说,Python的异步编程模型比Java的CompletableFuture更加直观和强大。
# 1、asyncio基础
# 1.1、async/await语法
Python 3.5引入了async/await语法,让异步代码看起来像同步代码一样直观。
import asyncio
async def greet(name):
    print(f"开始问候 {name}")
    await asyncio.sleep(1)  # 异步等待1秒
    return f"Hello, {name}!"
async def main():
    result = await greet("张三")
    print(result)
# 运行异步程序
asyncio.run(main())
对比Java的异步编程:
// Java使用CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, 张三!";
});
future.thenAccept(System.out::println);
核心概念:
- async def:定义协程函数
- await:暂停当前协程,等待另一个协程完成
- asyncio.run():启动事件循环并运行主协程
# 1.2、协程概念
协程(Coroutine)是可以暂停和恢复执行的函数,比线程更轻量级。
import asyncio
import time
async def task(name, duration):
    print(f"任务 {name} 开始")
    await asyncio.sleep(duration)
    print(f"任务 {name} 完成")
    return f"{name} 结果"
async def main():
    start_time = time.time()
    # 顺序执行(总时间 = 3秒)
    result1 = await task("A", 1)
    result2 = await task("B", 2)
    end_time = time.time()
    print(f"顺序执行耗时: {end_time - start_time:.2f}秒")
asyncio.run(main())
协程 vs 线程对比:
| 特性 | 协程 | 线程 | 
|---|---|---|
| 创建成本 | 极低(几KB) | 较高(几MB) | 
| 切换成本 | 用户态切换 | 内核态切换 | 
| 数量限制 | 可以创建数万个 | 通常数百个 | 
| 适用场景 | I/O密集型 | 阻塞式I/O或只能同步调用的库(CPython中受GIL限制,CPU密集型应使用多进程) | 
# 1.3、事件循环
事件循环是异步编程的核心,负责调度和执行协程。
import asyncio
async def worker(name):
    print(f"{name} 开始工作")
    await asyncio.sleep(1)
    print(f"{name} 完成工作")
async def main():
    # 创建任务
    task1 = asyncio.create_task(worker("任务1"))
    task2 = asyncio.create_task(worker("任务2"))
    # 等待所有任务完成
    await task1
    await task2
    print("所有任务完成")
# 使用asyncio.run()
asyncio.run(main())
# 手动控制事件循环(高级用法)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
asyncio.run() vs await的区别:
- asyncio.run():启动新的事件循环,通常用于程序入口
- await:在现有事件循环中等待协程完成
import asyncio
async def nested():
    return "嵌套协程结果"
async def outer():
    """Await ``nested()`` from inside a coroutine and print its result."""
    # ✅ Correct: use await
    result = await nested()
    print(result)
    # ❌ Wrong: asyncio.run() cannot be used inside a running coroutine
    # asyncio.run(nested())  # RuntimeError: asyncio.run() cannot be called from a running event loop
asyncio.run(outer())
# 1.4、Task与Future
Task是Future的子类,用于包装协程并调度执行。
import asyncio
async def background_task(name, delay):
    print(f"后台任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"后台任务 {name} 完成")
    return f"{name} 的结果"
async def main():
    # 创建多个任务(立即开始执行)
    tasks = [
        asyncio.create_task(background_task("A", 2)),
        asyncio.create_task(background_task("B", 1)),
        asyncio.create_task(background_task("C", 3))
    ]
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    print("所有任务结果:", results)
    # 带超时的等待
    try:
        await asyncio.wait_for(asyncio.create_task(background_task("D", 5)), timeout=3)
    except asyncio.TimeoutError:
        print("任务D超时")
asyncio.run(main())
Task的高级用法:
import asyncio
async def cancellable_task():
    try:
        for i in range(10):
            print(f"工作中... {i}")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("任务被取消")
        # 清理资源
        await asyncio.sleep(0.5)
        print("清理完成")
        raise
async def main():
    task = asyncio.create_task(cancellable_task())
    # 3秒后取消任务
    await asyncio.sleep(3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("主协程捕获到取消异常")
asyncio.run(main())
# 2、异步I/O
# 2.1、异步文件操作
Python标准库的asyncio并不直接提供异步文件I/O API,通常借助第三方库aiofiles(内部通过线程池执行文件操作)来实现异步文件读写。
import asyncio
import aiofiles  # 第三方库,功能更完整
async def read_file_async(filename):
    # 使用aiofiles(推荐)
    async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
        content = await f.read()
        print(f"文件内容长度: {len(content)}")
        return content
async def write_file_async(filename, content):
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(content)
        print(f"写入文件: {filename}")
async def process_files():
    # 并发处理多个文件
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    tasks = []
    for filename in files:
        content = f"这是 {filename} 的内容\n"
        tasks.append(write_file_async(filename, content))
    await asyncio.gather(*tasks)
    # 并发读取文件
    read_tasks = [read_file_async(f) for f in files]
    contents = await asyncio.gather(*read_tasks)
    print(f"读取了 {len(contents)} 个文件")
# 安装aiofiles: pip install aiofiles
# asyncio.run(process_files())
对比Java的异步文件操作:
// Java NIO.2异步文件操作
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
    Paths.get("file.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> readResult = channel.read(buffer, 0);
// 处理结果
while (!readResult.isDone()) {
    // 可以做其他工作
}
# 2.2、异步网络请求
使用aiohttp库进行异步HTTP请求。
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            print(f"获取 {url} - 状态: {response.status}")
            content = await response.text()
            return {
                'url': url,
                'status': response.status,
                'length': len(content),
                'content': content[:100]  # 只返回前100个字符
            }
    except Exception as e:
        print(f"请求 {url} 失败: {e}")
        return {'url': url, 'error': str(e)}
async def fetch_multiple_urls(urls):
    # 创建会话(复用连接)
    async with aiohttp.ClientSession() as session:
        # 并发请求多个URL
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # 处理结果
        success_count = 0
        for result in results:
            if isinstance(result, dict) and 'error' not in result:
                success_count += 1
                print(f"✅ {result['url']}: {result['status']} ({result['length']} 字符)")
            else:
                print(f"❌ 失败: {result}")
        print(f"成功请求: {success_count}/{len(urls)}")
        return results
async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404'
    ]
    start_time = time.time()
    await fetch_multiple_urls(urls)
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")
# 安装aiohttp: pip install aiohttp
# asyncio.run(main())
异步HTTP客户端的最佳实践:
import asyncio
import aiohttp
from aiohttp import ClientTimeout, ClientSession
class AsyncHttpClient:
    def __init__(self, timeout=30, max_connections=100):
        self.timeout = ClientTimeout(total=timeout)
        self.connector = aiohttp.TCPConnector(limit=max_connections)
    async def __aenter__(self):
        self.session = ClientSession(
            timeout=self.timeout,
            connector=self.connector
        )
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    async def get(self, url, **kwargs):
        async with self.session.get(url, **kwargs) as response:
            return await response.json()
    async def post(self, url, data=None, json=None, **kwargs):
        async with self.session.post(url, data=data, json=json, **kwargs) as response:
            return await response.json()
async def api_client_example():
    async with AsyncHttpClient() as client:
        # GET请求
        data = await client.get('https://api.github.com/users/python')
        print(f"Python GitHub followers: {data['followers']}")
        # POST请求
        result = await client.post('https://httpbin.org/post', json={'key': 'value'})
        print(f"POST响应: {result['json']}")
# asyncio.run(api_client_example())
# 3、异步编程最佳实践
# 3.1、何时使用异步
适合异步的场景:
- I/O密集型操作(网络请求、数据库查询、文件读写)
 - 需要并发处理大量连接
 - 实时性要求高的应用
 - WebSocket、聊天服务器、API网关
 
# ✅ 适合异步:网络爬虫
import asyncio
import aiohttp
async def crawl_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
# ✅ 适合异步:WebSocket聊天服务器
async def handle_chat_messages(websocket):
    async for message in websocket:
        await broadcast_to_other_clients(message)
# ❌ 不适合异步:CPU密集型计算
async def heavy_computation():
    """Counter-example: a coroutine that performs pure CPU work.

    Returns:
        The sum of ``i * i`` for every ``i`` in ``range(10000000)``.
    """
    # There is no await below, so this blocks the event loop while it runs.
    squares = (i * i for i in range(10000000))
    return sum(squares)
不适合异步的场景:
- CPU密集型计算(数值计算、图像处理)
 - 简单的脚本程序
 - 对只提供同步(阻塞)接口的第三方库做简单包装(仅加上async并不会让它变成非阻塞)
 
# 3.2、异步与多线程的选择
异步 vs 多线程对比:
| 维度 | 异步 | 多线程 | 
|---|---|---|
| 并发模型 | 单线程事件循环 | 多线程抢占式 | 
| 内存占用 | 低(单线程) | 高(每个线程栈空间) | 
| 上下文切换 | 快(用户态) | 慢(内核态) | 
| 编程复杂度 | 高(需要异步思维) | 中等 | 
| 调试难度 | 较高 | 中等 | 
| 适用场景 | I/O密集型 | 阻塞式I/O或只有同步接口的库(CPU密集型受GIL限制,应使用多进程) | 
混合使用示例:
import asyncio
import concurrent.futures
import time
def cpu_intensive_task(n):
    """CPU-bound task: add up the squares of the integers in ``range(n)``.

    Args:
        n: Exclusive upper bound of the range.

    Returns:
        ``0**2 + 1**2 + ... + (n - 1)**2``; ``0`` when the range is empty.
    """
    total = 0
    for value in range(n):
        total += value * value
    return total
async def async_io_task():
    """I/O-bound task stand-in: wait one second without blocking the loop.

    Returns:
        A fixed completion message.
    """
    simulated_latency = 1
    await asyncio.sleep(simulated_latency)
    return "I/O任务完成"
async def mixed_workload():
    """Run CPU-heavy and I/O-heavy work side by side and report the timing.

    Two ``cpu_intensive_task`` calls are dispatched to a thread pool while two
    ``async_io_task`` coroutines run on the event loop; all four are awaited
    together and the elapsed time and results are printed.
    """
    # perf_counter is monotonic, so the measured interval cannot be distorted
    # by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    # Inside a coroutine the running loop is the one we want; this is the
    # recommended replacement for get_event_loop() in this position.
    loop = asyncio.get_running_loop()
    # Threads keep the event loop free while the sums are computed. Under the
    # GIL pure-Python CPU work does not run in parallel across threads; a
    # ProcessPoolExecutor is the usual choice for real multi-core speed-up.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        tasks = [
            # CPU-bound work goes to the executor
            loop.run_in_executor(executor, cpu_intensive_task, 1000000),
            loop.run_in_executor(executor, cpu_intensive_task, 2000000),
            # I/O-bound work stays on the event loop
            async_io_task(),
            async_io_task(),
        ]
        results = await asyncio.gather(*tasks)
    end_time = time.perf_counter()
    print(f"混合任务完成,耗时: {end_time - start_time:.2f}秒")
    print(f"结果: {results}")
if __name__ == "__main__":
    # Start the demo only when executed as a script, not when imported.
    asyncio.run(mixed_workload())
# 3.3、常见陷阱
陷阱1:在协程中调用阻塞函数
import asyncio
import time
# ❌ 错误:阻塞事件循环
async def bad_example():
    """Anti-pattern: call the blocking ``time.sleep`` inside a coroutine."""
    time.sleep(2)  # blocks for 2 seconds; the whole event loop is stalled meanwhile
    return "完成"
# ✅ 正确:使用异步版本
async def good_example():
    """Wait two seconds with ``asyncio.sleep`` and return a completion message."""
    pause_seconds = 2
    # Awaiting the sleep suspends only this coroutine, not the event loop.
    await asyncio.sleep(pause_seconds)
    return "完成"
# ✅ 或者在线程池中执行阻塞操作
async def blocking_in_thread():
    """Run the blocking ``time.sleep(2)`` in a worker thread and await it.

    Returns:
        The string "完成" once the sleep has finished.
    """
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor, so no thread pool is
    # created and torn down per call and no extra import is needed.
    await loop.run_in_executor(None, time.sleep, 2)
    return "完成"
陷阱2:忘记await
import asyncio
async def forget_await():
    """Anti-pattern: call ``asyncio.sleep`` without awaiting the result."""
    # ❌ Wrong: without await the coroutine is created but never executed
    asyncio.sleep(1)
    print("这可能不会按预期执行")
async def correct_usage():
    """Await a one-second sleep, then print a message."""
    delay = 1
    # ✅ Correct: the sleep is awaited, so the print happens after it finishes
    await asyncio.sleep(delay)
    print("1秒后执行")
陷阱3:异常处理不当
import asyncio
async def task_with_error():
    """Sleep briefly, then fail.

    Raises:
        ValueError: Always, after a 0.1 second asynchronous wait.
    """
    delay = 0.1
    await asyncio.sleep(delay)
    raise ValueError("任务失败")
async def bad_error_handling():
    """Anti-pattern: start ``task_with_error`` as a task and never await it."""
    # ❌ Wrong: nothing awaits the task, so its exception is not handled here
    asyncio.create_task(task_with_error())
async def good_error_handling():
    """Start the failing task, await it, and report the ``ValueError`` it raises."""
    failing = asyncio.create_task(task_with_error())
    try:
        await failing
    except ValueError as e:
        # ✅ Correct: awaiting the task surfaces its exception where it can be handled
        print(f"捕获到异常: {e}")
# ✅ 使用gather处理异常
async def gather_with_exceptions():
    """Run a failing and a succeeding task together and report each outcome.

    ``return_exceptions=True`` makes ``gather`` return raised exceptions as
    ordinary results instead of propagating the first one.
    """
    tasks = [
        asyncio.create_task(task_with_error()),
        asyncio.create_task(asyncio.sleep(1))
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        # BaseException rather than Exception: asyncio.CancelledError derives
        # from BaseException since Python 3.8, and a cancelled task must not
        # be reported as a success.
        if isinstance(result, BaseException):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i} 成功: {result}")
陷阱4:过度使用异步
# ❌ 过度设计:简单的同步任务不需要异步
async def over_engineered():
    """Over-designed variant: await ``async_add(2, 3)`` and return its result."""
    return await async_add(2, 3)
# ✅ 简单直接
def simple():
    """Return the sum of 2 and 3 with a plain synchronous function."""
    total = 2 + 3
    return total
最佳实践总结:
- 保持异步函数纯净:异步函数中不要直接调用会阻塞的同步函数(普通的非阻塞同步函数可以正常调用)
 - 正确处理异常:使用try/except包装可能失败的操作
 - 避免阻塞操作:将阻塞操作放到线程池中
 - 合理并发数:控制同时进行的任务数量
 - 使用连接池:复用数据库和HTTP连接
 - 性能监控:使用工具监控异步程序性能
 
import asyncio
import aiohttp
from asyncio import Semaphore
async def rate_limited_fetch(session, url, semaphore):
    """Fetch ``url`` as text while holding ``semaphore``.

    Args:
        session: Object whose ``get(url)`` returns an async context manager.
        url: Address to request.
        semaphore: Async context manager limiting how many requests run at once.

    Returns:
        The value of ``await response.text()``.
    """
    # The semaphore is held for the whole request, which caps concurrency.
    async with semaphore, session.get(url) as reply:
        return await reply.text()
async def robust_fetch_all(urls, max_concurrent=10):
    """Fetch many URLs with bounded concurrency, tolerating individual failures.

    Args:
        urls: Iterable of URLs to request; it does not need to support ``len``.
        max_concurrent: Maximum number of requests in flight at the same time.

    Returns:
        A list with one entry per URL, in input order: the response text, or
        the exception instance raised for that URL.
    """
    semaphore = Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            rate_limited_fetch(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # BaseException rather than Exception: asyncio.CancelledError derives
        # from BaseException since Python 3.8 and must not count as a success.
        success_count = sum(
            1 for r in results if not isinstance(r, BaseException)
        )
        # len(results) equals the number of URLs and also works when ``urls``
        # is a generator or another iterable without a length.
        print(f"成功: {success_count}/{len(results)}")
        return results
📖 系列文章导航
👈 上一篇:Java开发者转战Python:基础篇 - Python基础语法、数据结构、面向对象编程
👉 下一篇:Java开发者转战Python:热门库与质量管理 - 探索Python生态系统和工程实践
祝你变得更强!