|
|
@@ -0,0 +1,408 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+日志标签分析工具
|
|
|
+分析Android日志文件中不同tag的使用比例,帮助识别和减少不必要的日志
|
|
|
+
|
|
|
+使用方法:
|
|
|
+ python analyze_log_tags.py logfile.txt
|
|
|
+ python analyze_log_tags.py logfile.txt --top 20
|
|
|
+ python analyze_log_tags.py logfile.txt --min-count 100
|
|
|
+ python analyze_log_tags.py logfile.txt --export report.csv
|
|
|
+"""
|
|
|
+
|
|
|
+import re
|
|
|
+import sys
|
|
|
+import argparse
|
|
|
+from collections import Counter
|
|
|
+from pathlib import Path
|
|
|
+from typing import Dict, List, Tuple
|
|
|
+import json
|
|
|
+
|
|
|
+
|
|
|
class LogTagAnalyzer:
    """Analyze Android log files and report per-TAG usage statistics.

    Recognizes three line formats (standard logcat, brief ``LEVEL/TAG:``,
    and a project-specific bracketed format), counts occurrences per TAG
    and per log level, and tracks "overlong" messages so noisy TAGs can be
    identified and trimmed.
    """

    # Standard logcat: MM-DD HH:MM:SS.mmm PID TID LEVEL TAG: message
    LOG_PATTERN = re.compile(
        r'^\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\.\d+\s+\d+\s+\d+\s+[VDIWEF]\s+(\S+?)\s*:\s*(.*)$'
    )

    # Brief format: LEVEL/TAG: message
    SIMPLE_PATTERN = re.compile(r'^[VDIWEF]/(\S+?)\s*:\s*(.*)$')

    # Custom format: [LEVEL][date time][process info][TAG][...]
    # e.g. [I][2025-09-22 +3.0 23:57:49.377][12735, 1*][tag_float_view][:0, ][remove, ...
    CUSTOM_PATTERN = re.compile(
        r'^\[([VDIWEF])\]\[[\d\-+:\s.]+\]\[[^\]]+\]\[([^\]]+)\]'
    )

    # TAG_XXX constant names (kept for API compatibility; not used internally)
    TAG_CONSTANT_PATTERN = re.compile(r'TAG_[A-Z_]+')

    # Message part of a custom-format line: everything after the 5th bracket
    # group. Hoisted so it is compiled once instead of per analyzed line.
    _CUSTOM_MSG_PATTERN = re.compile(r'(?:\[[^\]]*\]){5}(.*)$')

    # Log level surrounded by whitespace (standard logcat lines only).
    _LEVEL_PATTERN = re.compile(r'\s([VDIWEF])\s')

    def __init__(self) -> None:
        self.tag_counter: Counter = Counter()            # tag -> total line count
        self.tag_level_counter: Dict[str, Counter] = {}  # tag -> {level: count}
        self.total_lines = 0           # every line read, matched or not
        self.matched_lines = 0         # lines where a TAG was extracted
        self.custom_format_count = 0   # lines matching CUSTOM_PATTERN
        self.long_log_counter: Counter = Counter()        # tag -> overlong-line count
        self.long_log_samples: Dict[str, List[str]] = {}  # tag -> up to 3 samples
        # Overlong threshold in characters (~5 terminal lines at 80 chars/line).
        self.LONG_LOG_THRESHOLD = 400

    def analyze_file(self, file_path: str) -> None:
        """Read *file_path* line by line and accumulate TAG statistics.

        Exits the process (status 1) when the file is missing or unreadable.
        """
        print(f"正在分析日志文件: {file_path}")

        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    self.total_lines += 1
                    self._analyze_line(line.strip())

                    # Progress hint for large files.
                    if self.total_lines % 10000 == 0:
                        print(f"已处理 {self.total_lines} 行...")

        except FileNotFoundError:
            print(f"错误: 找不到文件 {file_path}")
            sys.exit(1)
        except Exception as e:
            print(f"错误: 读取文件时发生异常: {e}")
            sys.exit(1)

        print(f"\n分析完成!")
        print(f"总行数: {self.total_lines}")
        print(f"匹配的日志行数: {self.matched_lines}")
        if self.custom_format_count > 0:
            print(f"自定义格式日志数: {self.custom_format_count}")
        print(f"不同TAG数量: {len(self.tag_counter)}")

        # Overlong-log summary. Only printed when any were seen, which
        # implies matched_lines > 0, so the division below is safe.
        total_long_logs = sum(self.long_log_counter.values())
        if total_long_logs > 0:
            print(f"⚠️ 超长日志数量: {total_long_logs} ({total_long_logs/self.matched_lines*100:.2f}%)")
            print(f"   包含超长日志的TAG数: {len(self.long_log_counter)}")

    def _analyze_line(self, line: str) -> None:
        """Extract (tag, level, message) from one line and update counters."""
        if not line:
            return

        tag = None
        level = None
        message = None

        # Try the custom bracketed format first (most specific).
        custom_match = self.CUSTOM_PATTERN.match(line)
        if custom_match:
            level = custom_match.group(1)
            tag = custom_match.group(2)
            self.custom_format_count += 1
            # Message = whatever follows the 5th bracket group.
            msg_match = self._CUSTOM_MSG_PATTERN.search(line)
            if msg_match:
                message = msg_match.group(1)
        else:
            # Standard logcat format.
            match = self.LOG_PATTERN.match(line)
            if match:
                # The level char sits between whitespace before the TAG.
                level_match = self._LEVEL_PATTERN.search(line)
                if level_match:
                    level = level_match.group(1)
            else:
                # Brief LEVEL/TAG format; the level is the first character.
                # (BUGFIX: the old whitespace-delimited search never matched
                # this format, so brief-format lines lost their level stats —
                # or picked up a stray letter from the message body.)
                match = self.SIMPLE_PATTERN.match(line)
                if match:
                    level = line[0]
            if match:
                tag = match.group(1)
                message = match.group(2)

        # Nothing recognized: leave all counters untouched.
        if not tag:
            return

        self.tag_counter[tag] += 1
        self.matched_lines += 1

        # Per-level breakdown for this TAG.
        if level:
            self.tag_level_counter.setdefault(tag, Counter())[level] += 1

        # Track overlong messages (candidates for trimming).
        if message and len(message) > self.LONG_LOG_THRESHOLD:
            self.long_log_counter[tag] += 1
            samples = self.long_log_samples.setdefault(tag, [])
            if len(samples) < 3:  # keep at most 3 samples per TAG
                # Truncate to the first 200 chars for display.
                samples.append(message[:200] + "..." if len(message) > 200 else message)

    def get_statistics(self) -> List[Tuple[str, int, float]]:
        """Return [(tag, count, percentage), ...] sorted by count, descending."""
        if self.matched_lines == 0:
            return []
        return [
            (tag, count, count / self.matched_lines * 100)
            for tag, count in self.tag_counter.most_common()
        ]

    def print_report(self, top_n: int = None, min_count: int = 0) -> None:
        """Print the ranked TAG table.

        Args:
            top_n: show only the first N ranks (None = all).
            min_count: hide TAGs with fewer than this many occurrences.
        """
        stats = self.get_statistics()

        if not stats:
            print("没有找到日志数据")
            return

        print("\n" + "=" * 100)
        print("日志TAG使用统计报告")
        print("=" * 100)
        print(f"{'排名':<6} {'TAG名称':<40} {'数量':<12} {'占比':<10} {'级别分布'}")
        print("-" * 100)

        rank = 1
        for tag, count, percentage in stats:
            if min_count > 0 and count < min_count:
                continue
            if top_n and rank > top_n:
                break

            # Per-level breakdown string, e.g. "D:12 I:3 E:1".
            level_dist = ""
            if tag in self.tag_level_counter:
                level_dist = " ".join(
                    f"{level}:{self.tag_level_counter[tag][level]}"
                    for level in ['V', 'D', 'I', 'W', 'E', 'F']
                    if level in self.tag_level_counter[tag]
                )

            print(f"{rank:<6} {tag:<40} {count:<12} {percentage:>6.2f}%    {level_dist}")
            rank += 1

        print("=" * 100)

        # Aggregate numbers after the per-TAG table.
        self._print_summary_statistics(stats, min_count)

    def _print_summary_statistics(self, stats: List[Tuple[str, int, float]], min_count: int) -> None:
        """Print aggregates: top-N share, low-frequency TAGs, per-level totals,
        overlong-log ranking.

        `min_count` is accepted for signature compatibility; the summary is
        always computed over the full statistics.
        """
        print("\n" + "=" * 100)
        print("汇总统计")
        print("=" * 100)

        # Share held by the busiest TAGs.
        if len(stats) >= 10:
            top10_count = sum(count for _, count, _ in stats[:10])
            print(f"TOP 10 TAG占比: {top10_count / self.matched_lines * 100:.2f}%")
        if len(stats) >= 20:
            top20_count = sum(count for _, count, _ in stats[:20])
            print(f"TOP 20 TAG占比: {top20_count / self.matched_lines * 100:.2f}%")

        # Low-frequency TAGs (count < 10).
        low_freq_tags = [tag for tag, count, _ in stats if count < 10]
        if low_freq_tags:
            low_freq_total = sum(count for _, count, _ in stats if count < 10)
            print(f"\n出现次数 < 10 的TAG: {len(low_freq_tags)} 个")
            print(f"这些低频TAG总占比: {low_freq_total / self.matched_lines * 100:.2f}%")

        # Totals per log level across all TAGs.
        level_totals = Counter()
        for tag_levels in self.tag_level_counter.values():
            level_totals.update(tag_levels)

        if level_totals:
            print("\n按日志级别统计:")
            # Hoisted out of the loop: the mapping is constant.
            level_names = {
                'V': 'VERBOSE',
                'D': 'DEBUG',
                'I': 'INFO',
                'W': 'WARN',
                'E': 'ERROR',
                'F': 'FATAL'
            }
            for level in ['V', 'D', 'I', 'W', 'E', 'F']:
                if level in level_totals:
                    count = level_totals[level]
                    percentage = (count / self.matched_lines) * 100
                    print(f"  {level_names.get(level, level):<10}: {count:>10} ({percentage:>6.2f}%)")

        # Overlong-log ranking.
        if self.long_log_counter:
            total_long = sum(self.long_log_counter.values())
            print(f"\n⚠️ 超长日志统计(消息 > {self.LONG_LOG_THRESHOLD} 字符 ≈ 5行):")
            print(f"  总超长日志数: {total_long} ({total_long/self.matched_lines*100:.2f}%)")
            print(f"  包含超长日志的TAG TOP 10:")
            for tag, count in self.long_log_counter.most_common(10):
                percentage_in_tag = count / self.tag_counter[tag] * 100
                print(f"    • {tag:<35} {count:>6} 条 ({percentage_in_tag:>5.1f}% 的该TAG)")

        print("=" * 100)

    def show_long_log_samples(self, top_n: int = 5) -> None:
        """Print the stored samples for the TAGs with the most overlong logs."""
        if not self.long_log_counter:
            print("没有发现超长日志")
            return

        print("\n" + "=" * 100)
        print("超长日志示例")
        print("=" * 100)

        for tag, count in self.long_log_counter.most_common(top_n):
            print(f"\n【{tag}】 共 {count} 条超长日志")
            for i, sample in enumerate(self.long_log_samples.get(tag, []), 1):
                print(f"  示例 {i}: {sample}")

        print("=" * 100)

    def export_csv(self, output_path: str, min_count: int = 0) -> None:
        """Write the ranked statistics as CSV.

        Uses the csv module so TAG names containing commas or quotes are
        properly escaped (BUGFIX: the previous hand-built rows would shift
        columns for such TAGs).
        """
        stats = self.get_statistics()

        try:
            # newline='' is required by the csv module; lineterminator keeps
            # the previous plain-\n row endings.
            with open(output_path, 'w', encoding='utf-8', newline='') as f:
                writer = csv.writer(f, lineterminator='\n')
                writer.writerow(["排名", "TAG名称", "数量", "占比(%)", "超长日志数", "超长占比(%)"])
                rank = 1
                for tag, count, percentage in stats:
                    if min_count > 0 and count < min_count:
                        continue
                    long_count = self.long_log_counter.get(tag, 0)
                    long_percentage = (long_count / count * 100) if count > 0 else 0
                    writer.writerow(
                        [rank, tag, count, f"{percentage:.2f}", long_count, f"{long_percentage:.2f}"]
                    )
                    rank += 1
            print(f"\n已导出CSV报告到: {output_path}")
        except Exception as e:
            print(f"导出CSV失败: {e}")

    def export_json(self, output_path: str) -> None:
        """Write summary + per-TAG statistics (incl. level breakdown) as JSON."""
        stats = self.get_statistics()

        result = {
            "summary": {
                "total_lines": self.total_lines,
                "matched_lines": self.matched_lines,
                "unique_tags": len(self.tag_counter)
            },
            "tags": []
        }

        for rank, (tag, count, percentage) in enumerate(stats, 1):
            tag_data = {
                "rank": rank,
                "tag": tag,
                "count": count,
                "percentage": round(percentage, 2)
            }
            # Per-level breakdown, when any level was recorded for this TAG.
            if tag in self.tag_level_counter:
                tag_data["levels"] = dict(self.tag_level_counter[tag])
            result["tags"].append(tag_data)

        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
            print(f"\n已导出JSON报告到: {output_path}")
        except Exception as e:
            print(f"导出JSON失败: {e}")

    def suggest_cleanup(self, threshold_percentage: float = 0.01) -> List[str]:
        """Return TAGs whose share (in percent) is below *threshold_percentage*."""
        return [
            tag
            for tag, _count, percentage in self.get_statistics()
            if percentage < threshold_percentage
        ]
|
|
|
+
|
|
|
+
|
|
|
def main():
    """CLI entry point: parse arguments, run the analyzer, emit reports."""
    usage_examples = """
示例用法:
  python analyze_log_tags.py logcat.txt
  python analyze_log_tags.py logcat.txt --top 30
  python analyze_log_tags.py logcat.txt --min-count 50
  python analyze_log_tags.py logcat.txt --export report.csv
  python analyze_log_tags.py logcat.txt --export-json report.json
  python analyze_log_tags.py logcat.txt --suggest-cleanup
    """

    arg_parser = argparse.ArgumentParser(
        description='分析Android日志文件中TAG的使用情况',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    arg_parser.add_argument('logfile', help='日志文件路径')
    arg_parser.add_argument('--top', type=int, help='只显示TOP N个TAG', metavar='N')
    arg_parser.add_argument('--min-count', type=int, default=0,
                            help='只显示出现次数 >= N 的TAG', metavar='N')
    arg_parser.add_argument('--export', help='导出CSV报告到指定文件', metavar='FILE')
    arg_parser.add_argument('--export-json', help='导出JSON报告到指定文件', metavar='FILE')
    arg_parser.add_argument('--suggest-cleanup', action='store_true',
                            help='建议可以清理的低频TAG')
    arg_parser.add_argument('--threshold', type=float, default=0.01,
                            help='清理建议的阈值(默认0.01,即0.01%%)', metavar='PERCENT')
    arg_parser.add_argument('--show-long-logs', type=int, nargs='?', const=5, metavar='N',
                            help='显示超长日志的示例(默认TOP 5)')
    args = arg_parser.parse_args()

    # Guard clause: refuse to continue without an input file.
    if not Path(args.logfile).exists():
        print(f"错误: 文件不存在: {args.logfile}")
        sys.exit(1)

    # Run the analysis and print the main ranked report.
    tag_analyzer = LogTagAnalyzer()
    tag_analyzer.analyze_file(args.logfile)
    tag_analyzer.print_report(top_n=args.top, min_count=args.min_count)

    # Optional exports.
    if args.export:
        tag_analyzer.export_csv(args.export, min_count=args.min_count)
    if args.export_json:
        tag_analyzer.export_json(args.export_json)

    # Optional detail view of overlong logs ("is not None" so that an
    # explicit 0 is still honored; the bare flag defaults to 5 via const).
    if args.show_long_logs is not None:
        tag_analyzer.show_long_log_samples(top_n=args.show_long_logs)

    # Optional cleanup suggestions for low-frequency TAGs.
    if args.suggest_cleanup:
        cleanup_tags = tag_analyzer.suggest_cleanup(threshold_percentage=args.threshold)
        if not cleanup_tags:
            print(f"\n没有找到需要清理的TAG(占比 < {args.threshold}%)")
        else:
            print(f"\n建议清理的TAG(占比 < {args.threshold}%):")
            print("=" * 100)
            for idx, tag in enumerate(cleanup_tags, 1):
                print(f"{idx}. {tag}")
            print(f"\n共 {len(cleanup_tags)} 个TAG建议清理")
|
|
|
+
|
|
|
+
|
|
|
# Run the CLI only when executed as a script (not on import).
if __name__ == "__main__":
    main()
|
|
|
+
|