轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

  • Spring

  • 其他语言

    • C语言指针二三事
    • 从Java到Kotlin
    • Groovy语言探索
    • Java开发者转战Python:基础篇
    • Java开发者转战Python:进阶篇
      • 一、Python进阶特性
        • 1、异常处理
        • 1.1、try-except基本语法
        • 1.2、try-except-else-finally
        • 1.3、异常类型与继承体系
        • 1.4、raise语句
        • 1.5、自定义异常
        • 1.6、异常链(Exception Chaining)
        • 1.7、实用异常处理模式
        • 2、文件操作与I/O
        • 2.1、文件打开与关闭
        • 2.2、文本文件读写
        • 2.3、二进制文件处理
        • 2.4、pathlib模块(现代路径操作)
        • 2.5、标准输入输出
        • 3、迭代器与生成器
        • 3.1、迭代器协议
        • 3.2、生成器函数(yield)
        • 3.3、生成器表达式
        • 3.4、itertools模块
        • 3.5、惰性求值优势
        • 4、装饰器
        • 4.1、函数装饰器基础
        • 4.2、functools.wraps保留元信息
        • 4.3、带参数的装饰器
        • 4.4、类装饰器
        • 4.5、装饰器叠加
        • 4.6、常用内置装饰器
        • 4.7、装饰器实战案例
        • 5、上下文管理器
        • 5.1、with语句原理
        • 5.2、自定义上下文管理器
        • 5.3、contextlib模块
        • 5.4、嵌套上下文管理器
        • 5.5、资源管理最佳实践
        • 6、函数式编程
        • 6.1、map()、filter()、reduce()
        • 6.2、高阶函数
        • 6.3、偏函数(functools.partial)
        • 6.4、函数组合
        • 6.5、纯函数与副作用
        • 7、类型提示(Type Hints)
        • 7.1、基本类型注解
        • 7.2、复合类型注解
        • 7.3、Optional与Union
        • 7.4、泛型类型
        • 7.5、typing模块常用类型
        • 7.6、类型别名
        • 7.7、mypy静态类型检查
        • 8、反射与内省
        • 8.1、inspect模块
        • 8.2、动态属性操作
        • 8.3、动态导入模块
        • 8.4、类型检查
        • 8.5、动态创建类
        • 8.6、元类(Metaclass)基础
        • 8.7、实战案例
      • 二、Python标准库
        • 1、常用内置模块
        • 1.1、os与sys模块
        • 1.2、datetime模块
        • 1.3、json模块
        • 1.4、re模块(正则表达式)
        • 1.5、math与random模块
        • 1.6、collections模块
        • 1.7、pathlib模块
        • 2、数据处理
        • 2.1、csv模块
        • 2.2、XML处理
        • 2.3、pickle模块(序列化)
        • 2.4、struct模块(二进制数据)
        • 3、网络编程
        • 3.1、socket模块
        • 3.2、urllib模块
        • 3.3、http.server
        • 4、多线程与多进程
        • 4.1、threading模块
        • 4.2、multiprocessing模块
        • 4.3、concurrent.futures模块
        • 4.4、GIL详解
        • 5、日志与调试
        • 5.1、logging模块
        • 5.2、pdb调试器
        • 5.3、traceback模块
      • 三、异步编程
        • 1、asyncio基础
        • 1.1、async/await语法
        • 1.2、协程概念
        • 1.3、事件循环
        • 1.4、Task与Future
        • 2、异步I/O
        • 2.1、异步文件操作
        • 2.2、异步网络请求
        • 3、异步编程最佳实践
        • 3.1、何时使用异步
        • 3.2、异步与多线程的选择
        • 3.3、常见陷阱
    • Java开发者转战Python:热门库与质量管理
  • 工具

  • 后端
  • 其他语言
轩辕李
2025-01-20
目录

Java开发者转战Python:进阶篇

📖 系列文章导航

本文是"Java开发者转战Python"系列的第二篇,深入学习Python的进阶特性。

📚 系列:

  • 基础篇 - Python基础语法、数据结构、面向对象
  • 热门库与质量管理 - Python生态、质量管理

# 一、Python进阶特性

# 1、异常处理

Python的异常处理机制与Java类似,但语法更简洁,并提供了一些独特的特性。

# 1.1、try-except基本语法

基本用法

# 基本异常捕获
try:
    number = int(input("请输入数字: "))
    result = 10 / number
    print(f"结果: {result}")
except ValueError:
    print("输入的不是有效数字")
except ZeroDivisionError:
    print("不能除以零")

对比Java:

// Java异常处理
try {
    int number = Integer.parseInt(scanner.nextLine());
    int result = 10 / number;
    System.out.println("结果: " + result);
} catch (NumberFormatException e) {
    System.out.println("输入的不是有效数字");
} catch (ArithmeticException e) {
    System.out.println("不能除以零");
}

捕获多个异常

# 方式1:分别捕获
try:
    data = json.loads(json_string)
except ValueError as e:
    print(f"JSON解析错误: {e}")
except KeyError as e:
    print(f"键不存在: {e}")

# 方式2:合并捕获(异常处理逻辑相同时)
try:
    file = open("data.txt")
    data = process(file)
except (FileNotFoundError, PermissionError) as e:
    print(f"文件访问错误: {e}")

# 方式3:捕获所有异常(不推荐用于生产)
try:
    risky_operation()
except Exception as e:
    print(f"发生错误: {e}")

# 1.2、try-except-else-finally

Python的异常处理提供了else和finally子句。

else子句

try:
    file = open("data.txt", "r")
    data = file.read()
except FileNotFoundError:
    print("文件不存在")
else:
    # 只有try块成功时才执行
    print(f"读取了{len(data)}个字符")
    process_data(data)
finally:
    # 无论如何都会执行
    if 'file' in locals():
        file.close()
        print("文件已关闭")

完整示例

def divide(a, b):
    try:
        result = a / b
    except ZeroDivisionError:
        print("错误:除数不能为零")
        return None
    except TypeError:
        print("错误:参数类型错误")
        return None
    else:
        print(f"计算成功: {a} / {b} = {result}")
        return result
    finally:
        print("除法操作完成")

# 使用
divide(10, 2)
# 输出:
# 计算成功: 10 / 2 = 5.0
# 除法操作完成

divide(10, 0)
# 输出:
# 错误:除数不能为零
# 除法操作完成

对比Java:

// Java没有else,但有finally
try {
    result = a / b;
    System.out.println("计算成功");  // 需要手动判断
} catch (ArithmeticException e) {
    System.out.println("错误:除数不能为零");
} finally {
    System.out.println("除法操作完成");
}

# 1.3、异常类型与继承体系

Python的异常都继承自BaseException。

异常层次结构

BaseException
├── SystemExit           # 系统退出
├── KeyboardInterrupt    # 用户中断(Ctrl+C)
├── GeneratorExit        # 生成器退出
└── Exception            # 常规异常的基类
    ├── StopIteration
    ├── ArithmeticError
    │   ├── ZeroDivisionError
    │   ├── OverflowError
    │   └── FloatingPointError
    ├── AssertionError
    ├── AttributeError
    ├── EOFError
    ├── ImportError
    │   └── ModuleNotFoundError
    ├── LookupError
    │   ├── IndexError
    │   └── KeyError
    ├── MemoryError
    ├── NameError
    │   └── UnboundLocalError
    ├── OSError
    │   ├── FileNotFoundError
    │   ├── PermissionError
    │   └── TimeoutError
    ├── RuntimeError
    │   ├── NotImplementedError
    │   └── RecursionError
    ├── TypeError
    ├── ValueError
    │   └── UnicodeError
    └── Warning

常见异常

# ValueError:值错误
int("abc")  # ValueError: invalid literal

# TypeError:类型错误
"string" + 123  # TypeError: can only concatenate str

# KeyError:字典键不存在
{"a": 1}["b"]  # KeyError: 'b'

# IndexError:索引越界
[1, 2, 3][5]  # IndexError: list index out of range

# AttributeError:属性不存在
"string".non_existent  # AttributeError: 'str' object has no attribute

# FileNotFoundError:文件不存在
open("non_existent.txt")  # FileNotFoundError

# ImportError:模块导入失败
import non_existent_module  # ModuleNotFoundError

# 1.4、raise语句

抛出异常

def withdraw(amount, balance):
    if amount <= 0:
        raise ValueError("取款金额必须大于0")

    if amount > balance:
        raise ValueError(f"余额不足:需要{amount},当前{balance}")

    return balance - amount

# 使用
try:
    new_balance = withdraw(1000, 500)
except ValueError as e:
    print(f"操作失败: {e}")
# 输出: 操作失败: 余额不足:需要1000,当前500

重新抛出异常

def process_data(data):
    try:
        result = risky_operation(data)
    except ValueError as e:
        print(f"警告: {e}")
        raise  # 重新抛出原异常

# 或者抛出新异常
def process_file(filename):
    try:
        # 提示:文件操作的详细内容请见第8章"文件操作与I/O"
        with open(filename) as f:
            return f.read()
    except FileNotFoundError:
        raise ValueError(f"配置文件{filename}不存在")

# 1.5、自定义异常

# 基本自定义异常
class ValidationError(Exception):
    """数据验证错误"""
    pass

# 带额外信息的异常
class InsufficientFundsError(Exception):
    """余额不足异常"""
    def __init__(self, balance, amount):
        self.balance = balance
        self.amount = amount
        self.shortage = amount - balance
        super().__init__(f"余额不足:需要{amount},当前{balance},缺少{self.shortage}")

# 使用
def withdraw(balance, amount):
    if amount > balance:
        raise InsufficientFundsError(balance, amount)
    return balance - amount

try:
    new_balance = withdraw(100, 150)
except InsufficientFundsError as e:
    print(e)
    print(f"缺少金额: {e.shortage}")
# 输出:
# 余额不足:需要150,当前100,缺少50
# 缺少金额: 50

异常基类最佳实践

class AppError(Exception):
    """应用程序基础异常"""
    pass

class DatabaseError(AppError):
    """数据库相关错误"""
    pass

class NetworkError(AppError):
    """网络相关错误"""
    pass

class APIError(AppError):
    """API调用错误"""
    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API错误 {status_code}: {message}")

# 使用层次化捕获
try:
    call_api()
except APIError as e:
    print(f"API调用失败: {e}")
except NetworkError as e:
    print(f"网络错误: {e}")
except AppError as e:
    print(f"应用错误: {e}")

对比Java:

// Java自定义异常
public class InsufficientFundsException extends Exception {
    private double balance;
    private double amount;

    public InsufficientFundsException(double balance, double amount) {
        super("余额不足:需要" + amount + ",当前" + balance);
        this.balance = balance;
        this.amount = amount;
    }

    public double getShortage() {
        return amount - balance;
    }
}

# 1.6、异常链(Exception Chaining)

Python 3支持异常链,保留原始异常信息。

使用from关键字

def parse_config(filename):
    try:
        with open(filename) as f:
            return json.load(f)
    except FileNotFoundError as e:
        raise ValueError(f"配置文件{filename}不存在") from e
    except json.JSONDecodeError as e:
        raise ValueError(f"配置文件格式错误") from e

try:
    config = parse_config("config.json")
except ValueError as e:
    print(f"错误: {e}")
    print(f"原因: {e.__cause__}")
    # 可以追溯到原始异常

隐式异常链

try:
    try:
        result = 10 / 0
    except ZeroDivisionError:
        # 在异常处理中又发生了异常
        undefined_variable  # NameError
except NameError as e:
    print(f"当前异常: {e}")
    print(f"之前的异常: {e.__context__}")

抑制异常链

try:
    result = 10 / 0
except ZeroDivisionError:
    # 使用from None抑制异常链
    raise ValueError("计算错误") from None

# 1.7、实用异常处理模式

资源清理

# ❌ 不推荐
file = open("data.txt")
try:
    data = file.read()
    process(data)
finally:
    file.close()

# ✅ 推荐:使用with语句
with open("data.txt") as file:
    data = file.read()
    process(data)
# 自动关闭文件

多个资源

# 管理多个资源
with open("input.txt") as infile, open("output.txt", "w") as outfile:
    data = infile.read()
    outfile.write(data.upper())

EAFP vs LBYL

Python推崇EAFP(Easier to Ask for Forgiveness than Permission,请求原谅比请求许可更容易)而非LBYL(Look Before You Leap,先检查再行动)。

# LBYL(Java风格)- 不推荐
if key in dictionary:
    value = dictionary[key]
else:
    value = default_value

# EAFP(Python风格)- 推荐
try:
    value = dictionary[key]
except KeyError:
    value = default_value

# 或者使用get方法
value = dictionary.get(key, default_value)

异常传播与日志

import logging

def process_user_data(user_id):
    try:
        user = fetch_user(user_id)
        data = transform_data(user)
        save_data(data)
    except DatabaseError as e:
        logging.error(f"数据库错误处理用户{user_id}: {e}")
        raise  # 重新抛出,让上层处理
    except ValidationError as e:
        logging.warning(f"验证失败用户{user_id}: {e}")
        return None  # 吞掉异常,返回默认值
    except Exception as e:
        logging.critical(f"未知错误处理用户{user_id}: {e}", exc_info=True)
        raise  # 致命错误,必须抛出

小结对比表

特性 Python Java
基本语法 try-except try-catch
多异常 except (E1, E2) 多个catch块或\|
else子句 支持 不支持
finally 支持 支持
异常链 raise ... from Throwable.initCause()
检查异常 无 有(编译时检查)
自定义异常 继承Exception 继承Exception

Python的异常处理更简洁,但缺少Java的检查异常机制。这让Python更灵活,但也需要开发者更自律地处理异常!

# 2、文件操作与I/O

Python的文件操作比Java简单直观得多,无需处理繁琐的流和缓冲区。

前置知识:异常处理已在第1章"异常处理"中介绍,本章会大量使用with语句。

# 2.1、文件打开与关闭

基本操作

# 打开文件
file = open("data.txt", "r")  # 读模式
content = file.read()
file.close()  # 必须手动关闭

# ✅ 推荐:使用with语句自动关闭
with open("data.txt", "r") as file:
    content = file.read()
# 自动关闭,即使发生异常也会关闭

文件模式

模式 说明 Java对应
'r' 只读(默认) FileReader
'w' 写入(覆盖) FileWriter
'a' 追加 FileWriter(file, true)
'x' 独占创建(文件已存在则失败) -
'b' 二进制模式 FileInputStream
't' 文本模式(默认) -
'+' 读写模式 RandomAccessFile

常用组合

# 文本文件
open("file.txt", "r")      # 读
open("file.txt", "w")      # 写(覆盖)
open("file.txt", "a")      # 追加
open("file.txt", "r+")     # 读写

# 二进制文件
open("image.png", "rb")    # 读二进制
open("image.png", "wb")    # 写二进制

对比Java:

// Java需要更多代码
try (BufferedReader reader = new BufferedReader(
        new FileReader("data.txt"))) {
    String content = reader.readLine();
} catch (IOException e) {
    e.printStackTrace();
}

# 2.2、文本文件读写

读取文件

# 方式1:一次性读取全部内容
with open("data.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(content)

# 方式2:按行读取(返回列表)
with open("data.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
    for line in lines:
        print(line.strip())  # 去除换行符

# 方式3:逐行迭代(推荐,内存友好)
with open("data.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.strip())

# 方式4:读取指定字节数
with open("data.txt", "r") as f:
    chunk = f.read(100)  # 读取前100个字符

写入文件

# 写入字符串
with open("output.txt", "w", encoding="utf-8") as f:
    f.write("Hello, World!\n")
    f.write("第二行\n")

# 写入多行
lines = ["第一行\n", "第二行\n", "第三行\n"]
with open("output.txt", "w", encoding="utf-8") as f:
    f.writelines(lines)

# 追加内容
with open("output.txt", "a", encoding="utf-8") as f:
    f.write("追加的内容\n")

实用示例

# 复制文件
with open("source.txt", "r") as src, open("dest.txt", "w") as dst:
    dst.write(src.read())

# 处理CSV
with open("data.csv", "r") as f:
    for line in f:
        fields = line.strip().split(",")
        print(fields)

# 统计行数
with open("data.txt", "r") as f:
    line_count = sum(1 for line in f)
    print(f"总行数: {line_count}")

# 查找并替换
with open("input.txt", "r") as f:
    content = f.read()

content = content.replace("old", "new")

with open("output.txt", "w") as f:
    f.write(content)

# 2.3、二进制文件处理

读写二进制文件

# 读取二进制文件
with open("image.png", "rb") as f:
    data = f.read()
    print(f"文件大小: {len(data)} 字节")

# 写入二进制文件
with open("output.bin", "wb") as f:
    f.write(b"\x00\x01\x02\x03")

# 复制二进制文件
with open("source.png", "rb") as src, open("dest.png", "wb") as dst:
    dst.write(src.read())

# 分块读取大文件(内存友好)
with open("large_file.bin", "rb") as f:
    while chunk := f.read(8192):  # 每次读8KB
        process(chunk)

使用struct模块处理二进制数据

import struct

# 写入结构化二进制数据
with open("data.bin", "wb") as f:
    # 写入:1个整数,1个浮点数,5个字符
    data = struct.pack("if5s", 42, 3.14, b"hello")
    f.write(data)

# 读取结构化二进制数据
with open("data.bin", "rb") as f:
    data = f.read()
    num, pi, text = struct.unpack("if5s", data)
    print(f"整数: {num}, 浮点: {pi}, 文本: {text.decode()}")

# 2.4、pathlib模块(现代路径操作)

pathlib是Python 3.4+推荐的路径操作方式,面向对象,比os.path更直观。

基本用法

from pathlib import Path

# 创建Path对象
file_path = Path("data/file.txt")
absolute_path = Path("/usr/local/bin/python")

# 获取当前目录
current_dir = Path.cwd()
print(current_dir)

# 获取用户主目录
home_dir = Path.home()
print(home_dir)

# 路径拼接
config_file = Path.home() / "config" / "settings.json"
# 等同于:Path.home().joinpath("config", "settings.json")

路径属性

file_path = Path("/home/user/documents/report.pdf")

print(file_path.name)        # report.pdf(文件名)
print(file_path.stem)        # report(文件名不含扩展名)
print(file_path.suffix)      # .pdf(扩展名)
print(file_path.parent)      # /home/user/documents(父目录)
print(file_path.parents[0])  # /home/user/documents
print(file_path.parents[1])  # /home/user
print(file_path.anchor)      # /(根目录)
print(file_path.parts)       # ('/', 'home', 'user', 'documents', 'report.pdf')

文件系统操作

from pathlib import Path

file_path = Path("data.txt")

# 检查存在性
if file_path.exists():
    print("文件存在")

# 检查类型
if file_path.is_file():
    print("是文件")
if file_path.is_dir():
    print("是目录")

# 读写文件
file_path.write_text("Hello, World!", encoding="utf-8")
content = file_path.read_text(encoding="utf-8")

# 二进制读写
file_path.write_bytes(b"\x00\x01\x02")
data = file_path.read_bytes()

# 创建目录
Path("new_dir").mkdir(exist_ok=True)           # 创建单层目录
Path("parent/child").mkdir(parents=True, exist_ok=True)  # 创建多层

# 删除文件
file_path.unlink(missing_ok=True)  # Python 3.8+

# 遍历目录
for item in Path(".").iterdir():
    print(item)

# 模式匹配查找文件
for py_file in Path(".").glob("*.py"):
    print(py_file)

for py_file in Path(".").rglob("*.py"):  # 递归查找
    print(py_file)

实用示例

from pathlib import Path

# 查找项目中所有Python文件
project_root = Path(".")
python_files = list(project_root.rglob("*.py"))
print(f"找到{len(python_files)}个Python文件")

# 统计代码行数
total_lines = 0
for py_file in python_files:
    lines = py_file.read_text().count("\n")
    total_lines += lines
print(f"总行数: {total_lines}")

# 安全地处理配置文件
config_file = Path.home() / ".myapp" / "config.json"
if not config_file.exists():
    config_file.parent.mkdir(parents=True, exist_ok=True)
    config_file.write_text('{"default": true}')

对比Java:

// Java使用Path和Files(Java 7+)
import java.nio.file.*;

Path filePath = Paths.get("/home/user/data.txt");
String content = Files.readString(filePath);
Files.writeString(filePath, "Hello");

// 遍历目录
try (var stream = Files.list(Paths.get("."))) {
    stream.forEach(System.out::println);
}

# 2.5、标准输入输出

输入

# input():读取一行输入
name = input("请输入姓名: ")
print(f"你好, {name}!")

# 读取数字
age = int(input("请输入年龄: "))

# 读取多个值
x, y = map(int, input("输入两个数字(空格分隔): ").split())

输出

# print():标准输出
print("Hello, World!")

# 多个参数
print("姓名:", name, "年龄:", age)

# 自定义分隔符和结束符
print("a", "b", "c", sep="-")      # a-b-c
print("loading", end="...")        # loading...(不换行)
print(" done!")                    # loading... done!

# 写入文件
with open("output.txt", "w") as f:
    print("写入文件", file=f)

格式化输出

name = "张三"
age = 25
score = 95.678

# f-string(推荐)
print(f"姓名: {name}, 年龄: {age}, 成绩: {score:.2f}")

# format方法
print("姓名: {}, 年龄: {}, 成绩: {:.2f}".format(name, age, score))

# %格式化(老式)
print("姓名: %s, 年龄: %d, 成绩: %.2f" % (name, age, score))

小结对比表

操作 Python Java
打开文件 open() FileReader/BufferedReader
自动关闭 with语句 try-with-resources
读取全部 read() readAll()或循环读取
逐行读取 for line in f BufferedReader.readLine()
路径操作 pathlib.Path java.nio.file.Path
文件存在 path.exists() Files.exists()
创建目录 path.mkdir() Files.createDirectories()

Python的文件I/O操作比Java简洁优雅得多,pathlib模块提供了现代化的路径处理方式!

# 3、迭代器与生成器

迭代器和生成器是Python的强大特性,能够高效处理大数据集和实现惰性求值。

# 3.1、迭代器协议

Python的迭代器基于两个魔术方法:__iter__和__next__。

基本概念

# 可迭代对象(Iterable):实现了__iter__方法
numbers = [1, 2, 3, 4, 5]
iterator = iter(numbers)  # 获取迭代器

# 迭代器(Iterator):实现了__iter__和__next__方法
print(next(iterator))  # 1
print(next(iterator))  # 2
print(next(iterator))  # 3

# 迭代完毕后抛出StopIteration
# next(iterator)  # 迭代完毕会抛出StopIteration

自定义迭代器

class Countdown:
    """倒计时迭代器"""
    def __init__(self, start):
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):
        if self.current <= 0:
            raise StopIteration

        self.current -= 1
        return self.current + 1

# 使用
for num in Countdown(5):
    print(num)  # 5, 4, 3, 2, 1

实用迭代器示例

class FileReader:
    """逐行读取文件的迭代器"""
    def __init__(self, filename):
        self.file = open(filename, 'r')

    def __iter__(self):
        return self

    def __next__(self):
        line = self.file.readline()
        if not line:
            self.file.close()
            raise StopIteration
        return line.strip()

# 使用
for line in FileReader("data.txt"):
    print(line)

对比Java:

// Java使用Iterator接口
Iterator<Integer> iterator = numbers.iterator();
while (iterator.hasNext()) {
    System.out.println(iterator.next());
}

# 3.2、生成器函数(yield)

生成器是创建迭代器的最简单方式,使用yield关键字。

基本用法

def countdown(n):
    """倒计时生成器"""
    while n > 0:
        yield n
        n -= 1

# 使用
for num in countdown(5):
    print(num)  # 5, 4, 3, 2, 1

# 生成器是迭代器
gen = countdown(3)
print(next(gen))  # 3
print(next(gen))  # 2
print(next(gen))  # 1
# print(next(gen))  # StopIteration

工作原理

def simple_generator():
    print("开始")
    yield 1
    print("继续")
    yield 2
    print("结束")
    yield 3

gen = simple_generator()
print("创建生成器")
print(next(gen))  # 开始 -> 1
print(next(gen))  # 继续 -> 2
print(next(gen))  # 结束 -> 3

实用示例

# 1. 斐波那契数列
def fibonacci(n):
    """生成前n个斐波那契数"""
    a, b = 0, 1
    count = 0
    while count < n:
        yield a
        a, b = b, a + b
        count += 1

print(list(fibonacci(10)))
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

# 2. 逐行读取大文件(内存友好)
def read_large_file(file_path):
    """逐行读取文件"""
    with open(file_path) as f:
        for line in f:
            yield line.strip()

for line in read_large_file("huge_file.txt"):
    process(line)  # 一次只加载一行到内存

# 3. 批量处理数据
def batch(items, size):
    """将数据分批"""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # 最后一批
        yield batch

# 使用
data = range(100)
for batch in batch(data, 10):
    print(f"处理批次: {batch}")

# 4. 无限序列
def infinite_sequence():
    """无限递增序列"""
    num = 0
    while True:
        yield num
        num += 1

gen = infinite_sequence()
print(next(gen))  # 0
print(next(gen))  # 1
print(next(gen))  # 2

# 3.3、生成器表达式

生成器表达式是列表推导式的生成器版本,使用圆括号。

注意:列表推导式和生成器表达式的完整用法,请参考第5章"数据结构与推导式"。

基本语法

# 列表推导式:立即计算,占用内存
squares_list = [x**2 for x in range(1000000)]

# 生成器表达式:惰性计算,节省内存
squares_gen = (x**2 for x in range(1000000))

# 使用
for square in squares_gen:
    if square > 100:
        break
    print(square)

对比

import sys

# 列表推导式
list_comp = [x for x in range(10000)]
print(f"列表大小: {sys.getsizeof(list_comp)} bytes")  # ~87616 bytes

# 生成器表达式
gen_expr = (x for x in range(10000))
print(f"生成器大小: {sys.getsizeof(gen_expr)} bytes")  # ~112 bytes

实用示例

# 1. 过滤和转换
numbers = range(1, 11)
even_squares = (x**2 for x in numbers if x % 2 == 0)
print(list(even_squares))  # [4, 16, 36, 64, 100]

# 2. 链式处理
lines = (line.strip() for line in open("data.txt"))
non_empty = (line for line in lines if line)
uppercase = (line.upper() for line in non_empty)

for line in uppercase:
    print(line)

# 3. 作为函数参数
sum_of_squares = sum(x**2 for x in range(10))
print(sum_of_squares)  # 285

max_value = max((x**2 for x in range(10)))
print(max_value)  # 81

# 4. 内存友好的数据处理
total = sum(int(line) for line in open("numbers.txt"))

# 3.4、itertools模块

itertools提供了高效的迭代器工具。

无限迭代器

from itertools import count, cycle, repeat

# count:无限计数
for i in count(10, 2):  # 从10开始,步长2
    if i > 20:
        break
    print(i)  # 10, 12, 14, 16, 18, 20

# cycle:无限循环
counter = 0
for item in cycle(['A', 'B', 'C']):
    if counter >= 5:
        break
    print(item, end=" ")  # A B C A B
    counter += 1

# repeat:重复元素
for item in repeat('Hello', 3):
    print(item)  # Hello(3次)

组合迭代器

from itertools import chain, zip_longest, product, combinations, permutations

# chain:连接多个迭代器
for item in chain([1, 2], [3, 4], [5, 6]):
    print(item, end=" ")  # 1 2 3 4 5 6

# zip_longest:配对(补齐)
for pair in zip_longest([1, 2, 3], ['a', 'b'], fillvalue='?'):
    print(pair)  # (1, 'a'), (2, 'b'), (3, '?')

# product:笛卡尔积
for pair in product([1, 2], ['a', 'b']):
    print(pair)  # (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')

# combinations:组合
for combo in combinations([1, 2, 3, 4], 2):
    print(combo)  # (1,2), (1,3), (1,4), (2,3), (2,4), (3,4)

# permutations:排列
for perm in permutations([1, 2, 3], 2):
    print(perm)  # (1,2), (1,3), (2,1), (2,3), (3,1), (3,2)

过滤和分组

from itertools import filterfalse, dropwhile, takewhile, groupby

# filterfalse:过滤假值
data = [1, 0, 2, 0, 3]
non_zero = filterfalse(lambda x: x == 0, data)
print(list(non_zero))  # [1, 2, 3]

# dropwhile:丢弃直到条件为假
numbers = [1, 3, 5, 6, 7, 8, 9]
result = dropwhile(lambda x: x < 6, numbers)
print(list(result))  # [6, 7, 8, 9]

# takewhile:获取直到条件为假
result = takewhile(lambda x: x < 6, numbers)
print(list(result))  # [1, 3, 5]

# groupby:分组
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]
for key, group in groupby(data, lambda x: x[0]):
    print(f"{key}: {list(group)}")
# A: [('A', 1), ('A', 2)]
# B: [('B', 3), ('B', 4)]
# A: [('A', 5)]

实用组合

from itertools import islice, tee, accumulate

# islice:切片迭代器
gen = (x**2 for x in range(100))
first_10 = list(islice(gen, 10))
print(first_10)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# tee:复制迭代器
gen1, gen2 = tee(range(5), 2)
print(list(gen1))  # [0, 1, 2, 3, 4]
print(list(gen2))  # [0, 1, 2, 3, 4]

# accumulate:累积
from operator import mul
numbers = [1, 2, 3, 4, 5]
print(list(accumulate(numbers)))        # [1, 3, 6, 10, 15](累加)
print(list(accumulate(numbers, mul)))   # [1, 2, 6, 24, 120](累乘)

# 3.5、惰性求值优势

生成器的惰性求值特性带来巨大优势。

内存效率

# ❌ 列表:立即计算,占用大量内存
def process_data_list():
    data = [expensive_operation(x) for x in range(1000000)]
    return sum(data)

# ✅ 生成器:惰性计算,内存占用小
def process_data_generator():
    data = (expensive_operation(x) for x in range(1000000))
    return sum(data)

无限序列

# 生成器可以表示无限序列
def primes():
    """无限素数生成器"""
    yield 2
    candidates = count(3, 2)
    while True:
        prime = next(candidates)
        yield prime
        candidates = (x for x in candidates if x % prime != 0)

# 取前10个素数
prime_gen = primes()
first_10_primes = [next(prime_gen) for _ in range(10)]
print(first_10_primes)  # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]

管道式处理

# 数据处理管道
def read_log(filename):
    """读取日志"""
    with open(filename) as f:
        for line in f:
            yield line.strip()

def filter_errors(lines):
    """过滤错误"""
    for line in lines:
        if 'ERROR' in line:
            yield line

def parse_timestamp(lines):
    """解析时间戳"""
    for line in lines:
        # 假设格式:[2024-01-01 10:00:00] ERROR: message
        yield line.split(']')[0][1:]

# 管道组合
lines = read_log("app.log")
errors = filter_errors(lines)
timestamps = parse_timestamp(errors)

# 惰性执行,只在需要时处理
for ts in timestamps:
    print(ts)

小结对比表

特性 列表 生成器 Java Stream
求值方式 立即 惰性 惰性
内存占用 大 小 小
可重复使用 ✅ ❌ ❌
索引访问 ✅ ❌ ❌
长度获取 ✅ ❌ ❌
无限序列 ❌ ✅ ✅

# 4、装饰器

装饰器是Python的强大特性,允许在不修改原函数/类的情况下增强其功能。这类似于Java的注解+AOP,但更灵活强大。

前置知识:装饰器基于闭包概念,闭包已在第6章"函数定义与使用"中介绍。

# 4.1、函数装饰器基础

基本概念

装饰器本质是一个接受函数作为参数并返回新函数的高阶函数。

# 最简单的装饰器
def my_decorator(func):
    """装饰器函数"""
    def wrapper():
        print("函数执行前")
        func()
        print("函数执行后")
    return wrapper

# 使用装饰器(方式1:手动包装)
def say_hello():
    print("Hello!")

say_hello = my_decorator(say_hello)
say_hello()
# 输出:
# 函数执行前
# Hello!
# 函数执行后

# 使用装饰器(方式2:@语法糖)
@my_decorator
def say_world():
    print("World!")

say_world()
# 输出:
# 函数执行前
# World!
# 函数执行后

带参数的函数装饰

def my_decorator(func):
    def wrapper(*args, **kwargs):
        """接受任意参数"""
        print(f"调用 {func.__name__},参数: {args}, {kwargs}")
        result = func(*args, **kwargs)
        print(f"返回值: {result}")
        return result
    return wrapper

@my_decorator
def add(a, b):
    return a + b

@my_decorator
def greet(name, greeting="Hello"):
    return f"{greeting}, {name}!"

# 使用
result = add(3, 5)
# 调用 add,参数: (3, 5), {}
# 返回值: 8

message = greet("张三", greeting="你好")
# 调用 greet,参数: ('张三',), {'greeting': '你好'}
# 返回值: 你好, 张三!

对比Java:

// Java需要使用注解+AOP或代理模式
@Around("@annotation(LogExecution)")
public Object logExecution(ProceedingJoinPoint joinPoint) throws Throwable {
    System.out.println("方法执行前");
    Object result = joinPoint.proceed();
    System.out.println("方法执行后");
    return result;
}

@LogExecution
public void sayHello() {
    System.out.println("Hello!");
}

# 4.2、functools.wraps保留元信息

装饰器会改变函数的元信息,functools.wraps用于保留原函数的元数据。

from functools import wraps

# ❌ 不使用wraps
def bad_decorator(func):
    def wrapper(*args, **kwargs):
        """这是wrapper的文档"""
        return func(*args, **kwargs)
    return wrapper

@bad_decorator
def my_function():
    """这是my_function的文档"""
    pass

print(my_function.__name__)  # wrapper(错误!)
print(my_function.__doc__)   # 这是wrapper的文档(错误!)

# ✅ 使用wraps
def good_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        """这是wrapper的文档"""
        return func(*args, **kwargs)
    return wrapper

@good_decorator
def my_function2():
    """这是my_function2的文档"""
    pass

print(my_function2.__name__)  # my_function2(正确!)
print(my_function2.__doc__)   # 这是my_function2的文档(正确!)

标准装饰器模板

from functools import wraps

def my_decorator(func):
    """标准装饰器模板"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 执行前的逻辑
        print(f"Before calling {func.__name__}")

        # 调用原函数
        result = func(*args, **kwargs)

        # 执行后的逻辑
        print(f"After calling {func.__name__}")

        return result
    return wrapper

# 4.3、带参数的装饰器

装饰器本身也可以接受参数。

from functools import wraps

def repeat(times):
    """重复执行装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(times):
                print(f"第{i+1}次执行:")
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator

@repeat(3)
def greet(name):
    print(f"Hello, {name}!")

greet("张三")
# 输出:
# 第1次执行:
# Hello, 张三!
# 第2次执行:
# Hello, 张三!
# 第3次执行:
# Hello, 张三!

工作原理

# @repeat(3) 等价于:
# greet = repeat(3)(greet)

# 分步理解:
decorator = repeat(3)      # 调用repeat(3),返回decorator函数
greet = decorator(greet)   # 调用decorator(greet),返回wrapper

实用带参数装饰器示例

from functools import wraps
import time

def retry(max_attempts=3, delay=1):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise e
                    print(f"第{attempt + 1}次尝试失败,{delay}秒后重试...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=5, delay=2)
def unstable_api_call():
    """不稳定的API调用"""
    import random
    if random.random() < 0.7:
        raise ConnectionError("API调用失败")
    return "成功"

# 使用
result = unstable_api_call()

# 4.4、类装饰器

类也可以作为装饰器,通过实现__call__方法。

from functools import wraps

class CountCalls:
    """统计函数调用次数"""
    def __init__(self, func):
        wraps(func)(self)
        self.func = func
        self.count = 0

    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"{self.func.__name__} 已被调用 {self.count} 次")
        return self.func(*args, **kwargs)

@CountCalls
def say_hello():
    print("Hello!")

say_hello()  # say_hello 已被调用 1 次 -> Hello!
say_hello()  # say_hello 已被调用 2 次 -> Hello!
say_hello()  # say_hello 已被调用 3 次 -> Hello!

print(say_hello.count)  # 3

带参数的类装饰器

from functools import wraps

class LogWith:
    """带日志级别的装饰器"""
    def __init__(self, level="INFO"):
        self.level = level

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print(f"[{self.level}] 调用 {func.__name__}")
            result = func(*args, **kwargs)
            print(f"[{self.level}] {func.__name__} 返回 {result}")
            return result
        return wrapper

@LogWith(level="DEBUG")
def add(a, b):
    return a + b

result = add(3, 5)
# [DEBUG] 调用 add
# [DEBUG] add 返回 8

# 4.5、装饰器叠加

多个装饰器可以叠加使用。

def make_bold(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return "<b>" + func(*args, **kwargs) + "</b>"
    return wrapper

def make_italic(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return "<i>" + func(*args, **kwargs) + "</i>"
    return wrapper

@make_bold
@make_italic
def say_hello():
    return "Hello!"

print(say_hello())  # <b><i>Hello!</i></b>

# 等价于:
# say_hello = make_bold(make_italic(say_hello))
# 执行顺序:从下到上装饰,从上到下执行

顺序理解

def decorator1(func):
    print(f"装饰器1 装饰 {func.__name__}")
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("装饰器1: 执行前")
        result = func(*args, **kwargs)
        print("装饰器1: 执行后")
        return result
    return wrapper

def decorator2(func):
    print(f"装饰器2 装饰 {func.__name__}")
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("装饰器2: 执行前")
        result = func(*args, **kwargs)
        print("装饰器2: 执行后")
        return result
    return wrapper

@decorator1
@decorator2
def test():
    print("执行test函数")

# 装饰阶段输出:
# 装饰器2 装饰 test
# 装饰器1 装饰 wrapper

test()
# 执行阶段输出:
# 装饰器1: 执行前
# 装饰器2: 执行前
# 执行test函数
# 装饰器2: 执行后
# 装饰器1: 执行后

# 4.6、常用内置装饰器

Python提供了一些常用的内置装饰器。

@property(属性装饰器)

详细内容:property装饰器的完整用法已在第3章"面向对象编程"中详细介绍。

class Circle:
    def __init__(self, radius):
        self._radius = radius

    @property
    def radius(self):
        """半径(只读)"""
        return self._radius

    @property
    def area(self):
        """面积(计算属性)"""
        return 3.14159 * self._radius ** 2

    @area.setter
    def area(self, value):
        """通过面积反推半径"""
        self._radius = (value / 3.14159) ** 0.5

circle = Circle(5)
print(circle.area)    # 78.53975
circle.area = 100
print(circle.radius)  # 5.641895835477563

@staticmethod(静态方法)

class MathUtils:
    @staticmethod
    def add(a, b):
        """静态方法,不需要访问类或实例"""
        return a + b

    @staticmethod
    def is_even(n):
        return n % 2 == 0

# 使用
print(MathUtils.add(3, 5))      # 8
print(MathUtils.is_even(4))     # True

@classmethod(类方法)

class Date:
    def __init__(self, year, month, day):
        self.year = year
        self.month = month
        self.day = day

    @classmethod
    def from_string(cls, date_string):
        """工厂方法:从字符串创建"""
        year, month, day = map(int, date_string.split('-'))
        return cls(year, month, day)

    @classmethod
    def today(cls):
        """工厂方法:创建今天的日期"""
        import datetime
        now = datetime.date.today()
        return cls(now.year, now.month, now.day)

# 使用
date1 = Date.from_string("2024-10-26")
date2 = Date.today()

@cached_property(缓存属性)

from functools import cached_property

class DataProcessor:
    def __init__(self, data):
        self.data = data

    @cached_property
    def processed_data(self):
        """计算量大的属性,只计算一次"""
        print("处理数据中...")
        import time
        time.sleep(2)  # 模拟耗时操作
        return [x * 2 for x in self.data]

processor = DataProcessor([1, 2, 3, 4, 5])
print(processor.processed_data)  # 处理数据中... [2, 4, 6, 8, 10]
print(processor.processed_data)  # [2, 4, 6, 8, 10](直接返回缓存)

# 4.7、装饰器实战案例

案例1:性能计时器

from functools import wraps
import time

def timer(func):
    """测量函数执行时间"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} 执行时间: {end - start:.4f}秒")
        return result
    return wrapper

@timer
def slow_function():
    time.sleep(1)
    return "完成"

result = slow_function()
# slow_function 执行时间: 1.0012秒

案例2:权限检查

from functools import wraps

def require_auth(func):
    """检查用户是否已登录"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 假设有一个全局的当前用户对象
        if not hasattr(wrapper, 'current_user') or not wrapper.current_user:
            raise PermissionError("需要登录")
        return func(*args, **kwargs)
    return wrapper

def require_role(role):
    """检查用户角色"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not hasattr(wrapper, 'current_user'):
                raise PermissionError("需要登录")
            if wrapper.current_user.get('role') != role:
                raise PermissionError(f"需要{role}权限")
            return func(*args, **kwargs)
        return wrapper
    return decorator

@require_auth
def view_profile():
    return "个人资料"

@require_role('admin')
def delete_user(user_id):
    return f"删除用户 {user_id}"

# 设置当前用户
view_profile.current_user = {"username": "张三", "role": "user"}
delete_user.current_user = {"username": "管理员", "role": "admin"}

print(view_profile())      # 个人资料
print(delete_user(123))    # 删除用户 123

案例3:缓存/记忆化

from functools import wraps

def memoize(func):
    """缓存函数结果"""
    cache = {}

    @wraps(func)
    def wrapper(*args):
        if args not in cache:
            print(f"计算 {func.__name__}{args}")
            cache[args] = func(*args)
        else:
            print(f"从缓存获取 {args}")
        return cache[args]

    # 添加清除缓存的方法
    wrapper.cache = cache
    wrapper.clear_cache = lambda: cache.clear()

    return wrapper

@memoize
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

print(fibonacci(10))
# 计算 fibonacci(10)
# 计算 fibonacci(9)
# ...
# 55

print(fibonacci(10))  # 从缓存获取 (10)
print(fibonacci.cache)  # 查看缓存内容
fibonacci.clear_cache()  # 清除缓存

案例4:输入验证

from functools import wraps

def validate_types(**type_hints):
    """验证函数参数类型"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 获取函数签名
            import inspect
            sig = inspect.signature(func)
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()

            # 验证类型
            for param_name, expected_type in type_hints.items():
                if param_name in bound.arguments:
                    value = bound.arguments[param_name]
                    if not isinstance(value, expected_type):
                        raise TypeError(
                            f"{param_name} 应为 {expected_type.__name__},"
                            f"实际为 {type(value).__name__}"
                        )

            return func(*args, **kwargs)
        return wrapper
    return decorator

@validate_types(name=str, age=int, salary=float)
def create_employee(name, age, salary):
    return f"{name}, {age}岁, 薪资{salary}"

# 正确调用
print(create_employee("张三", 25, 5000.0))

# 错误调用
try:
    create_employee("李四", "30", 6000.0)  # age类型错误
except TypeError as e:
    print(e)  # age 应为 int,实际为 str

案例5:日志记录

from functools import wraps
import logging

logging.basicConfig(level=logging.INFO)

def log_calls(log_args=True, log_result=True):
    """记录函数调用日志"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            logger = logging.getLogger(func.__module__)

            # 记录调用
            if log_args:
                logger.info(f"调用 {func.__name__},参数: {args}, {kwargs}")
            else:
                logger.info(f"调用 {func.__name__}")

            try:
                result = func(*args, **kwargs)

                # 记录结果
                if log_result:
                    logger.info(f"{func.__name__} 返回: {result}")

                return result
            except Exception as e:
                logger.error(f"{func.__name__} 抛出异常: {e}")
                raise

        return wrapper
    return decorator

@log_calls(log_args=True, log_result=True)
def divide(a, b):
    return a / b

result = divide(10, 2)
# INFO:__main__:调用 divide,参数: (10, 2), {}
# INFO:__main__:divide 返回: 5.0

try:
    divide(10, 0)
except ZeroDivisionError:
    pass
# INFO:__main__:调用 divide,参数: (10, 0), {}
# ERROR:__main__:divide 抛出异常: division by zero

案例6:单例模式

from functools import wraps

def singleton(cls):
    """单例装饰器"""
    instances = {}

    @wraps(cls)
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance

@singleton
class DatabaseConnection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        print(f"连接到数据库 {host}:{port}")

# 使用
db1 = DatabaseConnection("localhost", 5432)
# 连接到数据库 localhost:5432

db2 = DatabaseConnection("localhost", 5432)
# (不会打印,因为返回的是同一个实例)

print(db1 is db2)  # True

小结对比表

特性 Python装饰器 Java注解+AOP
语法 @decorator @Annotation
灵活性 高(运行时修改行为) 中(需要额外配置)
嵌套 支持多层装饰 支持
带参数 支持 支持
类装饰 支持装饰类 主要用于方法
内省 容易(__wrapped__) 需要反射

Python的装饰器比Java的注解更灵活强大,可以在运行时动态修改函数/类的行为,而且语法更简洁!

# 5、上下文管理器

上下文管理器让资源管理变得优雅安全,自动处理资源的获取和释放。这类似于Java的try-with-resources,但更灵活。

# 5.1、with语句原理

基本概念

with语句确保资源在使用后被正确清理,即使发生异常也能保证。

# 传统方式:需要手动管理
file = open("data.txt", "r")
try:
    content = file.read()
    process(content)
finally:
    file.close()  # 必须手动关闭

# with语句:自动管理
with open("data.txt", "r") as file:
    content = file.read()
    process(content)
# 自动关闭文件,即使发生异常

对比Java:

// Java try-with-resources(Java 7+)
try (BufferedReader reader = new BufferedReader(new FileReader("data.txt"))) {
    String content = reader.readLine();
    process(content);
} // 自动关闭资源

上下文管理器协议

上下文管理器需要实现__enter__和__exit__方法:

class MyContextManager:
    def __enter__(self):
        """进入with块时调用"""
        print("进入上下文")
        return self  # 返回值赋给as后的变量

    def __exit__(self, exc_type, exc_value, traceback):
        """退出with块时调用"""
        print("退出上下文")
        if exc_type is not None:
            print(f"发生异常: {exc_type.__name__}: {exc_value}")
        return False  # False表示不抑制异常

# 使用
with MyContextManager() as cm:
    print("执行代码")
    # raise ValueError("测试异常")

# 输出:
# 进入上下文
# 执行代码
# 退出上下文

__exit__方法参数说明

def __exit__(self, exc_type, exc_value, traceback):
    """
    exc_type: 异常类型(如果没有异常则为None)
    exc_value: 异常实例(如果没有异常则为None)
    traceback: 异常堆栈(如果没有异常则为None)

    返回值:
    - False或None: 异常继续传播
    - True: 抑制异常(异常被吞掉)
    """
    pass

# 5.2、自定义上下文管理器

文件操作管理器

class FileManager:
    """文件管理器"""
    def __init__(self, filename, mode):
        self.filename = filename
        self.mode = mode
        self.file = None

    def __enter__(self):
        """打开文件"""
        print(f"打开文件: {self.filename}")
        self.file = open(self.filename, self.mode)
        return self.file

    def __exit__(self, exc_type, exc_value, traceback):
        """关闭文件"""
        if self.file:
            print(f"关闭文件: {self.filename}")
            self.file.close()
        return False

# 使用
with FileManager("test.txt", "w") as f:
    f.write("Hello, World!")
# 输出:
# 打开文件: test.txt
# 关闭文件: test.txt

数据库连接管理器

class DatabaseConnection:
    """数据库连接管理器"""
    def __init__(self, host, database):
        self.host = host
        self.database = database
        self.connection = None

    def __enter__(self):
        """建立连接"""
        print(f"连接数据库: {self.host}/{self.database}")
        # 这里模拟连接
        self.connection = f"Connection({self.host}, {self.database})"
        return self.connection

    def __exit__(self, exc_type, exc_value, traceback):
        """关闭连接"""
        print("关闭数据库连接")
        self.connection = None

        # 如果有异常,回滚事务
        if exc_type is not None:
            print("发生异常,回滚事务")
        else:
            print("提交事务")

        return False

# 使用
with DatabaseConnection("localhost", "mydb") as conn:
    print(f"使用连接: {conn}")
    # 执行数据库操作

计时器上下文管理器

import time

class Timer:
    """计时器上下文管理器"""
    def __init__(self, name="代码块"):
        self.name = name
        self.start_time = None

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        elapsed = time.time() - self.start_time
        print(f"{self.name} 执行时间: {elapsed:.4f}秒")
        return False

# 使用
with Timer("数据处理"):
    # 模拟耗时操作
    time.sleep(1)
    result = sum(range(1000000))

# 输出: 数据处理 执行时间: 1.xxxx秒

异常处理上下文管理器

class IgnoreException:
    """忽略特定异常"""
    def __init__(self, *exceptions):
        self.exceptions = exceptions

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # 如果异常类型在指定列表中,抑制异常
        if exc_type is not None and issubclass(exc_type, self.exceptions):
            print(f"忽略异常: {exc_type.__name__}: {exc_value}")
            return True  # 抑制异常
        return False

# 使用
with IgnoreException(ValueError, KeyError):
    data = {"a": 1}
    value = data["b"]  # KeyError,但会被忽略
    print("继续执行")  # 这行不会执行

print("程序继续")  # 这行会执行
# 输出:
# 忽略异常: KeyError: 'b'
# 程序继续

# 5.3、contextlib模块

contextlib模块提供了创建上下文管理器的便捷工具。

@contextmanager装饰器

使用生成器函数创建上下文管理器:

from contextlib import contextmanager

@contextmanager
def file_manager(filename, mode):
    """使用装饰器创建上下文管理器"""
    print(f"打开文件: {filename}")
    file = open(filename, mode)
    try:
        yield file  # yield前是__enter__,yield后是__exit__
    finally:
        print(f"关闭文件: {filename}")
        file.close()

# 使用
with file_manager("test.txt", "w") as f:
    f.write("Hello!")

工作原理

@contextmanager
def my_context():
    # __enter__阶段
    print("进入")
    resource = "资源对象"

    try:
        yield resource  # 返回给as变量
        # with块正常结束后继续执行
        print("正常退出")
    except Exception as e:
        # with块抛出异常时执行
        print(f"异常退出: {e}")
        raise  # 重新抛出异常
    finally:
        # __exit__阶段,无论如何都执行
        print("清理资源")

# 使用
with my_context() as res:
    print(f"使用: {res}")

实用示例

from contextlib import contextmanager
import time

@contextmanager
def timing(label):
    """计时上下文管理器"""
    start = time.time()
    try:
        yield
    finally:
        end = time.time()
        print(f"{label}: {end - start:.4f}秒")

# 使用
with timing("数据库查询"):
    time.sleep(0.5)
    # 执行查询

临时修改环境

from contextlib import contextmanager
import os

@contextmanager
def temporary_env(**env_vars):
    """临时设置环境变量"""
    old_env = {}

    # 保存旧值并设置新值
    for key, value in env_vars.items():
        old_env[key] = os.environ.get(key)
        os.environ[key] = value

    try:
        yield
    finally:
        # 恢复旧值
        for key, old_value in old_env.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value

# 使用
with temporary_env(DEBUG="true", LOG_LEVEL="debug"):
    print(os.environ.get("DEBUG"))  # true
    # 执行需要特定环境的代码

print(os.environ.get("DEBUG"))  # None(已恢复)

suppress(抑制异常)

from contextlib import suppress

# 传统写法
try:
    os.remove("somefile.txt")
except FileNotFoundError:
    pass

# 使用suppress
with suppress(FileNotFoundError):
    os.remove("somefile.txt")

# 抑制多个异常
with suppress(FileNotFoundError, PermissionError):
    os.remove("protected_file.txt")

closing(自动关闭)

from contextlib import closing
from urllib.request import urlopen

# 确保对象的close()方法被调用
with closing(urlopen("http://example.com")) as page:
    content = page.read()
# 自动调用page.close()

redirect_stdout/redirect_stderr(重定向输出)

from contextlib import redirect_stdout, redirect_stderr
import io

# 重定向标准输出
f = io.StringIO()
with redirect_stdout(f):
    print("这些内容会被捕获")
    print("不会打印到控制台")

output = f.getvalue()
print(f"捕获的输出: {output}")

# 重定向到文件
with open("output.txt", "w") as f:
    with redirect_stdout(f):
        print("写入文件")
        help(str.upper)

ExitStack(管理多个上下文)

from contextlib import ExitStack

# 动态管理多个上下文
with ExitStack() as stack:
    # 动态添加上下文管理器
    files = [
        stack.enter_context(open(f"file{i}.txt", "w"))
        for i in range(5)
    ]

    # 使用所有文件
    for i, f in enumerate(files):
        f.write(f"内容 {i}\n")
# 所有文件自动关闭

# 条件性添加上下文
def process_files(filenames, use_backup=False):
    with ExitStack() as stack:
        files = []

        for filename in filenames:
            file = stack.enter_context(open(filename, "r"))
            files.append(file)

            if use_backup:
                backup = stack.enter_context(
                    open(f"{filename}.bak", "w")
                )
                files.append(backup)

        # 处理文件
        for f in files:
            process(f)

# 5.4、嵌套上下文管理器

多个with语句

# 方式1:嵌套with
with open("input.txt", "r") as infile:
    with open("output.txt", "w") as outfile:
        data = infile.read()
        outfile.write(data.upper())

# 方式2:单行with(Python 2.7+)
with open("input.txt", "r") as infile, open("output.txt", "w") as outfile:
    data = infile.read()
    outfile.write(data.upper())

组合使用

from contextlib import contextmanager
import threading

@contextmanager
def acquire_lock(lock):
    """获取锁"""
    print("获取锁")
    lock.acquire()
    try:
        yield
    finally:
        print("释放锁")
        lock.release()

# 使用
lock = threading.Lock()
with acquire_lock(lock):
    # 临界区代码
    print("执行临界区代码")

# 5.5、资源管理最佳实践

案例1:数据库事务管理

from contextlib import contextmanager

class Database:
    def __init__(self):
        self.connection = None

    @contextmanager
    def transaction(self):
        """事务上下文管理器"""
        print("开始事务")
        try:
            yield self
            print("提交事务")
            # self.connection.commit()
        except Exception as e:
            print(f"回滚事务: {e}")
            # self.connection.rollback()
            raise

# 使用
db = Database()
with db.transaction():
    # 执行数据库操作
    pass

案例2:临时修改配置

from contextlib import contextmanager

class Config:
    debug = False
    log_level = "INFO"

@contextmanager
def temporary_config(**changes):
    """临时修改配置"""
    original = {}

    # 保存并修改
    for key, value in changes.items():
        original[key] = getattr(Config, key)
        setattr(Config, key, value)

    try:
        yield Config
    finally:
        # 恢复
        for key, value in original.items():
            setattr(Config, key, value)

# 使用
print(Config.debug)  # False

with temporary_config(debug=True, log_level="DEBUG"):
    print(Config.debug)  # True
    print(Config.log_level)  # DEBUG

print(Config.debug)  # False(已恢复)

案例3:批量操作

from contextlib import contextmanager

@contextmanager
def batch_operation(batch_size=100):
    """批量操作上下文"""
    items = []

    def add_item(item):
        """添加项目"""
        items.append(item)
        if len(items) >= batch_size:
            process_batch(items)
            items.clear()

    try:
        yield add_item
    finally:
        # 处理剩余项目
        if items:
            process_batch(items)

def process_batch(items):
    print(f"处理批次: {len(items)} 项")

# 使用
with batch_operation(batch_size=3) as add:
    for i in range(10):
        add(f"项目{i}")
# 输出:
# 处理批次: 3 项
# 处理批次: 3 项
# 处理批次: 3 项
# 处理批次: 1 项

案例4:性能分析

from contextlib import contextmanager
import time
import functools

@contextmanager
def profile_section(name):
    """性能分析上下文"""
    start_time = time.time()
    start_memory = 0  # 简化示例

    try:
        yield
    finally:
        elapsed = time.time() - start_time
        print(f"{name}:")
        print(f"  时间: {elapsed:.4f}秒")

# 使用
with profile_section("数据处理"):
    data = [i ** 2 for i in range(1000000)]

with profile_section("数据保存"):
    time.sleep(0.1)

案例5:资源池管理

from contextlib import contextmanager
from queue import Queue

class ConnectionPool:
    """连接池"""
    def __init__(self, size=5):
        self.pool = Queue(maxsize=size)
        for i in range(size):
            self.pool.put(f"Connection_{i}")

    @contextmanager
    def get_connection(self):
        """获取连接"""
        conn = self.pool.get()
        print(f"获取连接: {conn}")
        try:
            yield conn
        finally:
            print(f"归还连接: {conn}")
            self.pool.put(conn)

# 使用
pool = ConnectionPool(size=2)

with pool.get_connection() as conn1:
    print(f"使用 {conn1}")

    with pool.get_connection() as conn2:
        print(f"使用 {conn2}")

小结对比表

特性 Python上下文管理器 Java try-with-resources
语法 with obj as var: try (Type var = ...) {}
协议 __enter__/__exit__ AutoCloseable.close()
自定义 类或@contextmanager 实现AutoCloseable
嵌套 支持 支持
异常抑制 支持(__exit__返回True) 不支持
多资源 with a, b: try (A a; B b)

# 6、函数式编程

说明:列表推导式和生成器表达式已在第5章"数据结构与推导式"中详细讲解,这里不再重复。

Python虽然不是纯函数式语言,但提供了丰富的函数式编程工具。对于熟悉Java 8+ Stream API的开发者来说,Python的函数式特性会感觉很亲切。

# 6.1、map()、filter()、reduce()

这三个函数是函数式编程的基石。

// Java Stream API方式
List<Integer> squares = IntStream.range(0, 10)
    .map(x -> x * x)
    .boxed()
    .collect(Collectors.toList());

List<Integer> evenSquares = IntStream.range(0, 10)
    .filter(x -> x % 2 == 0)
    .map(x -> x * x)
    .boxed()
    .collect(Collectors.toList());

// 多重循环 - Java需要嵌套Stream或flatMap
List<String> pairs = IntStream.rangeClosed(1, 3)
    .boxed()
    .flatMap(x -> Stream.of("a", "b", "c")
        .map(y -> "(" + x + ", " + y + ")"))
    .collect(Collectors.toList());

字符串处理示例:

# 提取所有单词的首字母
sentence = "Hello World Python Programming"
initials = [word[0] for word in sentence.split()]
print(initials)  # ['H', 'W', 'P', 'P']

# 过滤并转换
words = ["apple", "banana", "cherry", "date"]
upper_long_words = [w.upper() for w in words if len(w) > 5]
print(upper_long_words)  # ['BANANA', 'CHERRY']

# 嵌套列表展平
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flat = [num for row in matrix for num in row]
print(flat)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

# 6.2、高阶函数

高阶函数是指接受函数作为参数或返回函数的函数。

函数作为参数:

def apply_operation(numbers, operation):
    """对列表中每个数字应用操作"""
    return [operation(x) for x in numbers]

# 定义不同的操作
def square(x):
    return x ** 2

def double(x):
    return x * 2

def negate(x):
    return -x

numbers = [1, 2, 3, 4, 5]
print(apply_operation(numbers, square))   # [1, 4, 9, 16, 25]
print(apply_operation(numbers, double))   # [2, 4, 6, 8, 10]
print(apply_operation(numbers, negate))   # [-1, -2, -3, -4, -5]

# 使用lambda
print(apply_operation(numbers, lambda x: x**3))  # [1, 8, 27, 64, 125]

函数作为返回值:

def make_multiplier(n):
    """返回一个乘以n的函数"""
    def multiplier(x):
        return x * n
    return multiplier

times2 = make_multiplier(2)
times10 = make_multiplier(10)

print(times2(5))   # 10
print(times10(5))  # 50

# 实际应用:创建验证器
def make_range_validator(min_val, max_val):
    """创建范围验证函数"""
    def validator(value):
        return min_val <= value <= max_val
    return validator

age_validator = make_range_validator(0, 120)
percentage_validator = make_range_validator(0, 100)

print(age_validator(25))    # True
print(age_validator(150))   # False
print(percentage_validator(85))  # True

内置高阶函数:

# sorted() - 自定义排序
students = [
    {'name': 'Alice', 'age': 25, 'score': 85},
    {'name': 'Bob', 'age': 22, 'score': 92},
    {'name': 'Charlie', 'age': 23, 'score': 78}
]

# 按年龄排序
by_age = sorted(students, key=lambda s: s['age'])
print([s['name'] for s in by_age])  # ['Bob', 'Charlie', 'Alice']

# 按分数降序
by_score = sorted(students, key=lambda s: s['score'], reverse=True)
print([s['name'] for s in by_score])  # ['Bob', 'Alice', 'Charlie']

# 多级排序:先按分数降序,再按年龄升序
multi_sort = sorted(students, key=lambda s: (-s['score'], s['age']))

# max()/min() - 自定义比较
oldest = max(students, key=lambda s: s['age'])
print(oldest['name'])  # Alice

highest_score = max(students, key=lambda s: s['score'])
print(highest_score['name'])  # Bob

# 6.3、偏函数(functools.partial)

偏函数用于固定函数的某些参数,创建新函数。

from functools import partial

# 基本示例
def power(base, exponent):
    return base ** exponent

# 创建偏函数 - 固定exponent=2
square = partial(power, exponent=2)
print(square(5))  # 25
print(square(10))  # 100

# 固定exponent=3
cube = partial(power, exponent=3)
print(cube(5))  # 125

# 实际应用1:日志记录
def log_message(message, level='INFO', timestamp=True):
    import datetime
    prefix = f"[{datetime.datetime.now()}] " if timestamp else ""
    print(f"{prefix}{level}: {message}")

# 创建专用日志函数
log_error = partial(log_message, level='ERROR')
log_warning = partial(log_message, level='WARNING')
log_debug = partial(log_message, level='DEBUG', timestamp=False)

log_error("Database connection failed")
log_warning("Low memory")
log_debug("Variable value: 42")

# 实际应用2:数据转换
def convert_value(value, multiplier=1, offset=0, round_digits=2):
    """通用数值转换函数"""
    result = value * multiplier + offset
    return round(result, round_digits)

# 摄氏度转华氏度: F = C * 9/5 + 32
celsius_to_fahrenheit = partial(convert_value, multiplier=9/5, offset=32)
print(celsius_to_fahrenheit(0))    # 32.0
print(celsius_to_fahrenheit(100))  # 212.0

# 米转英尺: feet = meter * 3.28084
meter_to_feet = partial(convert_value, multiplier=3.28084)
print(meter_to_feet(10))  # 32.81

# 实际应用3:配置HTTP请求
import functools

def make_request(url, method='GET', headers=None, timeout=30):
    """模拟HTTP请求"""
    headers = headers or {}
    print(f"{method} {url}")
    print(f"Headers: {headers}")
    print(f"Timeout: {timeout}s")

# API特定配置
api_headers = {'Authorization': 'Bearer token123', 'Content-Type': 'application/json'}
api_request = partial(make_request, headers=api_headers, timeout=60)

# 使用
api_request('https://api.example.com/users')
api_request('https://api.example.com/posts', method='POST')

与Java对比:

// Java没有直接的偏函数,需要手动包装
BiFunction<Integer, Integer, Integer> power = (base, exp) -> (int) Math.pow(base, exp);

// 创建"偏函数"
Function<Integer, Integer> square = base -> power.apply(base, 2);
Function<Integer, Integer> cube = base -> power.apply(base, 3);

# 6.4、函数组合

函数组合是将多个函数组合成一个新函数。

# 手动实现函数组合
def compose(*functions):
    """从右到左组合函数"""
    def inner(arg):
        result = arg
        for func in reversed(functions):
            result = func(result)
        return result
    return inner

# 定义基础函数
def add_one(x):
    return x + 1

def double(x):
    return x * 2

def square(x):
    return x ** 2

# 组合函数:(x+1) * 2 然后平方
combined = compose(square, double, add_one)
print(combined(3))  # ((3+1)*2)^2 = 64

# 更优雅的实现
from functools import reduce

def compose2(*functions):
    """使用reduce实现函数组合"""
    return reduce(lambda f, g: lambda x: f(g(x)), functions)

combined2 = compose2(square, double, add_one)
print(combined2(3))  # 64

# 实际应用:数据处理管道
def remove_spaces(text):
    return text.replace(' ', '')

def to_lowercase(text):
    return text.lower()

def remove_punctuation(text):
    import string
    return text.translate(str.maketrans('', '', string.punctuation))

# 组合文本清洗函数
clean_text = compose(remove_punctuation, to_lowercase, remove_spaces)

text = "Hello, World! Python is AWESOME."
print(clean_text(text))  # heloworldpythonisawesome

# 更Pythonic的方式:使用管道
class Pipeline:
    """函数管道"""
    def __init__(self, value):
        self.value = value

    def pipe(self, func):
        """应用函数并返回新的Pipeline"""
        return Pipeline(func(self.value))

    def get(self):
        """获取最终值"""
        return self.value

# 使用管道
result = (Pipeline("Hello, World!")
         .pipe(str.lower)
         .pipe(lambda s: s.replace(' ', ''))
         .pipe(lambda s: s.replace('!', ''))
         .get())
print(result)  # helloworld

实战:数据转换管道

# 处理用户数据
users = [
    {'name': '  ALICE  ', 'age': '25', 'email': 'ALICE@EXAMPLE.COM'},
    {'name': 'bob', 'age': '30', 'email': 'bob@example.com  '},
    {'name': '  Charlie  ', 'age': '22', 'email': '  charlie@EXAMPLE.com'}
]

# 定义转换函数
def clean_name(user):
    user['name'] = user['name'].strip().capitalize()
    return user

def convert_age(user):
    user['age'] = int(user['age'])
    return user

def normalize_email(user):
    user['email'] = user['email'].strip().lower()
    return user

# 组合所有转换
from functools import reduce

def process_user(user):
    transformations = [clean_name, convert_age, normalize_email]
    return reduce(lambda u, transform: transform(u), transformations, user)

# 应用到所有用户
processed = list(map(process_user, users))
for user in processed:
    print(user)
# {'name': 'Alice', 'age': 25, 'email': 'alice@example.com'}
# {'name': 'Bob', 'age': 30, 'email': 'bob@example.com'}
# {'name': 'Charlie', 'age': 22, 'email': 'charlie@example.com'}

# 6.5、纯函数与副作用

纯函数是函数式编程的核心概念。

纯函数的特征:

# 纯函数 - 相同输入总是产生相同输出,无副作用
def add(a, b):
    return a + b

def multiply(a, b):
    return a * b

# 纯函数 - 不修改输入
def append_item_pure(lst, item):
    """返回新列表,不修改原列表"""
    return lst + [item]

original = [1, 2, 3]
new_list = append_item_pure(original, 4)
print(original)  # [1, 2, 3] - 未被修改
print(new_list)  # [1, 2, 3, 4]

# 非纯函数 - 有副作用
def append_item_impure(lst, item):
    """直接修改列表"""
    lst.append(item)
    return lst

original = [1, 2, 3]
new_list = append_item_impure(original, 4)
print(original)  # [1, 2, 3, 4] - 被修改了!
print(new_list)  # [1, 2, 3, 4]

# 非纯函数 - 依赖外部状态
counter = 0

def increment_impure():
    global counter
    counter += 1
    return counter

print(increment_impure())  # 1
print(increment_impure())  # 2 - 相同调用,不同结果!

# 纯函数替代方案
def increment_pure(value):
    return value + 1

counter = 0
counter = increment_pure(counter)  # 1
counter = increment_pure(counter)  # 2

纯函数的优势:

# 1. 可测试性
def calculate_total(items, tax_rate):
    """纯函数 - 易于测试"""
    subtotal = sum(item['price'] * item['quantity'] for item in items)
    return subtotal * (1 + tax_rate)

# 测试简单
items = [
    {'price': 10, 'quantity': 2},
    {'price': 5, 'quantity': 3}
]
assert calculate_total(items, 0.1) == 38.5

# 2. 可组合性
def filter_active(users):
    return [u for u in users if u.get('active', False)]

def sort_by_name(users):
    return sorted(users, key=lambda u: u['name'])

def get_emails(users):
    return [u['email'] for u in users]

# 轻松组合
users = [
    {'name': 'Alice', 'active': True, 'email': 'alice@example.com'},
    {'name': 'Bob', 'active': False, 'email': 'bob@example.com'},
    {'name': 'Charlie', 'active': True, 'email': 'charlie@example.com'}
]

active_emails = get_emails(sort_by_name(filter_active(users)))
print(active_emails)  # ['alice@example.com', 'charlie@example.com']

# 3. 并发安全
from concurrent.futures import ThreadPoolExecutor

def square_pure(x):
    """纯函数 - 线程安全"""
    return x ** 2

numbers = range(100)
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(square_pure, numbers))

避免副作用的技巧:

# 1. 使用copy避免修改原始数据
import copy

def update_user_pure(user, **updates):
    """返回更新后的新用户对象"""
    new_user = copy.deepcopy(user)
    new_user.update(updates)
    return new_user

user = {'name': 'Alice', 'age': 25}
updated = update_user_pure(user, age=26)
print(user)     # {'name': 'Alice', 'age': 25}
print(updated)  # {'name': 'Alice', 'age': 26}

# 2. 使用不可变数据结构
from collections import namedtuple

User = namedtuple('User', ['name', 'age', 'email'])
user = User('Alice', 25, 'alice@example.com')

# 创建新对象而不是修改
updated = user._replace(age=26)
print(user)     # User(name='Alice', age=25, ...)
print(updated)  # User(name='Alice', age=26, ...)

# 3. 使用dataclasses(Python 3.7+)
from dataclasses import dataclass, replace

@dataclass(frozen=True)  # frozen=True使其不可变
class Product:
    name: str
    price: float
    quantity: int

product = Product('Book', 29.99, 10)
# product.price = 19.99  # 错误!不可修改

# 使用replace创建新对象
discounted = replace(product, price=19.99)
print(product)     # Product(name='Book', price=29.99, quantity=10)
print(discounted)  # Product(name='Book', price=19.99, quantity=10)

# 7、类型提示(Type Hints)

作为Java开发者,你习惯了强类型系统。Python从3.5版本开始引入了类型提示,虽然不强制检查,但能让代码更清晰、IDE提示更好。

# 7.1、基本类型注解

# 变量类型注解
name: str = "Alice"
age: int = 25
salary: float = 5000.50
is_active: bool = True

# 函数参数和返回值注解
def greet(name: str, age: int) -> str:
    return f"Hello {name}, you are {age} years old"

def add(a: int, b: int) -> int:
    return a + b

def get_user_info() -> dict:
    return {"name": "Alice", "age": 25}

# 无返回值
def log_message(message: str) -> None:
    print(message)

# Java对比
"""
// Java强制类型
String name = "Alice";
int age = 25;

public String greet(String name, int age) {
    return "Hello " + name;
}
"""

# 7.2、复合类型注解

from typing import List, Dict, Tuple, Set

# 列表类型
numbers: List[int] = [1, 2, 3, 4]
names: List[str] = ["Alice", "Bob"]

# 字典类型
user: Dict[str, int] = {"age": 25, "score": 90}
config: Dict[str, any] = {"host": "localhost", "port": 8080}

# 元组类型
point: Tuple[int, int] = (10, 20)
person: Tuple[str, int, bool] = ("Alice", 25, True)

# 集合类型
unique_ids: Set[int] = {1, 2, 3}

# 嵌套类型
users: List[Dict[str, any]] = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30}
]

# 实际应用
def get_user_scores() -> Dict[str, List[int]]:
    """返回用户和他们的分数列表"""
    return {
        "Alice": [85, 90, 88],
        "Bob": [92, 87, 95]
    }

def process_data(items: List[Tuple[str, int]]) -> Dict[str, int]:
    """处理数据列表"""
    return {name: score for name, score in items}

# 7.3、Optional与Union

from typing import Optional, Union

# Optional - 可能为None
def find_user(user_id: int) -> Optional[dict]:
    """返回用户或None"""
    if user_id > 0:
        return {"id": user_id, "name": "Alice"}
    return None

# Optional[X] 等价于 Union[X, None]
def get_config(key: str) -> Optional[str]:
    return None

# Union - 多种可能类型
def process_value(value: Union[int, str, float]) -> str:
    """处理int、str或float类型"""
    return str(value)

# 实际应用
def parse_input(data: Union[str, bytes]) -> str:
    """处理字符串或字节"""
    if isinstance(data, bytes):
        return data.decode('utf-8')
    return data

# Java对比
"""
// Java使用泛型和null
Optional<User> findUser(int id) {
    if (id > 0) {
        return Optional.of(new User(id));
    }
    return Optional.empty();
}
"""

# 7.4、泛型类型

from typing import TypeVar, Generic, List

# 定义类型变量
T = TypeVar('T')

# 泛型函数
def first_element(items: List[T]) -> Optional[T]:
    """返回列表第一个元素"""
    return items[0] if items else None

# 泛型类
class Stack(Generic[T]):
    """通用栈实现"""
    def __init__(self) -> None:
        self._items: List[T] = []

    def push(self, item: T) -> None:
        self._items.append(item)

    def pop(self) -> Optional[T]:
        return self._items.pop() if self._items else None

# 使用
int_stack: Stack[int] = Stack()
int_stack.push(1)
int_stack.push(2)

str_stack: Stack[str] = Stack()
str_stack.push("hello")

# 7.5、typing模块常用类型

from typing import Callable, Any, Sequence, Mapping, Iterable

# Callable - 可调用对象
def apply_func(func: Callable[[int, int], int], a: int, b: int) -> int:
    return func(a, b)

result = apply_func(lambda x, y: x + y, 3, 5)  # 8

# Any - 任意类型
def process_data(data: Any) -> str:
    return str(data)

# Sequence - 序列类型
def sum_values(numbers: Sequence[int]) -> int:
    return sum(numbers)

sum_values([1, 2, 3])  # List
sum_values((1, 2, 3))  # Tuple

# Mapping - 映射类型
def print_config(config: Mapping[str, any]) -> None:
    for key, value in config.items():
        print(f"{key}: {value}")

# Iterable - 可迭代对象
def process_items(items: Iterable[str]) -> List[str]:
    return [item.upper() for item in items]

# 7.6、类型别名

from typing import List, Dict, Tuple

# 定义类型别名
UserId = int
UserName = str
Score = float

# 复杂类型别名
User = Dict[str, any]
UserList = List[User]
Coordinate = Tuple[float, float]
ScoreBoard = Dict[UserName, List[Score]]

# 使用类型别名
def get_user(user_id: UserId) -> User:
    return {"id": user_id, "name": "Alice"}

def calculate_average(scores: ScoreBoard) -> Dict[UserName, Score]:
    return {
        name: sum(score_list) / len(score_list)
        for name, score_list in scores.items()
    }

# 7.7、mypy静态类型检查

# 安装mypy: pip install mypy

# 示例代码 example.py
def add_numbers(a: int, b: int) -> int:
    return a + b

result: str = add_numbers(1, 2)  # 类型错误!

# 运行检查
# mypy example.py
# error: Incompatible types in assignment (expression has type "int", variable has type "str")

# 配置mypy: mypy.ini
"""
[mypy]
python_version = 3.9
warn_return_any = True
warn_unused_configs = True
disallow_untyped_defs = True
"""

# 实际项目示例
from typing import List, Optional

class UserService:
    """用户服务类"""

    def __init__(self, db_connection: any) -> None:
        self.db = db_connection

    def find_user(self, user_id: int) -> Optional[Dict[str, any]]:
        """查找用户"""
        # 实现...
        return {"id": user_id, "name": "Alice"}

    def get_all_users(self) -> List[Dict[str, any]]:
        """获取所有用户"""
        return [{"id": 1, "name": "Alice"}]

对比总结:

特性 Python类型提示 Java类型系统
强制性 可选,运行时不检查 强制,编译时检查
基本语法 name: str String name
泛型 List[int] List<Integer>
可选类型 Optional[int] Optional<Integer>
联合类型 Union[int, str] 不直接支持
类型别名 UserId = int typedef(C++)或接口
检查工具 mypy, pyright javac内置
IDE支持 VSCode, PyCharm IntelliJ IDEA

# 8、反射与内省

Python的反射能力比Java更强大。作为Java开发者,你会发现Python的反射API更简单直接。

# 8.1、inspect模块

import inspect

# 定义示例类
class User:
    """用户类"""
    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

    def greet(self):
        return f"Hello, I'm {self.name}"

    @staticmethod
    def create_guest():
        return User("Guest", 0)

# 检查对象类型
print(inspect.isclass(User))          # True
print(inspect.isfunction(User.greet)) # True
print(inspect.ismethod(User().greet)) # True

# 获取源代码
print(inspect.getsource(User))        # 打印User类的源代码
print(inspect.getfile(User))          # 获取文件路径

# 获取函数签名
def example_func(name: str, age: int = 18) -> str:
    return f"{name}: {age}"

sig = inspect.signature(example_func)
print(sig)  # (name: str, age: int = 18) -> str

for param_name, param in sig.parameters.items():
    print(f"{param_name}: {param.annotation}, default={param.default}")

# 获取类成员
members = inspect.getmembers(User)
for name, value in members:
    print(f"{name}: {type(value)}")

# 8.2、动态属性操作

class Config:
    host = "localhost"
    port = 8080

# getattr - 获取属性
print(getattr(Config, 'host'))  # localhost
print(getattr(Config, 'timeout', 30))  # 30 (默认值)

# setattr - 设置属性
setattr(Config, 'host', '192.168.1.1')
print(Config.host)  # 192.168.1.1

# hasattr - 检查属性是否存在
print(hasattr(Config, 'host'))    # True
print(hasattr(Config, 'timeout')) # False

# delattr - 删除属性
delattr(Config, 'port')
print(hasattr(Config, 'port'))  # False

# 实际应用:动态配置加载
config_data = {
    'database_host': 'localhost',
    'database_port': 3306,
    'cache_enabled': True
}

class AppConfig:
    pass

for key, value in config_data.items():
    setattr(AppConfig, key, value)

print(AppConfig.database_host)  # localhost

# 8.3、动态导入模块

import importlib

# 动态导入模块
math_module = importlib.import_module('math')
print(math_module.sqrt(16))  # 4.0

# 动态导入并获取属性
module = importlib.import_module('json')
dumps_func = getattr(module, 'dumps')
print(dumps_func({'name': 'Alice'}))  # {"name": "Alice"}

# __import__() 函数
os_module = __import__('os')
print(os_module.getcwd())

# 实际应用:插件系统
def load_plugin(plugin_name: str):
    """动态加载插件"""
    try:
        module = importlib.import_module(f'plugins.{plugin_name}')
        if hasattr(module, 'Plugin'):
            return module.Plugin()
        else:
            raise AttributeError(f"Plugin class not found in {plugin_name}")
    except ImportError as e:
        print(f"Failed to load plugin: {e}")
        return None

# 8.4、类型检查

# type() vs isinstance()
value = [1, 2, 3]
print(type(value))  # <class 'list'>
print(type(value) == list)  # True

# isinstance() - 推荐使用,支持继承
print(isinstance(value, list))  # True
print(isinstance(value, (list, tuple)))  # True - 检查多个类型

# issubclass() - 判断继承关系
class Animal:
    pass

class Dog(Animal):
    pass

print(issubclass(Dog, Animal))  # True
print(issubclass(Dog, object))  # True - 所有类都继承自object

# 鸭子类型应用
def process_iterable(obj):
    """处理任何可迭代对象"""
    if hasattr(obj, '__iter__'):
        for item in obj:
            print(item)
    else:
        print("Not iterable")

process_iterable([1, 2, 3])      # 处理列表
process_iterable("hello")        # 处理字符串
process_iterable(range(5))       # 处理range对象

# 8.5、动态创建类

# 使用type()创建类
# type(name, bases, dict)
User = type('User', (object,), {
    'name': 'Alice',
    'greet': lambda self: f"Hello, {self.name}"
})

user = User()
print(user.name)     # Alice
print(user.greet())  # Hello, Alice

# 更复杂的示例
def __init__(self, name, age):
    self.name = name
    self.age = age

def get_info(self):
    return f"{self.name}, {self.age} years old"

Person = type('Person', (object,), {
    '__init__': __init__,
    'get_info': get_info
})

p = Person("Bob", 25)
print(p.get_info())  # Bob, 25 years old

# 实际应用:ORM模型动态生成
def create_model(table_name: str, fields: dict):
    """动态创建数据库模型"""
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __repr__(self):
        field_strs = [f"{k}={getattr(self, k)}" for k in fields.keys()]
        return f"{table_name}({', '.join(field_strs)})"

    attrs = {
        '__init__': __init__,
        '__repr__': __repr__,
        '_table_name': table_name,
        '_fields': fields
    }

    return type(table_name, (object,), attrs)

# 创建User模型
UserModel = create_model('users', {
    'id': 'INTEGER',
    'name': 'VARCHAR(100)',
    'email': 'VARCHAR(100)'
})

user = UserModel(id=1, name='Alice', email='alice@example.com')
print(user)  # users(id=1, name=Alice, email=alice@example.com)

# 8.6、元类(Metaclass)基础

# 元类是创建类的类
class Meta(type):
    """自定义元类"""
    def __new__(mcs, name, bases, attrs):
        # 在类创建时执行
        print(f"Creating class: {name}")
        # 添加类属性
        attrs['created_by'] = 'Meta'
        return super().__new__(mcs, name, bases, attrs)

# 使用元类
class MyClass(metaclass=Meta):
    pass

# 输出: Creating class: MyClass
print(MyClass.created_by)  # Meta

# 实际应用:自动注册类
_registry = {}

class RegisterMeta(type):
    """自动注册元类"""
    def __new__(mcs, name, bases, attrs):
        cls = super().__new__(mcs, name, bases, attrs)
        if name != 'Base':  # 跳过基类
            _registry[name] = cls
        return cls

class Base(metaclass=RegisterMeta):
    pass

class PluginA(Base):
    pass

class PluginB(Base):
    pass

print(_registry)  # {'PluginA': <class '__main__.PluginA'>, 'PluginB': <class '__main__.PluginB'>}

# 8.7、实战案例

案例1:简易依赖注入

class Container:
    """依赖注入容器"""
    def __init__(self):
        self._services = {}

    def register(self, interface, implementation):
        """注册服务"""
        self._services[interface] = implementation

    def resolve(self, interface):
        """解析服务"""
        implementation = self._services.get(interface)
        if implementation is None:
            raise ValueError(f"Service {interface} not registered")

        # 检查是否需要依赖注入
        sig = inspect.signature(implementation.__init__)
        dependencies = {}

        for param_name, param in sig.parameters.items():
            if param_name == 'self':
                continue
            if param.annotation != inspect.Parameter.empty:
                dependencies[param_name] = self.resolve(param.annotation)

        return implementation(**dependencies)

# 使用示例
class Database:
    def query(self, sql):
        return f"Executing: {sql}"

class UserRepository:
    def __init__(self, db: Database):
        self.db = db

    def find_all(self):
        return self.db.query("SELECT * FROM users")

# 注册服务
container = Container()
container.register(Database, Database)
container.register(UserRepository, UserRepository)

# 解析服务(自动注入依赖)
repo = container.resolve(UserRepository)
print(repo.find_all())  # Executing: SELECT * FROM users

案例2:序列化/反序列化

import json
from typing import get_type_hints

class Serializable:
    """可序列化基类"""

    def to_dict(self):
        """转换为字典"""
        result = {}
        for key, value in self.__dict__.items():
            if isinstance(value, Serializable):
                result[key] = value.to_dict()
            else:
                result[key] = value
        return result

    @classmethod
    def from_dict(cls, data: dict):
        """从字典创建对象"""
        hints = get_type_hints(cls.__init__)
        kwargs = {}

        for key, value in data.items():
            if key in hints:
                hint = hints[key]
                # 如果是Serializable子类,递归创建
                if isinstance(hint, type) and issubclass(hint, Serializable):
                    kwargs[key] = hint.from_dict(value)
                else:
                    kwargs[key] = value
            else:
                kwargs[key] = value

        return cls(**kwargs)

class Address(Serializable):
    def __init__(self, city: str, street: str):
        self.city = city
        self.street = street

class User(Serializable):
    def __init__(self, name: str, age: int, address: Address):
        self.name = name
        self.age = age
        self.address = address

# 使用
user = User("Alice", 25, Address("Beijing", "Main St"))
user_dict = user.to_dict()
print(json.dumps(user_dict, indent=2))

# 反序列化
restored = User.from_dict(user_dict)
print(f"{restored.name} lives in {restored.address.city}")

对比总结:

特性 Python Java
反射API inspect, getattr, setattr java.lang.reflect
获取类信息 inspect.getmembers() Class.getDeclaredMethods()
动态调用 getattr(obj, 'method')() method.invoke(obj)
动态创建类 type(name, bases, dict) Proxy.newProxyInstance()
类型检查 isinstance(), issubclass() instanceof, isAssignableFrom()
元类 class Meta(type) 不支持(使用注解处理器)
灵活性 极高 中等
性能 较慢 较快

# 二、Python标准库

Python的标准库非常丰富,开箱即用。作为Java开发者,你会发现很多功能在Python标准库中已经实现,无需引入第三方依赖。

# 1、常用内置模块

# 1.1、os与sys模块

os模块 - 操作系统交互

import os

# 获取当前工作目录
print(os.getcwd())  # D:\workspace\project

# 改变工作目录
os.chdir('/tmp')

# 列出目录内容
files = os.listdir('.')
print(files)

# 创建目录
os.mkdir('new_folder')
os.makedirs('path/to/folder', exist_ok=True)  # 递归创建

# 删除文件和目录
os.remove('file.txt')  # 删除文件
os.rmdir('folder')     # 删除空目录
import shutil
shutil.rmtree('folder')  # 删除非空目录

# 文件和目录判断
print(os.path.exists('file.txt'))  # 是否存在
print(os.path.isfile('file.txt'))  # 是否是文件
print(os.path.isdir('folder'))     # 是否是目录

# 路径操作
full_path = os.path.join('path', 'to', 'file.txt')  # path/to/file.txt
dirname = os.path.dirname('/path/to/file.txt')      # /path/to
basename = os.path.basename('/path/to/file.txt')    # file.txt
name, ext = os.path.splitext('file.txt')            # ('file', '.txt')

# 环境变量
print(os.environ.get('PATH'))
os.environ['MY_VAR'] = 'value'

# 执行系统命令
os.system('ls -l')  # 不推荐,使用subprocess模块

sys模块 - Python解释器交互

import sys

# 命令行参数
print(sys.argv)  # ['script.py', 'arg1', 'arg2']

# Python版本信息
print(sys.version)
print(sys.version_info)  # sys.version_info(major=3, minor=9, ...)

# 模块搜索路径
print(sys.path)
sys.path.append('/custom/path')

# 退出程序
sys.exit(0)  # 正常退出
sys.exit(1)  # 异常退出

# 标准输入输出
sys.stdout.write("Hello\n")
line = sys.stdin.readline()

# 获取对象大小
numbers = [1, 2, 3, 4, 5]
print(sys.getsizeof(numbers))  # 字节数

Java对比:

// Java获取当前目录
String currentDir = System.getProperty("user.dir");

// Java环境变量
String path = System.getenv("PATH");

// Java命令行参数
public static void main(String[] args) {
    // args数组
}

# 1.2、datetime模块

from datetime import datetime, date, time, timedelta

# 获取当前时间
now = datetime.now()
print(now)  # 2025-01-26 15:30:45.123456

today = date.today()
print(today)  # 2025-01-26

# 创建日期时间
dt = datetime(2025, 1, 26, 15, 30, 45)
d = date(2025, 1, 26)
t = time(15, 30, 45)

# 格式化输出
print(now.strftime('%Y-%m-%d %H:%M:%S'))  # 2025-01-26 15:30:45
print(now.strftime('%Y年%m月%d日'))        # 2025年01月26日

# 解析字符串
dt = datetime.strptime('2025-01-26 15:30:45', '%Y-%m-%d %H:%M:%S')

# 日期运算
tomorrow = today + timedelta(days=1)
next_week = today + timedelta(weeks=1)
one_hour_later = now + timedelta(hours=1)

# 时间差
diff = datetime(2025, 12, 31) - now
print(diff.days)  # 剩余天数
print(diff.total_seconds())  # 总秒数

# 时间戳
timestamp = now.timestamp()  # 转为时间戳
dt = datetime.fromtimestamp(timestamp)  # 从时间戳创建

# Java对比
"""
// Java 8+
LocalDateTime now = LocalDateTime.now();
LocalDate today = LocalDate.now();

// 格式化
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formatted = now.format(formatter);

// 日期运算
LocalDate tomorrow = today.plusDays(1);
"""

# 1.3、json模块

import json

# Python对象转JSON字符串
data = {
    'name': 'Alice',
    'age': 25,
    'skills': ['Python', 'Java'],
    'active': True
}

json_str = json.dumps(data)
print(json_str)  # {"name": "Alice", "age": 25, ...}

# 美化输出
json_str = json.dumps(data, indent=2, ensure_ascii=False)
print(json_str)

# JSON字符串转Python对象
data = json.loads(json_str)
print(data['name'])  # Alice

# 读写JSON文件
# 写入
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

# 读取
with open('data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# 自定义JSON编码
from datetime import datetime

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

data = {'created': datetime.now()}
json_str = json.dumps(data, cls=DateTimeEncoder)

# Java对比
"""
// Java使用Jackson或Gson
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(data);
Data data = mapper.readValue(json, Data.class);
"""

# 1.4、re模块(正则表达式)

import re

# 基本匹配
text = "My email is alice@example.com"
match = re.search(r'\w+@\w+\.\w+', text)
if match:
    print(match.group())  # alice@example.com

# 查找所有匹配
text = "Phone: 123-456-7890, Mobile: 098-765-4321"
phones = re.findall(r'\d{3}-\d{3}-\d{4}', text)
print(phones)  # ['123-456-7890', '098-765-4321']

# 替换
text = "Hello World"
result = re.sub(r'World', 'Python', text)
print(result)  # Hello Python

# 分割
text = "apple,banana;orange:grape"
fruits = re.split(r'[,;:]', text)
print(fruits)  # ['apple', 'banana', 'orange', 'grape']

# 编译正则(提高性能)
pattern = re.compile(r'\d+')
numbers = pattern.findall("I have 3 apples and 5 oranges")
print(numbers)  # ['3', '5']

# 捕获组
text = "Name: Alice, Age: 25"
match = re.search(r'Name: (\w+), Age: (\d+)', text)
if match:
    print(match.group(1))  # Alice
    print(match.group(2))  # 25
    print(match.groups())  # ('Alice', '25')

# 常用正则表达式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
url_pattern = r'^https?://(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b'
phone_pattern = r'^\d{3}-\d{3}-\d{4}$'

# 1.5、math与random模块

import math
import random

# math模块
print(math.pi)      # 3.141592653589793
print(math.e)       # 2.718281828459045

print(math.sqrt(16))    # 4.0
print(math.pow(2, 3))   # 8.0
print(math.ceil(4.3))   # 5
print(math.floor(4.7))  # 4
print(math.fabs(-5))    # 5.0

# 三角函数
print(math.sin(math.pi / 2))  # 1.0
print(math.cos(0))             # 1.0

# random模块
print(random.random())  # 0.0到1.0的随机浮点数

# 随机整数
print(random.randint(1, 10))  # 1到10的随机整数(包含10)
print(random.randrange(1, 10))  # 1到9的随机整数(不含10)

# 随机选择
colors = ['red', 'green', 'blue']
print(random.choice(colors))  # 随机选择一个

# 随机多个(有放回)
print(random.choices(colors, k=3))  # ['red', 'blue', 'red']

# 随机多个(无放回)
print(random.sample(colors, k=2))  # ['green', 'red']

# 打乱列表
numbers = [1, 2, 3, 4, 5]
random.shuffle(numbers)
print(numbers)  # [3, 1, 5, 2, 4]

# 设置随机种子(可复现)
random.seed(42)
print(random.random())  # 相同种子产生相同序列

# 1.6、collections模块

from collections import Counter, defaultdict, deque, namedtuple, OrderedDict

# Counter - 计数器
words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple']
counter = Counter(words)
print(counter)  # Counter({'apple': 3, 'banana': 2, 'orange': 1})
print(counter.most_common(2))  # [('apple', 3), ('banana', 2)]

# defaultdict - 默认值字典
dd = defaultdict(list)
dd['fruits'].append('apple')  # 不需要检查key是否存在
print(dd)  # defaultdict(<class 'list'>, {'fruits': ['apple']})

# 按类别分组
data = [('fruit', 'apple'), ('veg', 'carrot'), ('fruit', 'banana')]
grouped = defaultdict(list)
for category, item in data:
    grouped[category].append(item)
print(dict(grouped))  # {'fruit': ['apple', 'banana'], 'veg': ['carrot']}

# deque - 双端队列
dq = deque([1, 2, 3])
dq.append(4)      # 右端添加
dq.appendleft(0)  # 左端添加
dq.pop()          # 右端移除
dq.popleft()      # 左端移除
print(dq)  # deque([1, 2, 3])

# 限制长度的deque(用作循环缓冲区)
buffer = deque(maxlen=3)
for i in range(5):
    buffer.append(i)
print(buffer)  # deque([2, 3, 4], maxlen=3)

# namedtuple - 命名元组
Point = namedtuple('Point', ['x', 'y'])
p = Point(10, 20)
print(p.x, p.y)  # 10 20
print(p[0], p[1])  # 也可以用索引

# OrderedDict - 有序字典(Python 3.7+普通dict也保序)
od = OrderedDict()
od['a'] = 1
od['b'] = 2
od['c'] = 3
print(list(od.keys()))  # ['a', 'b', 'c']

# Java对比
"""
// Java Counter类似
Map<String, Integer> counter = new HashMap<>();
// 需要手动计数

// Java deque
Deque<Integer> deque = new ArrayDeque<>();
deque.addFirst(1);
deque.addLast(2);
"""

# 1.7、pathlib模块

from pathlib import Path

# 创建路径对象
p = Path('/usr/local/bin')
p = Path.home()  # 用户主目录
p = Path.cwd()   # 当前工作目录

# 路径拼接
config_path = Path.home() / '.config' / 'app' / 'settings.json'
print(config_path)  # /home/user/.config/app/settings.json

# 路径属性
p = Path('/path/to/file.txt')
print(p.name)       # file.txt
print(p.stem)       # file
print(p.suffix)     # .txt
print(p.parent)     # /path/to
print(p.parts)      # ('/', 'path', 'to', 'file.txt')

# 文件操作
p = Path('test.txt')
p.write_text('Hello World', encoding='utf-8')  # 写入
content = p.read_text(encoding='utf-8')        # 读取
p.unlink()  # 删除文件

# 目录操作
dir_path = Path('new_folder')
dir_path.mkdir(exist_ok=True)  # 创建目录
dir_path.mkdir(parents=True, exist_ok=True)  # 递归创建

# 判断
p = Path('file.txt')
print(p.exists())   # 是否存在
print(p.is_file())  # 是否是文件
print(p.is_dir())   # 是否是目录

# 遍历目录
for file in Path('.').iterdir():
    print(file)

# 递归查找
for py_file in Path('.').rglob('*.py'):
    print(py_file)

# 对比os.path
"""
# 旧方式
import os
path = os.path.join(os.path.expanduser('~'), '.config', 'app')

# 新方式(pathlib)
path = Path.home() / '.config' / 'app'
"""

模块对比总结:

功能 Python模块 Java对应
操作系统交互 os System, Runtime
文件路径 pathlib, os.path java.nio.file.Path
日期时间 datetime java.time.*
JSON处理 json Jackson, Gson
正则表达式 re java.util.regex.Pattern
随机数 random java.util.Random
高级集合 collections java.util.*

# 2、数据处理

# 2.1、csv模块

import csv

# 读取CSV文件
with open('data.csv', 'r', encoding='utf-8') as f:
    reader = csv.reader(f)
    headers = next(reader)  # 读取表头
    for row in reader:
        print(row)  # ['Alice', '25', 'Beijing']

# 使用DictReader(推荐)
with open('data.csv', 'r', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['name'], row['age'])  # 按列名访问

# 写入CSV文件
data = [
    ['name', 'age', 'city'],
    ['Alice', 25, 'Beijing'],
    ['Bob', 30, 'Shanghai']
]

with open('output.csv', 'w', encoding='utf-8', newline='') as f:
    writer = csv.writer(f)
    writer.writerows(data)

# 使用DictWriter
with open('output.csv', 'w', encoding='utf-8', newline='') as f:
    fieldnames = ['name', 'age', 'city']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'name': 'Alice', 'age': 25, 'city': 'Beijing'})

# 2.2、XML处理

import xml.etree.ElementTree as ET

# 解析XML
xml_str = '''
<users>
    <user id="1">
        <name>Alice</name>
        <age>25</age>
    </user>
    <user id="2">
        <name>Bob</name>
        <age>30</age>
    </user>
</users>
'''

root = ET.fromstring(xml_str)

# 遍历元素
for user in root.findall('user'):
    user_id = user.get('id')
    name = user.find('name').text
    age = user.find('age').text
    print(f"ID: {user_id}, Name: {name}, Age: {age}")

# 创建XML
root = ET.Element('users')
user = ET.SubElement(root, 'user', id='1')
ET.SubElement(user, 'name').text = 'Alice'
ET.SubElement(user, 'age').text = '25'

# 保存XML
tree = ET.ElementTree(root)
tree.write('users.xml', encoding='utf-8', xml_declaration=True)

# 2.3、pickle模块(序列化)

import pickle

# Python对象序列化
data = {
    'name': 'Alice',
    'scores': [85, 90, 88],
    'metadata': {'created': '2025-01-26'}
}

# 保存到文件
with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)

# 从文件加载
with open('data.pkl', 'rb') as f:
    loaded_data = pickle.load(f)
    print(loaded_data)

# 序列化为字节串
bytes_data = pickle.dumps(data)
restored = pickle.loads(bytes_data)

# 2.4、struct模块(二进制数据)

import struct

# 打包二进制数据
# 格式: i=int, f=float, s=string
packed = struct.pack('i f 10s', 42, 3.14, b'Hello')
print(packed)  # b'*\x00\x00\x00\xc3\xf5H@Hello\x00\x00\x00\x00\x00'

# 解包二进制数据
unpacked = struct.unpack('i f 10s', packed)
print(unpacked)  # (42, 3.140000104904175, b'Hello\x00\x00\x00\x00\x00')

# 实际应用:读取二进制文件
with open('data.bin', 'rb') as f:
    data = f.read(struct.calcsize('i f'))
    values = struct.unpack('i f', data)

# 3、网络编程

# 3.1、socket模块

import socket

# TCP服务器
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 8080))
server.listen(5)
print("Server listening on port 8080...")

while True:
    client, addr = server.accept()
    print(f"Connection from {addr}")
    data = client.recv(1024)
    client.send(b"Hello from server")
    client.close()

# TCP客户端
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 8080))
client.send(b"Hello server")
response = client.recv(1024)
print(response.decode())
client.close()

# 3.2、urllib模块

from urllib import request, parse

# GET请求
response = request.urlopen('https://api.github.com')
html = response.read().decode('utf-8')
print(html)

# POST请求
data = parse.urlencode({'key': 'value'}).encode()
req = request.Request('https://httpbin.org/post', data=data)
response = request.urlopen(req)
print(response.read().decode())

# 设置请求头
req = request.Request('https://api.github.com')
req.add_header('User-Agent', 'Python App')
response = request.urlopen(req)

# 3.3、http.server

# 命令行启动简单HTTP服务器
# python -m http.server 8000

# 自定义HTTP服务器
from http.server import HTTPServer, BaseHTTPRequestHandler

class MyHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(b'<h1>Hello World</h1>')

server = HTTPServer(('localhost', 8000), MyHandler)
server.serve_forever()

# 4、多线程与多进程

# 4.1、threading模块

import threading
import time

# 创建线程
def worker(name):
    print(f"Thread {name} starting")
    time.sleep(2)
    print(f"Thread {name} done")

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

# 线程类
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"{self.name} is running")

# 线程锁
lock = threading.Lock()

def safe_increment():
    global counter
    with lock:
        counter += 1

# 4.2、multiprocessing模块

from multiprocessing import Process, Pool, Queue

# 创建进程
def worker(name):
    print(f"Process {name} starting")

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

# 进程池
def square(x):
    return x ** 2

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
        print(results)

# 4.3、concurrent.futures模块

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n ** 2

# 线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    for future in futures:
        print(future.result())

# 进程池
with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(task, range(10))
    print(list(results))

# 4.4、GIL详解

GIL(全局解释器锁)是CPython的实现细节:

  1. 影响:

    • 同一时刻只有一个线程执行Python字节码
    • 多线程无法利用多核CPU进行CPU密集型任务
    • I/O密集型任务不受影响
  2. 解决方案:

    • CPU密集型: 使用multiprocessing
    • I/O密集型: 使用threading或asyncio
    • 混合型: 根据具体情况选择

Java对比:

  • Java没有GIL,多线程可以真正并行
  • Python的多线程更适合I/O操作
  • Python的多进程类似Java的多线程

# 5、日志与调试

# 5.1、logging模块

import logging

# 基本配置
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    filename='app.log'
)

# 使用日志
logging.debug("Debug message")
logging.info("Info message")
logging.warning("Warning message")
logging.error("Error message")
logging.critical("Critical message")

# 创建logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# 添加处理器
file_handler = logging.FileHandler('app.log')
console_handler = logging.StreamHandler()

# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)

# 使用logger
logger.info("Application started")

# 5.2、pdb调试器

import pdb

def buggy_function(x, y):
    result = x + y
    pdb.set_trace()  # 设置断点
    result = result * 2
    return result

# 调试命令:
# n - 下一行
# s - 进入函数
# c - 继续执行
# p variable - 打印变量
# l - 显示当前代码
# q - 退出调试

# 5.3、traceback模块

import traceback

try:
    result = 1 / 0
except Exception as e:
    # 打印完整堆栈
    traceback.print_exc()

    # 获取堆栈信息
    tb_str = traceback.format_exc()
    print(tb_str)

    # 记录到日志
    logging.error("Error occurred", exc_info=True)

# 三、异步编程

异步编程是Python的重要特性,特别适合处理I/O密集型任务。对于Java开发者来说,Python的异步编程模型比Java的CompletableFuture更加直观和强大。

# 1、asyncio基础

# 1.1、async/await语法

Python 3.5引入了async/await语法,让异步代码看起来像同步代码一样直观。

import asyncio

async def greet(name):
    print(f"开始问候 {name}")
    await asyncio.sleep(1)  # 异步等待1秒
    return f"Hello, {name}!"

async def main():
    result = await greet("张三")
    print(result)

# 运行异步程序
asyncio.run(main())

对比Java的异步编程:

// Java使用CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, 张三!";
});

future.thenAccept(System.out::println);

核心概念:

  • async def:定义协程函数
  • await:暂停当前协程,等待另一个协程完成
  • asyncio.run():启动事件循环并运行主协程

# 1.2、协程概念

协程(Coroutine)是可以暂停和恢复执行的函数,比线程更轻量级。

import asyncio
import time

async def task(name, duration):
    print(f"任务 {name} 开始")
    await asyncio.sleep(duration)
    print(f"任务 {name} 完成")
    return f"{name} 结果"

async def main():
    start_time = time.time()

    # 顺序执行(总时间 = 3秒)
    result1 = await task("A", 1)
    result2 = await task("B", 2)

    end_time = time.time()
    print(f"顺序执行耗时: {end_time - start_time:.2f}秒")

asyncio.run(main())

协程 vs 线程对比:

特性 协程 线程
创建成本 极低(几KB) 较高(几MB)
切换成本 用户态切换 内核态切换
数量限制 可以创建数万个 通常数百个
适用场景 I/O密集型 CPU密集型

# 1.3、事件循环

事件循环是异步编程的核心,负责调度和执行协程。

import asyncio

async def worker(name):
    print(f"{name} 开始工作")
    await asyncio.sleep(1)
    print(f"{name} 完成工作")

async def main():
    # 创建任务
    task1 = asyncio.create_task(worker("任务1"))
    task2 = asyncio.create_task(worker("任务2"))

    # 等待所有任务完成
    await task1
    await task2
    print("所有任务完成")

# 使用asyncio.run()
asyncio.run(main())

# 手动控制事件循环(高级用法)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

asyncio.run() vs await的区别:

  • asyncio.run():启动新的事件循环,通常用于程序入口
  • await:在现有事件循环中等待协程完成
import asyncio

async def nested():
    return "嵌套协程结果"

async def outer():
    # ✅ 正确:使用await
    result = await nested()
    print(result)

    # ❌ 错误:不能在协程中使用asyncio.run()
    # asyncio.run(nested())  # RuntimeError: no running event loop

asyncio.run(outer())

# 1.4、Task与Future

Task是Future的子类,用于包装协程并调度执行。

import asyncio

async def background_task(name, delay):
    print(f"后台任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"后台任务 {name} 完成")
    return f"{name} 的结果"

async def main():
    # 创建多个任务(立即开始执行)
    tasks = [
        asyncio.create_task(background_task("A", 2)),
        asyncio.create_task(background_task("B", 1)),
        asyncio.create_task(background_task("C", 3))
    ]

    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    print("所有任务结果:", results)

    # 带超时的等待
    try:
        await asyncio.wait_for(asyncio.create_task(background_task("D", 5)), timeout=3)
    except asyncio.TimeoutError:
        print("任务D超时")

asyncio.run(main())

Task的高级用法:

import asyncio

async def cancellable_task():
    try:
        for i in range(10):
            print(f"工作中... {i}")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("任务被取消")
        # 清理资源
        await asyncio.sleep(0.5)
        print("清理完成")
        raise

async def main():
    task = asyncio.create_task(cancellable_task())

    # 3秒后取消任务
    await asyncio.sleep(3)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("主协程捕获到取消异常")

asyncio.run(main())

# 2、异步I/O

# 2.1、异步文件操作

Python 3.4+提供了异步文件操作API。

import asyncio
import aiofiles  # 第三方库,功能更完整

async def read_file_async(filename):
    # 使用aiofiles(推荐)
    async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
        content = await f.read()
        print(f"文件内容长度: {len(content)}")
        return content

async def write_file_async(filename, content):
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(content)
        print(f"写入文件: {filename}")

async def process_files():
    # 并发处理多个文件
    files = ['file1.txt', 'file2.txt', 'file3.txt']

    tasks = []
    for filename in files:
        content = f"这是 {filename} 的内容\n"
        tasks.append(write_file_async(filename, content))

    await asyncio.gather(*tasks)

    # 并发读取文件
    read_tasks = [read_file_async(f) for f in files]
    contents = await asyncio.gather(*read_tasks)

    print(f"读取了 {len(contents)} 个文件")

# 安装aiofiles: pip install aiofiles
# asyncio.run(process_files())

对比Java的异步文件操作:

// Java NIO.2异步文件操作
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
    Paths.get("file.txt"), StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> readResult = channel.read(buffer, 0);

// 处理结果
while (!readResult.isDone()) {
    // 可以做其他工作
}

# 2.2、异步网络请求

使用aiohttp库进行异步HTTP请求。

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            print(f"获取 {url} - 状态: {response.status}")
            content = await response.text()
            return {
                'url': url,
                'status': response.status,
                'length': len(content),
                'content': content[:100]  # 只返回前100个字符
            }
    except Exception as e:
        print(f"请求 {url} 失败: {e}")
        return {'url': url, 'error': str(e)}

async def fetch_multiple_urls(urls):
    # 创建会话(复用连接)
    async with aiohttp.ClientSession() as session:
        # 并发请求多个URL
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 处理结果
        success_count = 0
        for result in results:
            if isinstance(result, dict) and 'error' not in result:
                success_count += 1
                print(f"✅ {result['url']}: {result['status']} ({result['length']} 字符)")
            else:
                print(f"❌ 失败: {result}")

        print(f"成功请求: {success_count}/{len(urls)}")
        return results

async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404'
    ]

    start_time = time.time()
    await fetch_multiple_urls(urls)
    end_time = time.time()

    print(f"总耗时: {end_time - start_time:.2f}秒")

# 安装aiohttp: pip install aiohttp
# asyncio.run(main())

异步HTTP客户端的最佳实践:

import asyncio
import aiohttp
from aiohttp import ClientTimeout, ClientSession

class AsyncHttpClient:
    def __init__(self, timeout=30, max_connections=100):
        self.timeout = ClientTimeout(total=timeout)
        self.connector = aiohttp.TCPConnector(limit=max_connections)

    async def __aenter__(self):
        self.session = ClientSession(
            timeout=self.timeout,
            connector=self.connector
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def get(self, url, **kwargs):
        async with self.session.get(url, **kwargs) as response:
            return await response.json()

    async def post(self, url, data=None, json=None, **kwargs):
        async with self.session.post(url, data=data, json=json, **kwargs) as response:
            return await response.json()

async def api_client_example():
    async with AsyncHttpClient() as client:
        # GET请求
        data = await client.get('https://api.github.com/users/python')
        print(f"Python GitHub followers: {data['followers']}")

        # POST请求
        result = await client.post('https://httpbin.org/post', json={'key': 'value'})
        print(f"POST响应: {result['json']}")

# asyncio.run(api_client_example())

# 3、异步编程最佳实践

# 3.1、何时使用异步

适合异步的场景:

  • I/O密集型操作(网络请求、数据库查询、文件读写)
  • 需要并发处理大量连接
  • 实时性要求高的应用
  • WebSocket、聊天服务器、API网关
# ✅ 适合异步:网络爬虫
import asyncio
import aiohttp

async def crawl_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

# ✅ 适合异步:WebSocket聊天服务器
async def handle_chat_messages(websocket):
    async for message in websocket:
        await broadcast_to_other_clients(message)

# ❌ 不适合异步:CPU密集型计算
async def heavy_computation():
    # 这会阻塞事件循环
    result = sum(i * i for i in range(10000000))
    return result

不适合异步的场景:

  • CPU密集型计算(数值计算、图像处理)
  • 简单的脚本程序
  • 同步第三方库的包装

# 3.2、异步与多线程的选择

异步 vs 多线程对比:

维度 异步 多线程
并发模型 单线程事件循环 多线程抢占式
内存占用 低(单线程) 高(每个线程栈空间)
上下文切换 快(用户态) 慢(内核态)
编程复杂度 高(需要异步思维) 中等
调试难度 较高 中等
适用场景 I/O密集型 CPU密集型或阻塞操作

混合使用示例:

import asyncio
import concurrent.futures
import time

def cpu_intensive_task(n):
    """CPU密集型任务 - 在线程池中执行"""
    return sum(i * i for i in range(n))

async def async_io_task():
    """I/O密集型任务 - 异步执行"""
    await asyncio.sleep(1)
    return "I/O任务完成"

async def mixed_workload():
    start_time = time.time()

    # 创建线程池执行器
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # 并发执行不同类型的任务
        tasks = [
            # CPU密集型任务在线程池中
            loop.run_in_executor(executor, cpu_intensive_task, 1000000),
            loop.run_in_executor(executor, cpu_intensive_task, 2000000),

            # I/O密集型任务异步执行
            async_io_task(),
            async_io_task(),
        ]

        results = await asyncio.gather(*tasks)

    end_time = time.time()
    print(f"混合任务完成,耗时: {end_time - start_time:.2f}秒")
    print(f"结果: {results}")

asyncio.run(mixed_workload())

# 3.3、常见陷阱

陷阱1:在协程中调用阻塞函数

import asyncio
import time

# ❌ 错误:阻塞事件循环
async def bad_example():
    time.sleep(2)  # 阻塞2秒,整个事件循环被阻塞
    return "完成"

# ✅ 正确:使用异步版本
async def good_example():
    await asyncio.sleep(2)  # 非阻塞等待
    return "完成"

# ✅ 或者在线程池中执行阻塞操作
async def blocking_in_thread():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, time.sleep, 2)
    return "完成"

陷阱2:忘记await

import asyncio

async def forget_await():
    # ❌ 错误:忘记await,协程不会执行
    asyncio.sleep(1)
    print("这可能不会按预期执行")

async def correct_usage():
    # ✅ 正确:使用await
    await asyncio.sleep(1)
    print("1秒后执行")

陷阱3:异常处理不当

import asyncio

async def task_with_error():
    await asyncio.sleep(0.1)
    raise ValueError("任务失败")

async def bad_error_handling():
    # ❌ 错误:异常被忽略
    asyncio.create_task(task_with_error())

async def good_error_handling():
    # ✅ 正确:处理异常
    task = asyncio.create_task(task_with_error())
    try:
        await task
    except ValueError as e:
        print(f"捕获到异常: {e}")

# ✅ 使用gather处理异常
async def gather_with_exceptions():
    tasks = [
        asyncio.create_task(task_with_error()),
        asyncio.create_task(asyncio.sleep(1))
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i} 成功: {result}")

陷阱4:过度使用异步

# ❌ 过度设计:简单的同步任务不需要异步
async def over_engineered():
    result = await async_add(2, 3)
    return result

# ✅ 简单直接
def simple():
    return 2 + 3

最佳实践总结:

  1. 保持异步函数纯净:异步函数中只调用异步函数
  2. 正确处理异常:使用try/except包装可能失败的操作
  3. 避免阻塞操作:将阻塞操作放到线程池中
  4. 合理并发数:控制同时进行的任务数量
  5. 使用连接池:复用数据库和HTTP连接
  6. 性能监控:使用工具监控异步程序性能
import asyncio
import aiohttp
from asyncio import Semaphore

async def rate_limited_fetch(session, url, semaphore):
    """带速率限制的请求"""
    async with semaphore:  # 限制并发数
        async with session.get(url) as response:
            return await response.text()

async def robust_fetch_all(urls, max_concurrent=10):
    """健壮的批量请求"""
    semaphore = Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        tasks = [
            rate_limited_fetch(session, url, semaphore)
            for url in urls
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        success_count = sum(1 for r in results if not isinstance(r, Exception))
        print(f"成功: {success_count}/{len(urls)}")

        return results

📖 系列文章导航

👈 上一篇:Java开发者转战Python:基础篇 - Python基础语法、数据结构、面向对象编程

👉 下一篇:Java开发者转战Python:热门库与质量管理 - 探索Python生态系统和工程实践

祝你变得更强!

编辑 (opens new window)
#Python
上次更新: 2025/11/03
Java开发者转战Python:基础篇
Java开发者转战Python:热门库与质量管理

← Java开发者转战Python:基础篇 Java开发者转战Python:热门库与质量管理→

最近更新
01
AI编程时代的一些心得
09-11
02
Claude Code与Codex的协同工作
09-01
03
Claude Code实战之供应商切换工具
08-18
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式