This plan describes in detail how to use the GKCI model of Zhou Chunying et al. (a graph-neural-network-based key class identification method) to evaluate the performance of five metric sets (DM, NM, WDNM, WDNM+DM, DM+NM) on eight open-source Java software systems, focusing on computing each metric set's TP (true positive), FN (false negative), and FP (false positive) counts, and from these the TPR (true positive rate) and FPR (false positive rate). The entire pipeline runs on Linux and uses Python libraries such as PyTorch and NetworkX to model and evaluate key class identification.

I. Problem Background and Method Overview

Key class identification is an important research direction in software engineering: it helps developers quickly locate the most important classes in a system, improving the efficiency of program comprehension and maintenance. Existing approaches fall into two broad categories: non-training, metric-based methods and supervised machine-learning methods. This plan combines the strengths of both, using the GKCI model (a supervised method based on graph neural networks) to evaluate five different metric sets and determine which performs best for key class identification.

The core idea of GKCI is to abstract a software system as a class dependency network, obtain a representation vector for each class node with Node2Vec, and then aggregate node importance scores through a graph neural network. The method accounts for both the direction and the weight of dependencies between class nodes, and further refines node scores through a centrality adjustment mechanism, improving identification accuracy. In this plan, we apply the GKCI methodology to each of the five metric sets to determine which combination is best suited to the task.
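Written out, the centrality-adjusted score of a class node v is (a sketch reconstructed from the implementation in Section IV, with learnable parameters γ and β, node degree d_v, and ε = 1e-6):

    s*_v = (γ · log(d_v + ε) + β) · s⁰_v

where s⁰_v is the initial score the scoring network assigns to v, and the factor in parentheses is the centrality adjustment.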

II. Data Preparation and Network Construction

1. Software System Data Acquisition

First, make sure that all of the specified versions of the open-source Java systems have been downloaded and unpacked:

# Create the project directory and download the software systems
mkdir -p java_software_systems
cd java_software_systems

# Apache Ant 1.6.1
wget https://archive.apache.org/dist/ant/binaries/ant-1.6.1-bin.tar.gz
tar -xzf ant-1.6.1-bin.tar.gz

# jEdit 5.1.0
wget https://sourceforge.net/projects/jedit/files/jEdit%205.1.0/jedit-5.1.0.tar.gz/download
tar -xzf jedit-5.1.0.tar.gz

# jHotDraw 6.0b.1
wget https://sourceforge.net/projects/jhotdraw/files/jHotDraw%206.0b.1/jhotdraw-6.0b.1.tar.gz/download
tar -xzf jhotdraw-6.0b.1.tar.gz

# jMeter 2.0.1
wget https://archive.apache.org/dist/jmeter/binaries/jmeter-2.0.1.zip
unzip jmeter-2.0.1.zip

# ArgoUML 0.9.5
wget https://sourceforge.net/projects/argouml/files/ArgoUML%200.9.5/argouml-0.9.5.tar.gz/download
tar -xzf argouml-0.9.5.tar.gz

# wro4j 1.6.3
wget https://sourceforge.net/projects/wro4j/files/wro4j-1.6.3/wro4j-1.6.3.tar.gz/download
tar -xzf wro4j-1.6.3.tar.gz

2. Class Dependency Extraction

Use the java-callgraph2 tool to extract class-level dependency relations:

# Install java-callgraph2
git clone https://github.com/Adrninistrator/java-callgraph2.git
cd java-callgraph2
mvn clean install

# Generate the call graph
java -jar target/java-callgraph2.jar -source /path/to/project -output callgraph.txt

3. Building the Weighted Directed Network

Write a Python script that processes the dependency relations and builds the weighted directed graph. The parser below assumes the call graph file contains one "caller -> callee" pair per line with fully qualified class names; adjust it to the actual output format of your java-callgraph2 version:

import networkx as nx
import os

def build_weighted_directed_graph(project_path, output_file):
    # Initialize the weighted directed graph
    G = nx.DiGraph()

    # Walk the project tree and add one node per Java class,
    # deriving the fully qualified class name from the file path
    for root, dirs, files in os.walk(project_path):
        for file in files:
            if file.endswith('.java'):
                rel_path = os.path.relpath(os.path.join(root, file), project_path)
                class_name = rel_path[:-len('.java')].replace(os.sep, '.')
                G.add_node(class_name)

    # Parse the call graph and accumulate call counts as edge weights.
    # Each line is assumed to have the form "caller -> callee"; adapt the
    # parsing to the actual output of your tool version.
    call_relations = {}
    with open('callgraph.txt', 'r') as f:
        for line in f:
            if '->' in line:
                parts = line.strip().split(' -> ')
                caller, callee = parts[0].strip(), parts[1].strip()
                if caller in G.nodes and callee in G.nodes:
                    key = (caller, callee)
                    call_relations[key] = call_relations.get(key, 0) + 1

    # Add the weighted edges to the graph
    for (caller, callee), weight in call_relations.items():
        G.add_edge(caller, callee, weight=weight)

    # Filter out dependencies on third-party classes (extend the prefix
    # list with whatever packages should be excluded)
    third_party_prefixes = ['java.util.', 'org.springframework.']
    edges_to_remove = [(u, v) for u, v in G.edges()
                       if any(u.startswith(p) or v.startswith(p)
                              for p in third_party_prefixes)]
    G.remove_edges_from(edges_to_remove)

    # Save the weighted edge list
    nx.write_weighted_edgelist(G, output_file, delimiter=' ')
    return G


4. Ground-Truth Key Class Annotation

Following the table above, prepare the ground-truth tables for the open-source Java systems listed earlier (Apache Ant 1.6.1, jEdit 5.1.0, jHotDraw 6.0b.1, jMeter 2.0.1, ArgoUML 0.9.5, wro4j 1.6.3), then proceed as follows:

Based on the actual structure and importance of each software system, annotate its ground-truth key classes:

# Example: ground-truth key classes for Apache Ant (the entries shown are
# illustrative; fill in the full list from the annotation table)
ant_key_classes = ['org.apache.tools.ant.Project',
                   'org.apache.tools.ant.Task',
                   ...]  # add the remaining Apache Ant key classes

# Build the ground-truth label dictionary
y_true = {}
for class_name in G.nodes:
    y_true[class_name] = 1 if class_name in ant_key_classes else 0

# Save the labels
with open('ant_y_true.txt', 'w') as f:
    for class_name, label in y_true.items():
        f.write(f"{class_name}\t{label}\n")

III. Feature Extraction

1. DM Feature Extraction

The source-code metrics (DM: method count, lines of code, inheritance depth) are extracted per class with javalang; the full implementation of extract_dm_features is given in Section VI.

2. NM Feature Extraction

Compute the traditional network metrics (NM) on the unweighted graph:

import networkx as nx
import numpy as np

def extract_nm_features(G):
    # Strip the weights but keep the edge directions
    G_unweighted = nx.DiGraph()
    G_unweighted.add_nodes_from(G.nodes())
    G_unweighted.add_edges_from(G.edges())

    # Traditional (unweighted) network metrics
    nm_features = {
        'betweenness': nx.betweenness_centrality(G_unweighted),
        'pagerank': nx.pagerank(G_unweighted),
        'in_degree': nx.in_degree_centrality(G_unweighted),
        'out_degree': nx.out_degree_centrality(G_unweighted)
    }

    # Assemble the (num_classes x 4) feature matrix in sorted node order
    class_names = sorted(G.nodes())
    X_nm = np.zeros((len(class_names), 4))
    for i, class_name in enumerate(class_names):
        X_nm[i] = [nm_features['betweenness'][class_name],
                   nm_features['pagerank'][class_name],
                   nm_features['in_degree'][class_name],
                   nm_features['out_degree'][class_name]]

    return X_nm

3. WDNM Feature Extraction

Compute the weighted network metrics (WDNM) on the weighted directed graph:

def extract_wdnm_features(G):
    # Weighted betweenness centrality
    wdnm_betweenness = nx.betweenness_centrality(G, weight='weight')

    # Weighted PageRank
    wdnm_pagerank = nx.pagerank(G, alpha=0.85, weight='weight')

    # Weighted in-/out-degree: sum of incident edge weights, normalized by
    # the total edge weight so values stay comparable across systems
    total_weight = sum(d['weight'] for _, _, d in G.edges(data=True)) or 1
    wdnm_in_degree = {n: G.in_degree(n, weight='weight') / total_weight
                      for n in G.nodes()}
    wdnm_out_degree = {n: G.out_degree(n, weight='weight') / total_weight
                       for n in G.nodes()}

    # Assemble the (num_classes x 4) feature matrix in sorted node order
    class_names = sorted(G.nodes())
    X_wdnm = np.zeros((len(class_names), 4))
    for i, class_name in enumerate(class_names):
        X_wdnm[i] = [wdnm_betweenness[class_name],
                     wdnm_pagerank[class_name],
                     wdnm_in_degree[class_name],
                     wdnm_out_degree[class_name]]

    return X_wdnm

4. Feature Combination

Combine the base metric sets into the composite sets (a dimension sanity check follows the snippet):

# Combine WDNM+DM
X_wdnm_dm = np.concatenate([X_wdnm, X_dm], axis=1)

# Combine DM+NM
X_dm_nm = np.concatenate([X_dm, X_nm], axis=1)
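A quick sanity check on the composite matrices (a sketch assuming the shapes used in this plan: X_dm has 3 columns, X_nm and X_wdnm have 4 each):

# Each composite set should have the sum of its parts' column counts
assert X_wdnm_dm.shape[1] == X_wdnm.shape[1] + X_dm.shape[1]  # 4 + 3 = 7
assert X_dm_nm.shape[1] == X_dm.shape[1] + X_nm.shape[1]      # 3 + 4 = 7
# And one row per class node throughout
assert X_wdnm_dm.shape[0] == X_dm.shape[0] == X_nm.shape[0]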

IV. Model Construction and Training

1. Node2Vec Embedding Generation

Generate the node embeddings with the node2vec library:

# Install the node2vec package
pip install node2vec

# Generate embeddings. The pip node2vec package exposes a Python API
# rather than a command-line entry point, so the generation step is a script:
import networkx as nx
from node2vec import Node2Vec

G = nx.read_weighted_edgelist('graph_weighted.edgelist', create_using=nx.DiGraph)
node2vec = Node2Vec(G, dimensions=64, walk_length=30, num_walks=200,
                    weight_key='weight', workers=4)
model = node2vec.fit(window=10, min_count=1)
model.wv.save_word2vec_format('vectors.emb')
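The plan as written never loads these embeddings back; a minimal sketch for aligning them with the metric feature matrices (assuming gensim and the vectors.emb file above; nodes absent from the embedding vocabulary would need a fallback) looks like:

import numpy as np
from gensim.models import KeyedVectors

# Load the word2vec-format embeddings produced above
wv = KeyedVectors.load_word2vec_format('vectors.emb')

# Align embedding rows with the sorted node order used by the feature matrices
class_names = sorted(G.nodes())
X_emb = np.stack([wv[name] for name in class_names])

# Optionally concatenate with a metric set, e.g. WDNM
X_wdnm_emb = np.concatenate([X_wdnm, X_emb], axis=1)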

2. PyTorch Geometric Data Preparation

Convert the data into PyTorch Geometric format:

import torch
from torch_geometric.data import Data

def prepare_data(G, X_features, y_true_file):
    # Read the ground-truth labels
    y_true = {}
    with open(y_true_file, 'r') as f:
        for line in f:
            class_name, label = line.strip().split('\t')
            y_true[class_name] = int(label)

    # Fix a node order shared with the feature matrices
    class_names = sorted(G.nodes())
    index_of = {name: i for i, name in enumerate(class_names)}

    # Convert to PyTorch tensors
    x = torch.tensor(X_features, dtype=torch.float)
    y = torch.tensor([y_true.get(name, 0) for name in class_names],
                     dtype=torch.long)

    # Build the edge index from the weighted directed graph
    edge_index = [[index_of[u], index_of[v]] for u, v in G.edges()]
    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()

    # Create the Data object
    return Data(x=x, edge_index=edge_index, y=y)

3. GKCI Model Implementation

Implement the improved graph neural network model:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.utils import degree

class DirectionalAggregation(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Aggregation over predecessor (in-) neighbors
        self.in_conv = GCNConv(in_channels, out_channels, add_self_loops=False)
        # Aggregation over successor (out-) neighbors
        self.out_conv = GCNConv(in_channels, out_channels, add_self_loops=False)

    def forward(self, x, edge_index):
        # Predecessors: messages flow along the original edge direction
        in_scores = self.in_conv(x, edge_index)
        # Successors: reverse the edges before aggregating
        out_scores = self.out_conv(x, edge_index.flip(0))
        # Concatenate the node's own state with both directional views
        return torch.cat([x, in_scores, out_scores], dim=1)

class GKCIModel(nn.Module):
    def __init__(self, in_channels, hidden_channels=64):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.dir_agg = DirectionalAggregation(hidden_channels, hidden_channels)

        # Node score mapping; DirectionalAggregation outputs 3 * hidden_channels
        self.scoring_net = nn.Linear(3 * hidden_channels, 1)

        # Centrality adjustment: a 1-to-1 linear layer holding the learnable
        # parameters gamma (weight) and beta (bias)
        self.centrality_adjust = nn.Linear(1, 1)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index

        # Graph neural network aggregation
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = self.dir_agg(x, edge_index)

        # Initial node scores
        s_0 = self.scoring_net(x).squeeze(-1)

        # Node centrality: log of (total degree + epsilon), epsilon = 1e-6
        num_nodes = x.size(0)
        deg = degree(edge_index[0], num_nodes) + degree(edge_index[1], num_nodes)
        c = torch.log(deg + 1e-6).view(-1, 1)
        c_adjusted = self.centrality_adjust(c).squeeze(-1)  # gamma * c + beta

        # Final adjusted score
        return c_adjusted * s_0
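A minimal usage sketch (assuming data was produced by prepare_data above):

model = GKCIModel(in_channels=data.x.shape[1])
scores = model(data)   # one importance logit per class node
print(scores.shape)    # torch.Size([num_nodes])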

4. Model Training

Write the model training function:

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

def train_model(data, model, num_epochs=100, learning_rate=0.001):
    # Split the node indices into training and test sets
    indices = torch.arange(data.x.shape[0]).numpy()
    train_idx, test_idx = train_test_split(indices, test_size=0.2, random_state=42)
    train_idx = torch.tensor(train_idx, dtype=torch.long)
    test_idx = torch.tensor(test_idx, dtype=torch.long)

    # Loss function and optimizer
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        optimizer.zero_grad()

        # Forward pass (one logit per node)
        out = model(data)

        # Loss on the training nodes only
        loss = criterion(out[train_idx], data.y[train_idx].float())

        # Backward pass
        loss.backward()
        optimizer.step()

        # Periodic evaluation on the held-out nodes
        if epoch % 10 == 0:
            model.eval()
            with torch.no_grad():
                pred = torch.sigmoid(model(data))

            y_test = data.y[test_idx].numpy()
            pred_test = (pred[test_idx] >= 0.5).long().numpy()
            conf_matrix = confusion_matrix(y_test, pred_test, labels=[0, 1])
            TN, FP, FN, TP = conf_matrix.ravel()

            print(f"Epoch {epoch}:")
            print(f"TP={TP}, FN={FN}, FP={FP}, TN={TN}")
            tpr = TP / (TP + FN) if (TP + FN) > 0 else 0
            fpr = FP / (FP + TN) if (FP + TN) > 0 else 0
            print(f"TPR={tpr:.4f}, FPR={fpr:.4f}")
            print(classification_report(y_test, pred_test, zero_division=0))

    return model

V. Result Evaluation and TPR/FPR Computation

1. Model Prediction and Binarization

Run predictions for each metric set and binarize the results:

def evaluate_model(data, model, threshold=0.5):
    model.eval()
    with torch.no_grad():
        # The model outputs logits, so apply a sigmoid before thresholding
        scores = torch.sigmoid(model(data))
        pred_labels = (scores >= threshold).float()

    return pred_labels

2. Computing TP, FN, and FP

Write a function that counts the true positives, false negatives, and false positives:

def calculate_metrics(y_true, pred_labels):
    TP = np.sum((pred_labels == 1) & (y_true == 1))
    FN = np.sum((pred_labels == 0) & (y_true == 1))
    FP = np.sum((pred_labels == 1) & (y_true == 0))
    TN = np.sum((pred_labels == 0) & (y_true == 0))

    return TP, FN, FP, TN
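A small worked example of the counting logic (toy arrays, not data from any of the systems under study):

import numpy as np

y_true = np.array([1, 0, 1, 1, 0])
pred_labels = np.array([1, 0, 0, 1, 1])
print(calculate_metrics(y_true, pred_labels))  # (2, 1, 1, 1): TP=2, FN=1, FP=1, TN=1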

3. Computing TPR and FPR

Write a function that computes the true positive rate (TPR) and false positive rate (FPR):

def calculate_tpr_fpr(TP, FN, FP, TN):
    TPR = TP / (TP + FN) if (TP + FN) > 0 else 0
    FPR = FP / (FP + TN) if (FP + TN) > 0 else 0

    return TPR, FPR
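Continuing the toy example above, the rates follow directly from the counts:

TP, FN, FP, TN = calculate_metrics(y_true, pred_labels)
TPR, FPR = calculate_tpr_fpr(TP, FN, FP, TN)
print(f"TPR={TPR:.3f}, FPR={FPR:.3f}")  # TPR=0.667, FPR=0.500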

4. Evaluating All Metric Sets

Write the main evaluation function:

def evaluate_all_metrics(G, X_features, y_true_file, num_epochs=100, learning_rate=0.001):
    # Prepare the graph data
    data = prepare_data(G, X_features, y_true_file)

    # Initialize the model
    model = GKCIModel(in_channels=data.x.shape[1])

    # Train the model
    trained_model = train_model(data, model, num_epochs, learning_rate)

    # Predict and binarize
    pred_labels = evaluate_model(data, trained_model)

    # Count TP, FN, FP, TN
    TP, FN, FP, TN = calculate_metrics(data.y.numpy(), pred_labels.numpy())

    # Compute TPR and FPR (cast to plain Python types so the results
    # can be serialized to JSON later)
    TPR, FPR = calculate_tpr_fpr(TP, FN, FP, TN)

    return int(TP), int(FN), int(FP), float(TPR), float(FPR)

5. Batch Evaluation of All Software Systems

Write the batch evaluation script:

import os
import json
import numpy as np
import networkx as nx

# Systems under evaluation (directory names assume the layout created
# during data acquisition)
software_systems = ['ApacheAnt', 'jEdit', 'jHotDraw', 'jMeter', 'ArgoUML', 'wro4j']

results = {}

for system in software_systems:
    # Build the class dependency network
    G = build_weighted_directed_graph(f'./{system}/src', f'./{system}_graph.edgelist')

    # Extract the base metric sets for this system
    X_dm = extract_dm_features(f'./{system}/src', G)
    X_nm = extract_nm_features(G)
    X_wdnm = extract_wdnm_features(G)

    # The five metric sets under evaluation
    X_features = {
        'DM': X_dm,
        'NM': X_nm,
        'WDNM': X_wdnm,
        'WDNM+DM': np.concatenate([X_wdnm, X_dm], axis=1),
        'DM+NM': np.concatenate([X_dm, X_nm], axis=1)
    }

    # Evaluate every metric set
    results[system] = {}
    for metric_set, X in X_features.items():
        TP, FN, FP, TPR, FPR = evaluate_all_metrics(G, X, f'./{system}_y_true.txt')
        results[system][metric_set] = {
            'TP': TP,
            'FN': FN,
            'FP': FP,
            'TPR': TPR,
            'FPR': FPR
        }

# Save the results
with open('results.json', 'w') as f:
    json.dump(results, f, indent=4)
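To skim the saved results, a small summary printer works (a sketch; it only assumes the structure written by json.dump above):

import json

with open('results.json') as f:
    results = json.load(f)

# One line per (system, metric set) pair
for system, metric_sets in results.items():
    for name, m in metric_sets.items():
        print(f"{system:10s} {name:8s} TP={m['TP']:3d} FN={m['FN']:3d} "
              f"FP={m['FP']:3d} TPR={m['TPR']:.3f} FPR={m['FPR']:.3f}")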

VI. Complete Code Implementation

1. Class Dependency Extraction and Graph Construction

import networkx as nx
import os

def build_weighted_directed_graph(project_path, output_file):
    # Initialize the weighted directed graph
    G = nx.DiGraph()

    # Walk the project tree and add one node per Java class,
    # deriving the fully qualified class name from the file path
    for root, dirs, files in os.walk(project_path):
        for file in files:
            if file.endswith('.java'):
                rel_path = os.path.relpath(os.path.join(root, file), project_path)
                class_name = rel_path[:-len('.java')].replace(os.sep, '.')
                G.add_node(class_name)

    # Parse the call graph and accumulate call counts as edge weights.
    # Each line is assumed to have the form "caller -> callee"; adapt the
    # parsing to the actual output of your tool version.
    call_relations = {}
    with open('callgraph.txt', 'r') as f:
        for line in f:
            if '->' in line:
                parts = line.strip().split(' -> ')
                caller, callee = parts[0].strip(), parts[1].strip()
                if caller in G.nodes and callee in G.nodes:
                    key = (caller, callee)
                    call_relations[key] = call_relations.get(key, 0) + 1

    # Add the weighted edges to the graph
    for (caller, callee), weight in call_relations.items():
        G.add_edge(caller, callee, weight=weight)

    # Filter out dependencies on third-party classes (extend the prefix
    # list with whatever packages should be excluded)
    third_party_prefixes = ['java.util.', 'org.springframework.']
    edges_to_remove = [(u, v) for u, v in G.edges()
                       if any(u.startswith(p) or v.startswith(p)
                              for p in third_party_prefixes)]
    G.remove_edges_from(edges_to_remove)

    # Save the weighted edge list
    nx.write_weighted_edgelist(G, output_file, delimiter=' ')
    return G

2. Feature Extraction

Extract the per-class source-code metrics (DM) with javalang. Inheritance depth is approximated by following extends chains resolvable within the project's own sources:

import javalang
import os
import numpy as np

def extract_dm_features(project_path, G):
    # Per-class source-code metrics, plus a map of extends relations
    # (simple class name -> superclass simple name) for inheritance depth
    dm_features = {}
    superclass_of = {}

    for root, dirs, files in os.walk(project_path):
        for file in files:
            if not file.endswith('.java'):
                continue
            rel_path = os.path.relpath(os.path.join(root, file), project_path)
            class_name = rel_path[:-len('.java')].replace(os.sep, '.')

            try:
                with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
                    file_content = f.read()
                tree = javalang.parse.parse(file_content)
            except Exception as e:
                print(f"Error parsing {file}: {e}")
                continue

            # Number of methods declared in the file
            num_methods = len(list(tree.filter(javalang.tree.MethodDeclaration)))
            # Lines of code (raw line count of the source file)
            num_lines = len(file_content.splitlines())
            # Record extends relations for the inheritance-depth pass below
            for _, cls in tree.filter(javalang.tree.ClassDeclaration):
                if cls.extends is not None:
                    superclass_of[cls.name] = cls.extends.name

            dm_features[class_name] = {'numMethods': num_methods,
                                       'numLines': num_lines}

    # Inheritance depth: follow extends chains resolvable within the project
    def inheritance_depth(simple_name):
        depth, seen = 0, set()
        while simple_name in superclass_of and simple_name not in seen:
            seen.add(simple_name)
            simple_name = superclass_of[simple_name]
            depth += 1
        return depth

    # Assemble the (num_classes x 3) feature matrix in sorted node order
    class_names = sorted(G.nodes())
    X_dm = np.zeros((len(class_names), 3))
    for i, class_name in enumerate(class_names):
        feats = dm_features.get(class_name)
        if feats:
            X_dm[i] = [feats['numMethods'], feats['numLines'],
                       inheritance_depth(class_name.split('.')[-1])]

    return X_dm
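A usage sketch tying the two functions in this section together (the paths are assumptions about the local directory layout):

G = build_weighted_directed_graph('./ApacheAnt/src', './ApacheAnt_graph.edgelist')
X_dm = extract_dm_features('./ApacheAnt/src', G)
print(X_dm.shape)  # (num_classes, 3)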