目录
模块简介
os模块是Python标准库的核心模块之一,提供了与操作系统交互的接口。它允许你在Python程序中执行文件和目录操作、进程管理、环境变量访问等系统级操作。
导入方式
import os
# 或导入特定功能
from os import listdir, getcwd路径操作 (os.path)
这是os模块中最常用的子模块,用于跨平台路径处理。
基本路径操作
import os
# 当前工作目录
current_dir = os.getcwd() # 获取当前工作目录
os.chdir('/new/path') # 改变工作目录
# 路径拼接
path = os.path.join('dir1', 'dir2', 'file.txt') # 自动处理分隔符
# Windows: dir1\dir2\file.txt
# Linux/macOS: dir1/dir2/file.txt
# 路径拆分
directory, filename = os.path.split('/home/user/file.txt')
# directory = '/home/user', filename = 'file.txt'
basename = os.path.basename('/home/user/file.txt') # 'file.txt'
dirname = os.path.dirname('/home/user/file.txt') # '/home/user'
filename, ext = os.path.splitext('data.csv') # ('data', '.csv')路径检查和规范化
# 获取绝对路径
abs_path = os.path.abspath('file.txt')
# 规范化路径(清理..和.)
norm_path = os.path.normpath('/home/user/../user/file.txt')
# 结果: /home/user/file.txt
# 相对路径
rel_path = os.path.relpath('/home/user/file.txt', '/home')
# 结果: user/file.txt
# 检查路径是否存在
exists = os.path.exists('/some/path')
is_file = os.path.isfile('/path/to/file')
is_dir = os.path.isdir('/path/to/dir')
is_abs = os.path.isabs('/absolute/path')
is_link = os.path.islink('/path/to/link')路径信息获取
# 文件大小(字节)
size = os.path.getsize('file.txt')
# 文件时间戳
mtime = os.path.getmtime('file.txt') # 最后修改时间
atime = os.path.getatime('file.txt') # 最后访问时间
ctime = os.path.getctime('file.txt') # 创建时间/元数据修改时间
# 转换为可读格式
from datetime import datetime
dt = datetime.fromtimestamp(mtime)
print(f"最后修改: {dt.strftime('%Y-%m-%d %H:%M:%S')}")
# 获取真实路径(解析符号链接)
real_path = os.path.realpath('/path/to/link')文件和目录操作
目录操作
# 列出目录内容
files = os.listdir('.') # 当前目录下所有文件和目录
for item in os.listdir('.'):
if os.path.isfile(item):
print(f"文件: {item}")
elif os.path.isdir(item):
print(f"目录: {item}")
# 递归遍历目录树
for root, dirs, files in os.walk('.'):
print(f"当前目录: {root}")
print(f"子目录: {dirs}")
print(f"文件: {files}")
print("-" * 40)
# 创建和删除目录
os.mkdir('new_dir') # 创建单级目录
os.makedirs('dir1/dir2/dir3') # 创建多级目录(已存在不会报错)
os.makedirs('dir1/dir2/dir3', exist_ok=True) # 存在时不报错
os.rmdir('empty_dir') # 删除空目录
# 删除非空目录(递归删除)
import shutil
shutil.rmtree('dir_with_content')文件操作
# 创建文件
with open('new_file.txt', 'w') as f:
f.write('Hello, World!')
# 文件重命名/移动
os.rename('old_name.txt', 'new_name.txt')
os.replace('source.txt', 'dest.txt') # 原子操作,覆盖已存在的文件
# 复制文件
import shutil
shutil.copy2('source.txt', 'dest.txt') # 保留元数据
shutil.copy('source.txt', 'dest.txt') # 不保留元数据
# 删除文件
os.remove('file_to_delete.txt')
os.unlink('file_to_delete.txt') # 同remove
# 创建和读取链接
os.symlink('target.txt', 'link.txt') # 创建符号链接
os.link('source.txt', 'hardlink.txt') # 创建硬链接文件权限和属性
# 修改文件权限
os.chmod('file.txt', 0o755) # 八进制表示
# 等价于
os.chmod('file.txt',
os.R_OK | os.W_OK | os.X_OK) # 可读|可写|可执行
# 检查权限
can_read = os.access('file.txt', os.R_OK)
can_write = os.access('file.txt', os.W_OK)
can_exec = os.access('file.txt', os.X_OK)
exists = os.access('file.txt', os.F_OK)
# 修改所有者和组(Unix-like系统)
os.chown('file.txt', uid, gid) # 需要root权限
# 修改时间戳
# 设置最后访问和修改时间
timestamp = 1609459200 # Unix时间戳
os.utime('file.txt', (timestamp, timestamp))
# 设置访问和修改时间为当前时间
os.utime('file.txt', None)进程管理
执行系统命令
# 执行命令并获取退出码
exit_code = os.system('ls -la')
# 执行命令并获取输出
result = os.popen('ls -la').read()
print(result)
# 现代替代方案(Python 3.5+)
import subprocess
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)进程控制
# 获取进程ID
pid = os.getpid() # 当前进程ID
ppid = os.getppid() # 父进程ID
# 进程相关操作
os.kill(pid, signal.SIGTERM) # 发送信号
os.abort() # 中止当前进程
os._exit(0) # 立即退出,不执行清理
# 创建子进程
pid = os.fork() # Unix-like系统专用
if pid == 0:
# 子进程代码
print("我是子进程")
os._exit(0)
else:
# 父进程代码
print(f"子进程ID: {pid}")
os.waitpid(pid, 0) # 等待子进程结束系统信息
# 用户信息
username = os.getlogin() # 当前登录用户名
uid = os.getuid() # 当前用户ID(Unix)
gid = os.getgid() # 当前组ID(Unix)
# 系统标识
import platform
system = platform.system() # 'Linux', 'Windows', 'Darwin'
release = platform.release() # 系统版本
# CPU核心数
cpu_count = os.cpu_count()
# 终端信息
if os.isatty(0): # 检查标准输入是否是终端
print("运行在终端中")
columns, lines = os.get_terminal_size() # 终端大小
print(f"终端大小: {columns}x{lines}")环境变量和系统信息
环境变量操作
# 获取环境变量
home = os.environ.get('HOME')
path = os.environ.get('PATH')
# 安全获取(带默认值)
python_path = os.environ.get('PYTHONPATH', '/default/path')
# 设置环境变量
os.environ['MY_VAR'] = 'my_value'
# 更新环境变量(临时)
os.environ.update({
'VAR1': 'value1',
'VAR2': 'value2'
})
# 删除环境变量
if 'MY_VAR' in os.environ:
del os.environ['MY_VAR']
# 获取所有环境变量
for key, value in os.environ.items():
print(f"{key}={value}")系统路径
# 获取用户目录
user_home = os.path.expanduser('~')
desktop = os.path.expanduser('~/Desktop')
# 系统临时目录
temp_dir = os.environ.get('TMPDIR',
os.environ.get('TEMP',
os.environ.get('TMP', '/tmp')))
# 或使用标准库
import tempfile
temp_dir = tempfile.gettempdir()
# 可执行文件搜索路径
path_dirs = os.environ.get('PATH', '').split(os.pathsep)文件描述符和I/O
低级文件操作
# 使用文件描述符
fd = os.open('file.txt', os.O_RDWR | os.O_CREAT) # 打开文件
content = os.read(fd, 100) # 读取100字节
os.write(fd, b'New content') # 写入字节数据
os.lseek(fd, 0, os.SEEK_SET) # 移动文件指针
os.close(fd) # 关闭文件描述符
# 文件描述符复制
new_fd = os.dup(fd) # 复制文件描述符
os.dup2(old_fd, new_fd) # 复制到指定描述符管道和文件锁
# 创建管道
r, w = os.pipe() # 返回读端和写端的文件描述符
# 文件锁(Unix-like系统)
import fcntl
with open('file.txt', 'r+') as f:
# 非阻塞锁
try:
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
# 执行操作
fcntl.lockf(f, fcntl.LOCK_UN) # 解锁
except BlockingIOError:
print("文件被其他进程锁定")内存映射文件
import mmap
with open('large_file.bin', 'r+b') as f:
# 创建内存映射
mm = mmap.mmap(f.fileno(), 0)
# 随机访问
data = mm[1000:2000] # 读取1000字节
mm[5000:5010] = b'New data' # 写入数据
mm.close() # 关闭内存映射
错误和异常处理
常见异常
import os
import errno
try:
os.remove('non_existent_file.txt')
except FileNotFoundError:
print("文件不存在")
except PermissionError:
print("权限不足")
except OSError as e:
if e.errno == errno.ENOENT:
print("文件不存在")
elif e.errno == errno.EACCES:
print("权限不足")
else:
print(f"系统错误: {e}")
# 检查文件存在性
if not os.path.exists('file.txt'):
print("文件不存在")
# 检查目录是否为空
def is_dir_empty(dirpath):
try:
return len(os.listdir(dirpath)) == 0
except FileNotFoundError:
return True
except PermissionError:
print(f"无权限访问目录: {dirpath}")
return False错误代码
import errno
# 常见错误码
ERRORS = {
errno.ENOENT: "文件或目录不存在",
errno.EACCES: "权限不足",
errno.ENOSPC: "磁盘空间不足",
errno.EEXIST: "文件已存在",
errno.EINVAL: "无效参数",
errno.ENOTEMPTY: "目录非空"
}
def handle_os_error(e):
"""处理操作系统错误"""
error_msg = ERRORS.get(e.errno, str(e))
print(f"错误: {error_msg}")实用示例
示例1:目录清理工具
import os
import time
import shutil
from datetime import datetime, timedelta
def clean_old_files(directory, days_old=30, extensions=None):
"""
清理指定天数前的文件
"""
cutoff_time = time.time() - (days_old * 24 * 3600)
deleted = []
for root, dirs, files in os.walk(directory):
for file in files:
# 检查扩展名
if extensions and not file.endswith(tuple(extensions)):
continue
filepath = os.path.join(root, file)
try:
# 检查文件修改时间
if os.path.getmtime(filepath) < cutoff_time:
os.remove(filepath)
deleted.append(filepath)
print(f"删除: {filepath}")
except (PermissionError, OSError) as e:
print(f"无法删除 {filepath}: {e}")
return deleted
# 使用示例
clean_old_files('/tmp/logs', days_old=7, extensions=('.log', '.txt'))示例2:文件同步工具
import os
import hashlib
import shutil
def calculate_checksum(filepath):
"""计算文件校验和"""
hash_md5 = hashlib.md5()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def sync_directories(source, destination):
"""同步两个目录"""
operations = {'copied': 0, 'deleted': 0, 'skipped': 0}
# 确保目标目录存在
os.makedirs(destination, exist_ok=True)
# 收集源文件信息
source_files = {}
for root, dirs, files in os.walk(source):
rel_path = os.path.relpath(root, source)
target_dir = os.path.join(destination, rel_path)
os.makedirs(target_dir, exist_ok=True)
for file in files:
src_file = os.path.join(root, file)
dst_file = os.path.join(target_dir, file)
# 比较文件
if not os.path.exists(dst_file):
# 目标文件不存在,直接复制
shutil.copy2(src_file, dst_file)
operations['copied'] += 1
else:
# 比较文件大小和修改时间
src_stat = os.stat(src_file)
dst_stat = os.stat(dst_file)
if (src_stat.st_size != dst_stat.st_size or
src_stat.st_mtime > dst_stat.st_mtime):
# 文件不同,复制更新
shutil.copy2(src_file, dst_file)
operations['copied'] += 1
else:
operations['skipped'] += 1
# 清理目标目录中不存在的文件
for root, dirs, files in os.walk(destination):
rel_path = os.path.relpath(root, destination)
source_dir = os.path.join(source, rel_path)
if not os.path.exists(source_dir):
# 源目录不存在,删除整个目录
shutil.rmtree(root)
operations['deleted'] += 1
continue
for file in files:
dst_file = os.path.join(root, file)
src_file = os.path.join(source_dir, file)
if not os.path.exists(src_file):
os.remove(dst_file)
operations['deleted'] += 1
return operations
# 使用示例
sync_directories('/path/to/source', '/path/to/backup')示例3:批量重命名工具
import os
import re
from pathlib import Path
def batch_rename(directory, pattern, replacement,
file_ext=None, recursive=False):
"""
批量重命名文件
"""
renamed = []
if recursive:
# 递归处理
file_list = []
for root, dirs, files in os.walk(directory):
for file in files:
file_list.append(os.path.join(root, file))
else:
# 仅当前目录
file_list = [os.path.join(directory, f)
for f in os.listdir(directory)
if os.path.isfile(os.path.join(directory, f))]
for filepath in file_list:
# 检查扩展名
if file_ext and not filepath.endswith(file_ext):
continue
dirname, filename = os.path.split(filepath)
new_name = re.sub(pattern, replacement, filename)
if new_name != filename:
new_path = os.path.join(dirname, new_name)
# 避免文件名冲突
counter = 1
while os.path.exists(new_path):
name, ext = os.path.splitext(new_name)
new_path = os.path.join(dirname,
f"{name}_{counter}{ext}")
counter += 1
os.rename(filepath, new_path)
renamed.append((filepath, new_path))
print(f"重命名: {filename} -> {os.path.basename(new_path)}")
return renamed
# 使用示例
batch_rename(
directory='./photos',
pattern=r'^IMG_(\d{4})\.jpg$',
replacement=r'photo_\1.jpg',
file_ext='.jpg',
recursive=True
)示例4:查找重复文件
import os
import hashlib
from collections import defaultdict
def find_duplicate_files(directory):
"""查找重复文件(基于内容)"""
hashes = defaultdict(list)
for root, dirs, files in os.walk(directory):
for filename in files:
filepath = os.path.join(root, filename)
try:
# 计算文件哈希
with open(filepath, 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()
hashes[file_hash].append(filepath)
except (IOError, OSError) as e:
print(f"无法读取文件 {filepath}: {e}")
continue
# 返回重复的文件
duplicates = {h: paths for h, paths in hashes.items()
if len(paths) > 1}
return duplicates
def delete_duplicates(duplicates, keep_first=True):
"""删除重复文件"""
deleted = []
for file_hash, filepaths in duplicates.items():
# 保留第一个文件
if keep_first:
to_delete = filepaths[1:]
else:
to_delete = filepaths[:-1]
for filepath in to_delete:
try:
os.remove(filepath)
deleted.append(filepath)
print(f"删除: {filepath}")
except (PermissionError, OSError) as e:
print(f"无法删除 {filepath}: {e}")
return deleted
# 使用示例
duplicates = find_duplicate_files('/path/to/search')
for file_hash, filepaths in duplicates.items():
print(f"重复文件(哈希: {file_hash[:8]}...):")
for path in filepaths:
print(f" - {path}")最佳实践
1. 路径处理
# 正确:使用os.path处理路径
path = os.path.join('dir', 'subdir', 'file.txt')
# 错误:硬编码路径分隔符
path = 'dir' + '/' + 'subdir' + '/' + 'file.txt' # 不可移植2. 错误处理
# 正确:使用try-except处理异常
try:
os.remove('file.txt')
except OSError as e:
print(f"错误: {e}")
# 错误:不检查返回值
result = os.remove('file.txt') # 如果失败会抛出异常3. 资源管理
# 使用with语句确保资源释放
with open('file.txt', 'r') as f:
content = f.read()
# 文件会自动关闭
# 处理大量文件时
files = os.listdir('.')
for filename in files[:100]: # 限制数量
process_file(filename)4. 跨平台兼容性
# 检查操作系统
import platform
import os
system = platform.system()
if system == 'Windows':
# Windows特定代码
pass
elif system == 'Linux':
# Linux特定代码
pass
elif system == 'Darwin':
# macOS特定代码
pass
# 使用跨平台函数
home = os.path.expanduser('~') # 所有平台都适用5. 性能优化
# 批量操作时使用os.scandir而不是os.listdir
# os.scandir更高效,返回DirEntry对象
with os.scandir('.') as entries:
for entry in entries:
if entry.is_file():
print(f"文件: {entry.name}")
elif entry.is_dir():
print(f"目录: {entry.name}")
# 避免重复的状态检查
if os.path.exists('file.txt'):
if os.path.isfile('file.txt'):
# 文件存在且是文件
pass6. 安全考虑
# 验证用户输入路径
def safe_path(base_dir, user_path):
"""确保路径在指定目录内"""
# 规范化路径
full_path = os.path.abspath(
os.path.join(base_dir, user_path)
)
# 检查是否在基础目录内
if not full_path.startswith(os.path.abspath(base_dir)):
raise ValueError("路径越界")
return full_path
# 安全执行系统命令
import subprocess
# 使用参数列表而不是字符串
result = subprocess.run(['ls', '-la'], # 安全
capture_output=True, text=True)
# 不安全
result = os.system('ls -la') # 可能被注入7. 使用pathlib(Python 3.4+)
# 现代路径处理
from pathlib import Path
# 更Pythonic的API
path = Path('/home/user') / 'documents' / 'file.txt'
if path.exists():
print(f"文件大小: {path.stat().st_size}字节")
# 遍历目录
for file in Path('.').glob('*.txt'):
print(file.name)
# 读取文件
content = path.read_text()
# 写入文件
path.write_text('Hello, World!')常见问题解答
Q1: os.remove()和 os.unlink()的区别?
- 两者功能完全相同,
unlink是Unix系统调用的名称
Q2: 如何递归删除非空目录?
import shutil
shutil.rmtree('directory')Q3: 如何跨平台获取临时目录?
import tempfile
temp_dir = tempfile.gettempdir()Q4: 如何获取文件的创建时间?
-
在Unix系统上,
os.path.getctime()返回元数据修改时间 -
在Windows上,它返回创建时间
-
跨平台获取真实的创建时间需要使用平台特定API
Q5: 如何正确处理中文路径?
# 在Windows上可能需要处理编码
path = '中文路径'.encode('gbk') # Windows GBK编码
# 或使用Unicode路径
path = '中文路径' # Python 3默认使用Unicode总结
os模块是Python与操作系统交互的瑞士军刀。通过本指南,你应该能够:
- 处理文件和目录操作
- 管理路径(跨平台兼容)
- 执行系统命令和进程管理
- 访问和修改环境变量
- 处理文件权限和属性
- 编写安全、高效的脚本
记住,对于复杂的文件操作,考虑使用 shutil模块;对于路径处理,Python 3.4+推荐使用 pathlib。
核心原则:
- 总是使用
os.path.join()拼接路径 - 正确处理异常
- 考虑跨平台兼容性
- 遵循最小权限原则
- 使用with语句管理资源
通过合理使用os模块,你可以编写强大、可移植的系统管理脚本和工具程序。