Files
LanMountainDesktop/scripts/analyze_git_commits.py
lincube d8f75e86be Add IPC backoff/retries and safer disposal
Introduce exponential backoff, jitter and retry logic across IPC components to improve robustness and avoid tight retry loops; make disposal idempotent and add connection guards. Key changes:
- LauncherCoordinatorIpcServer / LauncherIpcServer: add backoff constants, ComputeBackoff(), consecutive error tracking and delayed retries with jitter.
- LanMountainDesktopIpcClient / LauncherIpcClient: add connect retry loops, timeouts, delayed retries, improved error logging, and use ArrayPool for buffered async writes; ensure proper cleanup on failures.
- PublicIpcHostService: add disposed flag, guard OnPeerConnected and Dispose, and clear connected peers on dispose.
- Add many auto-generated commit analysis docs under docs/auto_commit_md and new scripts for analyzing/generating commit docs.
These changes aim to make IPC connection handling more resilient and resource-safe.
2026-05-07 21:39:21 +08:00

601 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Git Commit 深度分析工具
用于解析 Git 对象文件并生成详细的代码变更分析报告
"""
import zlib
import os
import re
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class GitObject:
    """A decompressed Git object, split into its header type and payload."""

    # First word of the object header (the part before the NUL byte).
    obj_type: str
    # Bytes that follow the header's NUL terminator.
    content: bytes
    # The whole decompressed stream, header included.
    raw_data: bytes
@dataclass
class CommitInfo:
    """Metadata extracted from a single commit object."""

    # Identity and ancestry.
    hash: str
    parent: Optional[str]  # None when the commit has no parent header
    tree: str

    # Author line: "name <email> timestamp timezone".
    author: str
    email: str
    timestamp: int
    timezone: str

    # Text after the first blank line of the commit object.
    message: str

    # Per-instance containers; default_factory keeps them unshared.
    changes: List[Dict] = field(default_factory=list)
    stats: Dict = field(default_factory=dict)
@dataclass
class FileChange:
    """One path that differs between two trees."""

    path: str
    # Expected values: added, modified, deleted, renamed.
    change_type: str
    # Previous path; defaults to None.
    old_path: Optional[str] = None
    # Line counts and diff text; all default to "nothing recorded".
    additions: int = 0
    deletions: int = 0
    diff_content: str = ""
class GitObjectParser:
    """Parser for loose Git objects stored under ``<repo>/.git/objects``.

    Only loose objects (one zlib-compressed file per object, addressed as
    ``objects/<first 2 hex chars>/<remaining chars>``) are read. An object
    whose file is not present at that location is treated as missing.
    """

    # Mode string git writes for a sub-directory entry inside a tree object.
    _TREE_MODE = '40000'
    # Number of raw bytes used for each object id inside a tree entry.
    _SHA_LEN = 20
    # "author <name> <<email>> <unix timestamp> <utc offset>"; name and email
    # may legitimately be empty, hence ``.*`` rather than ``.+``.
    _AUTHOR_RE = re.compile(r'author (.*) <(.*)> (\d+) ([+-]\d+)')

    def __init__(self, repo_path: str):
        """Remember the repository location and create empty caches."""
        self.repo_path = Path(repo_path)
        self.objects_path = self.repo_path / ".git" / "objects"
        self.commit_cache: Dict[str, CommitInfo] = {}
        # Maps a tree hash to its flattened ``{relative file path: object hash}``.
        self.tree_cache: Dict[str, Dict[str, str]] = {}

    def read_object(self, obj_hash: str) -> Optional[GitObject]:
        """Read and decompress one loose object.

        Returns:
            The parsed object, or ``None`` when the hash is empty or too
            short, the object file does not exist, or it cannot be
            read/decoded (in which case an error line is printed).
        """
        if not obj_hash or len(obj_hash) < 4:
            return None
        obj_path = self.objects_path / obj_hash[:2] / obj_hash[2:]
        if not obj_path.exists():
            return None
        try:
            decompressed = zlib.decompress(obj_path.read_bytes())
            # Layout: b"<type> <size>\x00<payload>".
            null_idx = decompressed.index(b'\x00')
            header = decompressed[:null_idx].decode('utf-8')
            obj_type = header.split()[0]
        except (OSError, zlib.error, ValueError, IndexError) as e:
            print(f"Error reading object {obj_hash}: {e}")
            return None
        return GitObject(
            obj_type=obj_type,
            content=decompressed[null_idx + 1:],
            raw_data=decompressed,
        )

    def parse_commit(self, commit_hash: str) -> Optional[CommitInfo]:
        """Parse a commit object into a :class:`CommitInfo`.

        For a merge commit (several ``parent`` headers) the *first* parent is
        recorded, so later diffs are taken against the branch that was merged
        into. Results are cached per hash.

        Returns:
            The commit metadata, or ``None`` if the object is missing, is not
            a commit, or carries no ``tree`` header.
        """
        cached = self.commit_cache.get(commit_hash)
        if cached is not None:
            return cached
        obj = self.read_object(commit_hash)
        if not obj or obj.obj_type != 'commit':
            return None

        lines = obj.content.decode('utf-8', errors='replace').split('\n')
        parent = None
        tree = None
        author = None
        email = None
        timestamp = None
        timezone = None
        message = ''
        for idx, line in enumerate(lines):
            if line == '':
                # The first blank line separates headers from the message.
                message = '\n'.join(lines[idx + 1:]).strip()
                break
            if line.startswith('tree '):
                tree = line[5:].strip()
            elif line.startswith('parent '):
                if parent is None:  # keep the first parent of a merge
                    parent = line[7:].strip()
            elif line.startswith('author '):
                match = self._AUTHOR_RE.match(line)
                if match:
                    author = match.group(1)
                    email = match.group(2)
                    timestamp = int(match.group(3))
                    timezone = match.group(4)

        if tree is None:
            # Without a tree there is nothing to diff; treat as unparseable
            # rather than handing ``None`` to the tree reader later on.
            print(f"Error parsing commit {commit_hash}: missing tree header")
            return None

        commit_info = CommitInfo(
            hash=commit_hash,
            parent=parent,
            tree=tree,
            author=author or "Unknown",
            email=email or "",
            timestamp=timestamp or 0,
            timezone=timezone or "",
            message=message,
        )
        self.commit_cache[commit_hash] = commit_info
        return commit_info

    def _read_tree_entries(self, tree_hash: str) -> Optional[List[Tuple[str, str, str]]]:
        """Return the raw ``(mode, name, hex sha)`` entries of one tree object.

        Returns ``None`` when the object is missing or is not a tree. A
        truncated trailing entry ends parsing; entries read so far are kept.
        """
        obj = self.read_object(tree_hash)
        if not obj or obj.obj_type != 'tree':
            return None
        content = obj.content
        entries = []
        idx = 0
        # Each entry is b"<mode> <name>\x00" followed by a raw object id.
        while idx < len(content):
            space_idx = content.find(b' ', idx)
            if space_idx == -1:
                break
            null_idx = content.find(b'\x00', space_idx)
            if null_idx == -1:
                break
            sha_start = null_idx + 1
            sha_end = sha_start + self._SHA_LEN
            if sha_end > len(content):
                break
            mode = content[idx:space_idx].decode('utf-8', errors='replace')
            name = content[space_idx + 1:null_idx].decode('utf-8', errors='replace')
            entries.append((mode, name, content[sha_start:sha_end].hex()))
            idx = sha_end
        return entries

    def _flatten_tree(self, tree_hash: str) -> Optional[Dict[str, str]]:
        """Recursively flatten a tree into ``{relative path: object hash}``.

        Returns ``None`` when the tree object itself cannot be read. Each
        successfully flattened tree is cached by hash, so sub-directories
        shared between commits are only walked once.
        """
        cached = self.tree_cache.get(tree_hash)
        if cached is not None:
            return cached
        raw_entries = self._read_tree_entries(tree_hash)
        if raw_entries is None:
            return None
        files: Dict[str, str] = {}
        for mode, name, sha in raw_entries:
            subtree = self._flatten_tree(sha) if mode == self._TREE_MODE else None
            if subtree is None:
                # Regular entry, or a sub-directory whose tree object is not
                # readable: keep the entry itself so a hash change is still
                # detected instead of the whole directory vanishing.
                files[name] = sha
            else:
                for rel_path, obj_hash in subtree.items():
                    files[f"{name}/{rel_path}"] = obj_hash
        self.tree_cache[tree_hash] = files
        return files

    def parse_tree(self, tree_hash: str) -> Dict[str, str]:
        """Return a mapping of file path to object hash for a tree.

        Sub-directories are expanded recursively and paths are joined with
        ``/`` relative to the given tree. An unreadable tree yields ``{}``.
        """
        flattened = self._flatten_tree(tree_hash)
        return flattened if flattened is not None else {}

    def get_blob_content(self, blob_hash: str) -> Optional[str]:
        """Return a blob's content decoded as UTF-8 (invalid bytes replaced).

        Returns ``None`` when the object is missing or is not a blob.
        """
        obj = self.read_object(blob_hash)
        if not obj or obj.obj_type != 'blob':
            return None
        return obj.content.decode('utf-8', errors='replace')

    def compare_trees(self, old_tree: str, new_tree: str) -> List[FileChange]:
        """Diff two trees by path and object hash.

        A falsy tree hash is treated as an empty tree. Added and modified
        paths are listed first in the new tree's order, followed by deleted
        paths in the old tree's order.
        """
        old_files = self.parse_tree(old_tree) if old_tree else {}
        new_files = self.parse_tree(new_tree) if new_tree else {}
        changes = []
        for path, new_hash in new_files.items():
            if path not in old_files:
                changes.append(FileChange(path=path, change_type='added'))
            elif old_files[path] != new_hash:
                changes.append(FileChange(path=path, change_type='modified'))
        changes.extend(
            FileChange(path=path, change_type='deleted')
            for path in old_files
            if path not in new_files
        )
        return changes

    def get_commit_changes(self, commit_hash: str) -> Tuple[List[FileChange], Dict]:
        """Return the file changes of a commit relative to its parent.

        When the commit has no parent, or the parent cannot be parsed, every
        file in the commit's tree is reported as added. Line-level counts are
        not computed: ``total_additions`` / ``total_deletions`` stay at 0.

        Returns:
            ``(changes, stats)``; ``([], {})`` if the commit cannot be parsed.
        """
        commit = self.parse_commit(commit_hash)
        if not commit:
            return [], {}
        parent_commit = self.parse_commit(commit.parent) if commit.parent else None
        parent_tree = parent_commit.tree if parent_commit else None

        # Delegating to compare_trees gives a deterministic change order
        # (iterating a set of paths would vary between runs).
        changes = self.compare_trees(parent_tree, commit.tree)
        stats = {'added': 0, 'modified': 0, 'deleted': 0, 'total_additions': 0, 'total_deletions': 0}
        for change in changes:
            stats[change.change_type] += 1
        return changes, stats
class CommitAnalyzer:
    """Builds a per-commit analysis dictionary from parsed Git objects."""

    def __init__(self, repo_path: str):
        """Create a parser for the repository at *repo_path*."""
        self.parser = GitObjectParser(repo_path)
        self.repo_path = Path(repo_path)

    def analyze_commit(self, commit_hash: str) -> Dict[str, Any]:
        """Analyze a single commit.

        Returns:
            A dictionary with commit metadata, the change list, statistics,
            file-type counts, an importance label, content previews, impact
            notes and review points; ``{}`` if the commit cannot be parsed.
        """
        commit = self.parser.parse_commit(commit_hash)
        if not commit:
            return {}
        changes, stats = self.parser.get_commit_changes(commit_hash)

        # Count changed paths per file extension.
        file_types = defaultdict(int)
        for change in changes:
            file_types[Path(change.path).suffix or 'no_extension'] += 1

        return {
            'commit_hash': commit_hash,
            'message': commit.message,
            'author': commit.author,
            'email': commit.email,
            'timestamp': commit.timestamp,
            'date': self._format_commit_date(commit.timestamp, commit.timezone),
            'parent': commit.parent,
            'changes': [
                {
                    'path': c.path,
                    'type': c.change_type,
                    'additions': c.additions,
                    'deletions': c.deletions,
                }
                for c in changes
            ],
            'stats': stats,
            'file_types': dict(file_types),
            'importance': self._assess_importance(commit.message, changes, stats),
            'key_snippets': self._extract_key_snippets(changes, tree_hash=commit.tree),
            'impact_analysis': self._analyze_impact(changes, commit.message),
            'review_points': self._generate_review_points(changes, commit.message),
        }

    @staticmethod
    def _format_commit_date(timestamp: int, utc_offset: str) -> str:
        """Format *timestamp* as ``YYYY-MM-DD HH:MM:SS`` in the commit's own offset.

        *utc_offset* is the ``+HHMM`` / ``-HHMM`` string from the author line.
        Using it makes the output independent of the machine's time zone. If
        the offset is missing or unusable, local time is used instead.
        """
        # Local import: the module itself only imports ``datetime``.
        from datetime import timedelta, timezone

        fmt = '%Y-%m-%d %H:%M:%S'
        match = re.fullmatch(r'([+-])(\d{2})(\d{2})', utc_offset or '')
        if match:
            sign = -1 if match.group(1) == '-' else 1
            offset = sign * timedelta(hours=int(match.group(2)), minutes=int(match.group(3)))
            try:
                return datetime.fromtimestamp(timestamp, tz=timezone(offset)).strftime(fmt)
            except (ValueError, OverflowError, OSError):
                # Offset of 24h or more, or timestamp out of range: fall
                # through to the local-time rendering below.
                pass
        return datetime.fromtimestamp(timestamp).strftime(fmt)

    def _assess_importance(self, message: str, changes: List[FileChange], stats: Dict) -> str:
        """Label a commit as critical / feature / major / refactor / minor.

        Keywords are matched as plain substrings of the lower-cased message;
        the first matching rule wins, in the order listed above.
        """
        message_lower = message.lower()
        critical_keywords = ['fix', 'bug', 'security', 'crash', 'memory leak', 'deadlock']
        feature_keywords = ['feat', 'feature', 'add', 'implement', 'new']
        refactor_keywords = ['refactor', 'restructure', 'cleanup', 'optimize']
        changed_files = stats.get('added', 0) + stats.get('modified', 0) + stats.get('deleted', 0)

        if any(kw in message_lower for kw in critical_keywords):
            return 'critical'
        if any(kw in message_lower for kw in feature_keywords):
            return 'feature'
        if changed_files > 20:
            return 'major'
        if any(kw in message_lower for kw in refactor_keywords):
            return 'refactor'
        return 'minor'

    def _read_change_content(self, path: str, blob_hash: Optional[str]) -> Optional[str]:
        """Return the text of *path*, preferring the committed blob.

        Falls back to the working-tree file when no blob hash is given or the
        blob cannot be read; returns ``None`` if neither source is available.
        """
        if blob_hash:
            content = self.parser.get_blob_content(blob_hash)
            if content is not None:
                # Match the newline translation that text-mode file reading
                # applies in the fallback below.
                return content.replace('\r\n', '\n').replace('\r', '\n')
        file_path = self.repo_path / path
        if not file_path.is_file():
            return None
        try:
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                return f.read()
        except OSError:
            # Best effort: an unreadable file simply yields no snippet.
            return None

    def _extract_key_snippets(self, changes: List[FileChange],
                              tree_hash: Optional[str] = None) -> List[Dict]:
        """Collect a content preview for up to the first 10 non-deleted changes.

        Args:
            changes: File changes of the commit.
            tree_hash: Tree of the analyzed commit. When given, previews are
                read from the blobs recorded in that tree, so they show the
                file as of that commit rather than the current working tree.

        Returns:
            One dict per previewed file with its path, change type, line
            count and the first 30 lines (or the full text if shorter).
        """
        tree_files = self.parser.parse_tree(tree_hash) if tree_hash else {}
        snippets = []
        for change in changes[:10]:  # cap the number of files inspected
            if change.change_type == 'deleted':
                continue
            content = self._read_change_content(change.path, tree_files.get(change.path))
            if content is None:
                continue
            lines = content.split('\n')
            snippets.append({
                'file': change.path,
                'type': change.change_type,
                'lines_count': len(lines),
                'preview': '\n'.join(lines[:30]) if len(lines) > 30 else content,
            })
        return snippets

    def _analyze_impact(self, changes: List[FileChange], message: str) -> List[str]:
        """Describe which modules and file kinds a commit touches."""
        impacts = []

        # A "module" is the first path component of any nested path.
        affected_modules = set()
        for change in changes:
            parts = change.path.split('/')
            if len(parts) > 1:
                affected_modules.add(parts[0])
        if affected_modules:
            impacts.append(f"受影响的模块: {', '.join(sorted(affected_modules))}")

        file_types = defaultdict(int)
        for change in changes:
            ext = Path(change.path).suffix
            if ext:
                file_types[ext] += 1
        if '.cs' in file_types:
            impacts.append(f"涉及 {file_types['.cs']} 个 C# 文件变更")
        if '.axaml' in file_types or '.xaml' in file_types:
            impacts.append("涉及 UI/XAML 文件变更")
        if '.md' in file_types:
            impacts.append("涉及文档更新")

        # Keyword hints taken from the commit message.
        message_lower = message.lower()
        if 'fix' in message_lower:
            impacts.append("这是一个修复性提交,可能解决现有问题")
        if 'feat' in message_lower or 'feature' in message_lower:
            impacts.append("这是一个功能新增提交,扩展了项目能力")
        if 'refactor' in message_lower:
            impacts.append("这是一个重构提交,改善了代码结构")
        if 'test' in message_lower:
            impacts.append("涉及测试相关变更")
        return impacts

    def _generate_review_points(self, changes: List[FileChange], message: str) -> List[str]:
        """Produce review hints from the change list and the commit message."""
        points = []

        # Fires only for changes whose additions/deletions have been filled
        # in; both default to 0 on FileChange.
        large_files = [c for c in changes if c.additions + c.deletions > 100]
        if large_files:
            points.append(f"注意: 有 {len(large_files)} 个文件变更超过 100 行,需要仔细审查")

        # One hint per path that contains any of the critical name fragments.
        critical_patterns = ['Program.cs', 'App.axaml', 'MainWindow', 'Core', 'Service']
        for change in changes:
            if any(pattern in change.path for pattern in critical_patterns):
                points.append(f"关键文件变更: {change.path} - 需要特别关注")

        if len(message) < 10:
            points.append("提交消息较短,建议提供更详细的变更说明")
        if 'wip' in message.lower() or 'todo' in message.lower():
            points.append("提交包含 WIP/TODO 标记,确认是否已完成")

        deleted = [c for c in changes if c.change_type == 'deleted']
        if deleted:
            points.append(f"删除了 {len(deleted)} 个文件,确认是否有其他代码依赖这些文件")
        return points
def _code_fence(text: str) -> str:
    """Return a backtick fence longer than any backtick run inside *text*."""
    longest = max((len(run) for run in re.findall(r'`+', text)), default=0)
    return '`' * max(3, longest + 1)


def generate_markdown_report(analysis: Dict[str, Any]) -> str:
    """Render an analysis dictionary as a Markdown report.

    Args:
        analysis: Must contain ``commit_hash``, ``date``, ``author``,
            ``email``, ``importance``, ``message`` and ``stats``. The keys
            ``file_types``, ``changes``, ``impact_analysis``,
            ``review_points`` and ``key_snippets`` are optional; a section is
            emitted only when its value is non-empty.

    Returns:
        The report text. At most 50 changes and 5 snippets are listed, and
        each snippet preview is cut to 2000 characters.
    """
    lines = []

    # Header
    lines.append("# Commit 深度分析报告")
    lines.append("")
    lines.append(f"**提交哈希**: `{analysis['commit_hash']}`")
    lines.append(f"**提交时间**: {analysis['date']}")
    lines.append(f"**作者**: {analysis['author']} <{analysis['email']}>")
    lines.append(f"**重要性**: {analysis['importance'].upper()}")
    lines.append("")

    # Commit message. The fence is sized so that backticks inside the message
    # cannot terminate the code block early.
    message = analysis['message']
    fence = _code_fence(message)
    lines.append("## 提交消息")
    lines.append(fence)
    lines.append(message)
    lines.append(fence)
    lines.append("")

    # Change statistics
    stats = analysis['stats']
    lines.append("## 变更统计")
    lines.append(f"- **新增文件**: {stats.get('added', 0)}")
    lines.append(f"- **修改文件**: {stats.get('modified', 0)}")
    lines.append(f"- **删除文件**: {stats.get('deleted', 0)}")
    lines.append("")

    # File-type distribution, most frequent first
    if analysis.get('file_types'):
        lines.append("### 文件类型分布")
        for ext, count in sorted(analysis['file_types'].items(), key=lambda x: -x[1]):
            lines.append(f"- `{ext}`: {count} 个文件")
        lines.append("")

    # Changed files table
    if analysis.get('changes'):
        lines.append("## 变更文件列表")
        lines.append("| 文件路径 | 变更类型 |")
        lines.append("|---------|---------|")
        type_map = {'added': '新增', 'modified': '修改', 'deleted': '删除'}
        for change in analysis['changes'][:50]:  # cap the number of rows
            change_type = type_map.get(change['type'], change['type'])
            # A raw "|" would split the table cell, so escape it.
            path = change['path'].replace('|', '\\|')
            lines.append(f"| `{path}` | {change_type} |")
        lines.append("")

    # Impact analysis
    if analysis.get('impact_analysis'):
        lines.append("## 影响分析")
        for impact in analysis['impact_analysis']:
            lines.append(f"- {impact}")
        lines.append("")

    # Review points
    if analysis.get('review_points'):
        lines.append("## 代码审查要点")
        for point in analysis['review_points']:
            lines.append(f"- ⚠️ {point}")
        lines.append("")

    # Content previews
    if analysis.get('key_snippets'):
        lines.append("## 关键代码片段")
        for snippet in analysis['key_snippets'][:5]:
            preview = snippet['preview'][:2000]  # cap the preview length
            fence = _code_fence(preview)
            lines.append(f"### {snippet['file']}")
            lines.append(f"- **类型**: {snippet['type']}")
            lines.append(f"- **行数**: {snippet['lines_count']}")
            lines.append("")
            lines.append(fence)
            lines.append(preview)
            lines.append(fence)
            lines.append("")

    return '\n'.join(lines)
def main(repo_path: Optional[str] = None):
    """Analyze every commit recorded in the HEAD reflog and write reports.

    Args:
        repo_path: Repository root. When omitted, the first command-line
            argument is used, and failing that the original built-in default.

    One Markdown file per commit is written to
    ``<repo>/docs/auto_commit_md/<YYYYMMDD>_<short hash>_deep_analysis.md``.
    A failure on one commit is printed with its traceback and does not stop
    the remaining commits.
    """
    import sys
    import traceback

    if repo_path is None:
        repo_path = sys.argv[1] if len(sys.argv) > 1 else r"d:\github\LanMountainDesktop"
    repo_root = Path(repo_path)
    output_dir = repo_root / "docs" / "auto_commit_md"

    # Check the reflog before creating anything, so a wrong path does not
    # leave stray directories behind.
    head_log_path = repo_root / ".git" / "logs" / "HEAD"
    if not head_log_path.exists():
        print(f"错误: 找不到 HEAD 日志文件: {head_log_path}")
        return
    output_dir.mkdir(parents=True, exist_ok=True)

    # Reflog line: old_hash new_hash name <email> timestamp timezone\taction: message
    # Only "commit:", "commit (initial):", "commit (amend):" and similar
    # actions count as commits; the captured group is the bare message.
    commit_action = re.compile(r'commit(?: \([^)]*\))?:(.*)')
    commits = []
    with open(head_log_path, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            meta_part, tab, action_part = line.strip().partition('\t')
            if not tab:
                continue
            meta_tokens = meta_part.split()
            if len(meta_tokens) < 5:
                continue
            action = commit_action.match(action_part)
            if action is None:
                continue
            commits.append({
                'hash': meta_tokens[1],
                'message': action.group(1).strip(),
            })
    print(f"找到 {len(commits)} 个 commit")

    analyzer = CommitAnalyzer(repo_path)
    for i, commit_info in enumerate(commits):
        commit_hash = commit_info['hash']
        short_hash = commit_hash[:7]
        print(f"[{i+1}/{len(commits)}] 分析 commit: {short_hash} - {commit_info['message'][:50]}")
        try:
            analysis = analyzer.analyze_commit(commit_hash)
            if not analysis:
                print(f"  跳过: 无法解析 commit {short_hash}")
                continue
            report = generate_markdown_report(analysis)
            # Reuse the report's own "YYYY-MM-DD ..." date so the file name
            # and the date shown inside the report always agree.
            date_str = analysis['date'][:10].replace('-', '')
            filename = f"{date_str}_{short_hash}_deep_analysis.md"
            output_path = output_dir / filename
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"  已保存: {filename}")
        except Exception as e:
            # Top-level boundary: report the failure and keep going.
            print(f"  错误: 分析 commit {short_hash} 时出错: {e}")
            traceback.print_exc()
    print("\n分析完成!")


if __name__ == "__main__":
    main()