This plan describes in detail how to use the GKCI model of Zhou et al. (a graph-neural-network-based key class identification method) to evaluate the performance of five metric sets (DM, NM, WDNM, WDNM+DM, DM+NM) on six Java open-source software systems. The focus is on computing each metric set's TP (true positive), FN (false negative), and FP (false positive) counts, and from these the TPR (true positive rate) and FPR (false positive rate). The entire pipeline runs on Linux and uses Python libraries such as PyTorch and NetworkX for modeling and evaluating key class identification.
I. Problem Background and Method Overview
Key class identification is an important research direction in software engineering: it helps developers quickly locate the most important classes in a system, improving the efficiency of program comprehension and maintenance. Existing approaches fall into two families: non-trained metric-computation methods and supervised machine-learning methods. This plan combines the strengths of both, using the GKCI model (a supervised graph-neural-network method) to evaluate five metric sets and determine which performs best for key class identification.
The core idea of GKCI is to abstract a software system as a class dependency network, obtain representation vectors for the class nodes with Node2Vec, and then aggregate node importance scores with a graph neural network. The method accounts for both the direction and the weight of dependencies between class nodes, and further refines node scores through a centrality adjustment mechanism, which improves identification accuracy. Here, we apply the GKCI methodology to each of the five metric sets in turn to determine which combination is best suited to the task.
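As a quick illustration of the centrality adjustment just described (a minimal sketch: epsilon = 1e-6 follows the model description below, while gamma and beta are placeholders for parameters the model learns):
import math

def adjusted_score(s_0, deg, gamma=1.0, beta=0.0, eps=1e-6):
    # Scale a node's raw importance score by a learnable affine
    # function of its log-degree centrality
    c = math.log(deg + eps)
    return (gamma * c + beta) * s_0

# A node with raw score 0.7 and total degree 12
print(adjusted_score(0.7, 12))  # about 1.74 with the placeholder gamma/beta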
II. Data Preparation and Network Construction
1. Obtaining the software systems
First, download and unpack all of the Java open-source systems at the specified versions:
# Create the project directory and download the systems
mkdir -p java_software_systems
cd java_software_systems
# Apache Ant 1.6.1
wget https://archive.apache.org/dist/ant/binaries/ant-1.6.1-bin.tar.gz
tar -xzf ant-1.6.1-bin.tar.gz
# jEdit 5.1.0
wget https://sourceforge.net/projects/jedit/files/jEdit%205.1.0/jedit-5.1.0.tar.gz/download
tar -xzf jedit-5.1.0.tar.gz
# jHotDraw 6.0b.1
wget https://sourceforge.net/projects/jhotdraw/files/jHotDraw%206.0b.1/jhotdraw-6.0b.1.tar.gz/download
tar -xzf jhotdraw-6.0b.1.tar.gz
# jMeter 2.0.1
wget https://archive.apache.org/dist/jmeter/binaries/jmeter-2.0.1.zip
unzip jmeter-2.0.1.zip
# ArgoUML 0.9.5
wget https://sourceforge.net/projects/argouml/files/ArgoUML%200.9.5/argouml-0.9.5.tar.gz/download
tar -xzf argouml-0.9.5.tar.gz
# wro4j 1.6.3
wget https://sourceforge.net/projects/wro4j/files/wro4j-1.6.3/wro4j-1.6.3.tar.gz/download
tar -xzf wro4j-1.6.3.tar.gz
2. Extracting class-level dependencies
Use the java-callgraph2 tool to extract class-level dependency relations:
# Install java-callgraph2
git clone https://github.com/Adrninistrator/java-callgraph2.git
cd java-callgraph2
mvn clean install
# Generate the call graph
# (check the java-callgraph2 README for the exact arguments of your version)
java -jar target/java-callgraph2.jar -source /path/to/project -output callgraph.txt
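The parser in step 3 assumes one class-level dependency per line in the form 'a.b.Caller -> c.d.Callee'. The exact output of java-callgraph2 varies by version and configuration, so the two lines below only illustrate the assumed format; adapt the parsing logic to the real output:
org.apache.tools.ant.Main -> org.apache.tools.ant.Project
org.apache.tools.ant.Project -> org.apache.tools.ant.Task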
3. Building the weighted directed network
Write a Python script that processes the dependency relations and builds the weighted directed graph:
import networkx as nx
import re
import os

def build_weighted_directed_graph(project_path, output_file):
    G = nx.DiGraph()
    # Walk the project's Java source files and add one node per class
    for root, dirs, files in os.walk(project_path):
        for file in files:
            if file.endswith('.java'):
                # Fully qualified class name from the path relative to project_path
                rel = os.path.relpath(os.path.join(root, file), project_path)
                class_name = os.path.splitext(rel)[0].replace(os.sep, '.')
                G.add_node(class_name)
    # Extract call relations and accumulate edge weights
    call_relations = {}
    with open('callgraph.txt', 'r') as f:
        for line in f:
            if '->' in line:
                # Assumes class-level lines of the form 'a.b.Caller -> c.d.Callee'
                parts = line.strip().split(' -> ')
                caller, callee = parts[0].strip(), parts[1].strip()
                if caller in G.nodes and callee in G.nodes:
                    key = (caller, callee)
                    call_relations[key] = call_relations.get(key, 0) + 1  # call count as edge weight
    # Add the weighted edges to the graph
    for (caller, callee), weight in call_relations.items():
        G.add_edge(caller, callee, weight=weight)
    # Filter out dependencies on third-party classes
    third_party_classes = [r'java\.util\..*', r'org\.springframework\..*']  # extend as needed
    edges_to_remove = []
    for u, v in G.edges():
        if any(re.match(pattern, u) or re.match(pattern, v) for pattern in third_party_classes):
            edges_to_remove.append((u, v))
    G.remove_edges_from(edges_to_remove)
    # Persist the graph structure
    nx.write_weighted_edgelist(G, output_file, delimiter=' ')
    return G
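A minimal usage sketch (the paths are illustrative):
# Build the graph for Apache Ant and persist its edge list
G = build_weighted_directed_graph('./ApacheAnt/src', './ApacheAnt_graph.edgelist')
print(G.number_of_nodes(), 'classes,', G.number_of_edges(), 'dependencies')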
4. Ground-truth key class annotation

Build the ground-truth table for the six open-source Java systems at the versions used here (Apache Ant 1.6.1, jEdit 5.1.0, jHotDraw 6.0b.1, jMeter 2.0.1, ArgoUML 0.9.5, wro4j 1.6.3), then proceed as follows.
Label the true key classes according to each system's actual structure and importance:
# Example: ground-truth key classes for Apache Ant
ant_key_classes = ['org.apache.tools.ant.Project',
                   ...]  # add the remaining ground-truth key classes for Apache Ant
# Build the ground-truth label dictionary
y_true = {}
for class_name in G.nodes:
    y_true[class_name] = 1 if class_name in ant_key_classes else 0
# Save the ground-truth labels
with open('ant_y_true.txt', 'w') as f:
    for class_name, label in y_true.items():
        f.write(f"{class_name}\t{label}\n")
III. Feature Extraction
1. DM feature extraction
Design metrics (DM) such as the number of methods, lines of code, and inheritance depth are extracted from each class's source code; the full extract_dm_features implementation is given in Section VI.
2. NM feature extraction
Compute the classical network metrics (NM) on the unweighted graph:
import networkx as nx
import numpy as np

def extract_nm_features(G):
    # Drop direction and weights for the classical metrics
    G_unweighted = nx.Graph()
    G_unweighted.add_nodes_from(G.nodes())
    for u, v in G.edges():
        G_unweighted.add_edge(u, v)
    # Compute the network metrics (degree centralities use the directed graph)
    nm_features = {
        'betweenness': nx.betweenness_centrality(G_unweighted),
        'pagerank': nx.pagerank(G_unweighted),
        'in_degree': nx.in_degree_centrality(G),
        'out_degree': nx.out_degree_centrality(G)
    }
    # Assemble the feature matrix
    class_names = sorted(G.nodes())
    X_nm = np.zeros((len(class_names), 4))
    for i, class_name in enumerate(class_names):
        X_nm[i] = [nm_features['betweenness'][class_name],
                   nm_features['pagerank'][class_name],
                   nm_features['in_degree'][class_name],
                   nm_features['out_degree'][class_name]]
    return X_nm
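A quick sanity check before training (illustrative) is to inspect the highest-ranked classes under one of the metrics, e.g. PageRank (column 1 of X_nm):
import numpy as np

class_names = sorted(G.nodes())
X_nm = extract_nm_features(G)
top = np.argsort(X_nm[:, 1])[::-1][:10]
for i in top:
    print(class_names[i], X_nm[i, 1])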
3. WDNM feature extraction
Compute the weighted network metrics (WDNM) on the weighted directed graph:
def extract_wdnm_features(G):
    # Weighted betweenness centrality
    wdnm_betweenness = nx.betweenness_centrality(G, weight='weight')
    # Weighted PageRank
    wdnm_pagerank = nx.pagerank(G, alpha=0.85, weight='weight')
    # Weighted in-/out-degree (sum of incident edge weights)
    wdnm_in_degree = dict(G.in_degree(weight='weight'))
    wdnm_out_degree = dict(G.out_degree(weight='weight'))
    # Assemble the feature matrix
    class_names = sorted(G.nodes())
    X_wdnm = np.zeros((len(class_names), 4))
    for i, class_name in enumerate(class_names):
        X_wdnm[i] = [wdnm_betweenness[class_name],
                     wdnm_pagerank[class_name],
                     wdnm_in_degree[class_name],
                     wdnm_out_degree[class_name]]
    return X_wdnm
4. Feature combination
Build the two combined metric sets (WDNM+DM and DM+NM) from the individual feature matrices:
# Combine WDNM+DM
X_wdnm_dm = np.concatenate([X_wdnm, X_dm], axis=1)
# Combine DM+NM
X_dm_nm = np.concatenate([X_dm, X_nm], axis=1)
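Because DM counts (methods, lines of code) and the centrality scores live on very different scales, standardizing each feature column before training usually helps; this step is an optional addition to the plan above:
from sklearn.preprocessing import StandardScaler

# Z-score each feature column of a combined matrix, e.g. WDNM+DM
X_wdnm_dm = StandardScaler().fit_transform(X_wdnm_dm)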
IV. Model Construction and Training
1. Node2Vec embedding generation
Generate node embeddings with the node2vec library:
# Install the node2vec package
pip install node2vec
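The PyPI node2vec package is driven from Python rather than a command line. A minimal sketch of generating and saving embeddings for the dependency graph (dimensions=64, walk_length=30, num_walks=200, window=10 follow the parameter choices in the original plan):
from node2vec import Node2Vec

# Biased random walks on the weighted dependency graph
node2vec = Node2Vec(G, dimensions=64, walk_length=30, num_walks=200,
                    weight_key='weight', workers=4)
# Fit the skip-gram model
model = node2vec.fit(window=10, min_count=1)
# Save the embeddings in word2vec text format
model.wv.save_word2vec_format('vectors.emb')
The saved vectors can be read back with gensim's KeyedVectors.load_word2vec_format and aligned with sorted(G.nodes()) if they are to be used as additional node features.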
2. Preparing PyTorch Geometric data
Convert the graph, features, and labels into PyTorch Geometric format:
import torch
from torch_geometric.data import Data
import numpy as np

def prepare_data(G, X_features, y_true_file):
    # Read the ground-truth labels
    y_true = {}
    with open(y_true_file, 'r') as f:
        for line in f:
            class_name, label = line.strip().split('\t')
            y_true[class_name] = int(label)
    # Sort node names so the rows of X_features align with the graph nodes
    class_names = sorted(G.nodes())
    name_to_idx = {name: i for i, name in enumerate(class_names)}
    # Convert features and labels to tensors
    x = torch.tensor(X_features, dtype=torch.float)
    y = torch.tensor([y_true.get(class_name, 0) for class_name in class_names], dtype=torch.long)
    # Build the edge index from the dependency graph
    edge_index = [[name_to_idx[u], name_to_idx[v]] for u, v in G.edges()]
    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
    # Assemble the PyTorch Geometric Data object
    data = Data(x=x, edge_index=edge_index, y=y)
    return data
3. GKCI model implementation
Implement the graph neural network: directional aggregation gathers scores separately from predecessor and successor neighbors and fuses them with the node's own representation, and a learnable centrality adjustment (gamma, beta) then rescales the raw node scores:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.utils import degree

class DirectionalAggregation(nn.Module):
    """Aggregates scores separately from predecessor and successor neighbors."""
    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Aggregation over incoming edges (predecessor neighbors)
        self.in_conv = GCNConv(in_channels, out_channels, add_self_loops=False)
        # Aggregation over outgoing edges (successor neighbors)
        self.out_conv = GCNConv(in_channels, out_channels, add_self_loops=False)
        # Combine self, predecessor, and successor representations
        self.combine = nn.Linear(in_channels + 2 * out_channels, out_channels)

    def forward(self, x, edge_index):
        # Predecessor scores follow the original edge direction,
        # successor scores the reversed direction
        in_scores = self.in_conv(x, edge_index)
        out_scores = self.out_conv(x, edge_index.flip(0))
        return self.combine(torch.cat([x, in_scores, out_scores], dim=1))

class GKCIModel(nn.Module):
    def __init__(self, in_channels, hidden_channels=64):
        super().__init__()
        self.layers = nn.ModuleList([
            GCNConv(in_channels, hidden_channels),
            GCNConv(hidden_channels, hidden_channels),
            DirectionalAggregation(hidden_channels, hidden_channels)
        ])
        # Map node representations to an initial importance score
        self.scoring_net = nn.Linear(hidden_channels, 1)
        # Learnable centrality adjustment (gamma and beta)
        self.centrality_adjust = nn.Linear(1, 1)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        # Graph neural network aggregation
        for layer in self.layers:
            x = F.relu(layer(x, edge_index))
        # Initial node scores
        s_0 = self.scoring_net(x).squeeze()
        # Node centrality: total (in + out) degree computed from edge_index
        num_nodes = x.size(0)
        deg = degree(edge_index[0], num_nodes) + degree(edge_index[1], num_nodes)
        c = torch.log(deg + 1e-6).view(-1, 1)  # epsilon = 1e-6
        c_adjusted = self.centrality_adjust(c)  # gamma * c + beta
        # Final centrality-adjusted score (logits)
        final_score = c_adjusted.squeeze() * s_0
        return final_score
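A quick shape check on random inputs (sizes are illustrative) confirms the model wiring before real training:
import torch
from torch_geometric.data import Data

# 20 nodes with 4 features each, 50 random directed edges
x = torch.randn(20, 4)
edge_index = torch.randint(0, 20, (2, 50))
data = Data(x=x, edge_index=edge_index)
model = GKCIModel(in_channels=4)
print(model(data).shape)  # expected: torch.Size([20])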
4. Model training
Write the model training function:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

def train_model(data, model, num_epochs=100, learning_rate=0.001):
    # Split the node indices into training and test sets
    train_idx, test_idx = train_test_split(
        np.arange(data.x.shape[0]), test_size=0.2, random_state=42)
    train_idx = torch.tensor(train_idx, dtype=torch.long)
    test_idx = torch.tensor(test_idx, dtype=torch.long)
    # Loss function and optimizer
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    # Training loop
    for epoch in range(num_epochs):
        model.train()
        optimizer.zero_grad()
        out = model(data)                       # forward pass (logits)
        loss = criterion(out[train_idx], data.y.float()[train_idx])
        loss.backward()                         # backward pass
        optimizer.step()
        # Periodic evaluation on the test split
        if epoch % 10 == 0:
            model.eval()
            with torch.no_grad():
                pred = torch.sigmoid(model(data))
                y_test = data.y[test_idx].numpy()
                p_test = (pred[test_idx] >= 0.5).long().numpy()
                conf_matrix = confusion_matrix(y_test, p_test, labels=[0, 1])
                TN, FP = conf_matrix[0]
                FN, TP = conf_matrix[1]
                print(f"Epoch {epoch}:")
                print(f"TP={TP}, FN={FN}, FP={FP}, TN={TN}")
                tpr = TP / (TP + FN) if (TP + FN) > 0 else 0
                fpr = FP / (FP + TN) if (FP + TN) > 0 else 0
                print(f"TPR={tpr:.4f}, FPR={fpr:.4f}")
                print(classification_report(y_test, p_test, zero_division=0))
    return model
V. Result Evaluation and TPR/FPR Computation
1. Model prediction and binarization
Run prediction for each metric set and binarize the scores:
def evaluate_model(data, model, threshold=0.5):
    model.eval()
    with torch.no_grad():
        # The model outputs logits, so apply a sigmoid before thresholding
        scores = torch.sigmoid(model(data))
        pred_labels = (scores >= threshold).float()
    return pred_labels
2. Computing TP, FN, and FP
Count true positives, false negatives, and false positives:
def calculate_metrics(y_true, pred_labels):
    TP = np.sum((pred_labels == 1) & (y_true == 1))
    FN = np.sum((pred_labels == 0) & (y_true == 1))
    FP = np.sum((pred_labels == 1) & (y_true == 0))
    TN = np.sum((pred_labels == 0) & (y_true == 0))
    return TP, FN, FP, TN
3. Computing TPR and FPR
Compute the true positive rate and false positive rate:
def calculate_tpr_fpr(TP, FN, FP, TN):
    TPR = TP / (TP + FN) if (TP + FN) > 0 else 0
    FPR = FP / (FP + TN) if (FP + TN) > 0 else 0
    return TPR, FPR
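A small worked example: with TP=8, FN=2, FP=5, TN=85, the function gives TPR = 8/10 = 0.8 and FPR = 5/90 ≈ 0.056:
TP, FN, FP, TN = 8, 2, 5, 85
TPR, FPR = calculate_tpr_fpr(TP, FN, FP, TN)
print(TPR, round(FPR, 3))  # 0.8 0.056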
4. End-to-end evaluation of a metric set
Write the main evaluation function:
def evaluate_all_metrics(G, X_features, y_true_file, num_epochs=100, learning_rate=0.001):
    # Prepare the data
    data = prepare_data(G, X_features, y_true_file)
    # Initialize the model
    model = GKCIModel(in_channels=data.x.shape[1])
    # Train the model
    trained_model = train_model(data, model, num_epochs, learning_rate)
    # Predict binary labels
    pred_labels = evaluate_model(data, trained_model)
    # Count TP, FN, FP, and TN
    TP, FN, FP, TN = calculate_metrics(data.y.numpy(), pred_labels.numpy())
    # Compute TPR and FPR
    TPR, FPR = calculate_tpr_fpr(TP, FN, FP, TN)
    return TP, FN, FP, TPR, FPR
5. Batch evaluation across all software systems
Write the batch evaluation script:
import json
import networkx as nx
import numpy as np

# The five metric sets to evaluate: DM, NM, WDNM, WDNM+DM, DM+NM
software_systems = ['ApacheAnt', 'jEdit', 'jHotDraw', 'jMeter', 'ArgoUML', 'wro4j']
results = {}
for system in software_systems:
    # Build the dependency graph
    G = build_weighted_directed_graph(f'./{system}/src', f'./{system}_graph.edgelist')
    # Extract the features for each metric set
    X_dm = extract_dm_features(f'./{system}/src', G)
    X_nm = extract_nm_features(G)
    X_wdnm = extract_wdnm_features(G)
    X_features = {
        'DM': X_dm,
        'NM': X_nm,
        'WDNM': X_wdnm,
        'WDNM+DM': np.concatenate([X_wdnm, X_dm], axis=1),
        'DM+NM': np.concatenate([X_dm, X_nm], axis=1)
    }
    # Evaluate every metric set on this system
    results[system] = {}
    for metric_set, X in X_features.items():
        TP, FN, FP, TPR, FPR = evaluate_all_metrics(G, X, f'./{system}_y_true.txt')
        results[system][metric_set] = {
            'TP': int(TP), 'FN': int(FN), 'FP': int(FP),
            'TPR': float(TPR), 'FPR': float(FPR)
        }
# Save the results
with open('results.json', 'w') as f:
    json.dump(results, f, indent=4)
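To compare the metric sets at a glance, the saved JSON can be flattened into a simple table (a sketch; the formatting choices are illustrative):
import json

with open('results.json') as f:
    results = json.load(f)
# One row per (system, metric set) with the TP/FN/FP counts and rates
print(f"{'system':<10} {'metric set':<10} {'TP':>4} {'FN':>4} {'FP':>4} {'TPR':>7} {'FPR':>7}")
for system, metric_sets in results.items():
    for name, m in metric_sets.items():
        print(f"{system:<10} {name:<10} {m['TP']:>4} {m['FN']:>4} {m['FP']:>4} "
              f"{m['TPR']:>7.4f} {m['FPR']:>7.4f}")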
VI. Complete Code Implementation
1. Class dependency extraction and graph construction
This step reuses the corrected build_weighted_directed_graph function from Section II unchanged.
2. DM feature extraction
import os
import javalang
import numpy as np

def extract_dm_features(project_path, G):
    dm_features = {}
    for root, dirs, files in os.walk(project_path):
        for file in files:
            if file.endswith('.java'):
                # Fully qualified class name from the path relative to project_path
                rel = os.path.relpath(os.path.join(root, file), project_path)
                class_name = os.path.splitext(rel)[0].replace(os.sep, '.')
                try:
                    with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
                        file_content = f.read()
                    tree = javalang.parse.parse(file_content)
                except Exception as e:
                    print(f"Error parsing {file}: {e}")
                    continue
                # Number of declared methods
                num_methods = sum(1 for _ in tree.filter(javalang.tree.MethodDeclaration))
                # Lines of code in the file
                num_lines = len(file_content.splitlines())
                inheritance_depth = 0  # placeholder; see the sketch below for one way to compute it
                dm_features[class_name] = {
                    'numMethods': num_methods,
                    'numLines': num_lines,
                    'inheritanceDepth': inheritance_depth
                }
    # Assemble the feature matrix in the same node order used elsewhere
    class_names = sorted(G.nodes())
    X_dm = np.zeros((len(class_names), 3))
    for i, class_name in enumerate(class_names):
        if class_name in dm_features:
            X_dm[i] = [dm_features[class_name]['numMethods'],
                       dm_features[class_name]['numLines'],
                       dm_features[class_name]['inheritanceDepth']]
    return X_dm
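One way to fill in the inheritance-depth placeholder is to index every ClassDeclaration by name while parsing and then walk the extends chain; a minimal sketch, assuming single inheritance and counting only parents visible in the project sources (external parents simply end the chain):
import javalang

def inheritance_depth(class_decls, name, depth=0):
    """Depth of the extends chain reachable within the project.
    class_decls maps a simple class name to its javalang ClassDeclaration."""
    decl = class_decls.get(name)
    if decl is None or decl.extends is None:
        return depth
    return inheritance_depth(class_decls, decl.extends.name, depth + 1)

# While parsing each file inside extract_dm_features:
#     for _, node in tree.filter(javalang.tree.ClassDeclaration):
#         class_decls[node.name] = node
# and afterwards: inheritance_depth(class_decls, simple_name)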