公西颖初 发表于 2025-8-7 18:25:09

PLE模型简洁解读

PLE模型简洁解读


基础设定


[*]有 2 个任务:CTR、CVR
[*]使用 1 层 PLE(num_levels = 1)
[*]每个任务 2 个任务特定专家(specific_expert_num = 2)
[*]有 1 个共享专家(shared_expert_num = 1)
[*]输入 embedding 是: 的拼接向量
我们来看看“这一层”里的每一个步骤数据是如何流动的。
第 1 步:准备输入

ple_inputs =

[*]x_ctr = CTR 的输入 = 原始 embedding 向量
[*]x_cvr = CVR 的输入 = 同上
[*]x_shared = Shared 的输入 = 同上
注意:这三个向量在第 1 层是一样的,但在后续层会变得不同。
第 2 步:任务专家和共享专家网络

每个任务的 experts:

每个任务有 2 个 specific expert,输入是自己:

[*]CTR 的两个专家 → 输入 x_ctr → 输出
[*]CVR 的两个专家 → 输入 x_cvr → 输出
共享 experts:

只有 1 个共享专家,输入是 x_shared,输出
第 3 步:Gate 网络

我们看 CTR 任务的 gate 是怎么处理的:
CTR 的 gate 做了什么?


[*]输入: x_ctr → shape
[*]过一个小 DNN: 输出变为 (比如 H=32)
[*]线性变换 + softmax: 输出为 ,表示对 3 个专家的权重:

[*]expert_1_ctr
[*]expert_2_ctr
[*]expert_shared

gate_input = DNN(...)(x_ctr)   #
gate_weights = Dense(3, activation='softmax')(gate_input)# 第 4 步:Gate × Experts

将所有专家输出堆叠:
expert_outputs = tf.stack(, axis=1)# 将 gate 权重 reshape:
gate_weights = tf.expand_dims(gate_weights, -1)# 点乘加权求和:
fused_output = tf.reduce_sum(expert_outputs * gate_weights, axis=1)# ✅ 这就是 CTR 任务在这一层提取到的特征,来自自己和共享专家的动态组合。
CVR 任务也完全一样,只是换成用 x_cvr 输入,构建自己的 gate 和专家融合。
下一层(若存在):

然后这些输出(fused_output_ctr, fused_output_cvr, fused_output_shared)会作为下一层的输入,继续重复这一机制。
每一层都会重新生成:

[*]专家网络(不同任务分开)
[*]gate(使用该层输入为条件)
从而实现「逐层提纯」。
Gate 的本质:

Gate 是一个小的 DNN 网络,输入是当前任务的 embedding,输出是对所有专家的 softmax 权重
决定了“这个任务现在要听谁的话”
PLE的pytorch实现

class Expert(nn.Module):
    def __init__(self, input_dim, expert_dim):
      super(Expert, self).__init__()
      self.layer = nn.Sequential(
            nn.Linear(input_dim, expert_dim),
            nn.ReLU(),
            nn.BatchNorm1d(expert_dim),
            nn.Dropout(0.2)
      )

    def forward(self, x):
      return self.layer(x)

class Gate(nn.Module):
    def __init__(self, input_dim, n_experts):
      super(Gate, self).__init__()
      self.gate = nn.Sequential(
            nn.Linear(input_dim, n_experts),
            nn.Softmax(dim=-1)
      )

    def forward(self, x):
      weights = self.gate(x)#
      return weights.unsqueeze(-1)#

class PLELayer(nn.Module):
    def __init__(self, input_dim, expert_dim, n_tasks, n_task_experts, n_shared_experts):
      super(PLELayer, self).__init__()
      self.n_tasks = n_tasks
      self.task_experts = nn.ModuleList([
            nn.ModuleList()
            for _ in range(n_tasks)
      ])
      self.shared_experts = nn.ModuleList([
            Expert(input_dim, expert_dim) for _ in range(n_shared_experts)
      ])
      self.task_gates = nn.ModuleList([
            Gate(input_dim, n_task_experts + n_shared_experts)
            for _ in range(n_tasks)
      ])
      self.shared_gate = Gate(input_dim, n_tasks * n_task_experts + n_shared_experts)

    def forward(self, task_inputs, shared_input):
      # Compute expert outputs
      task_outputs = []
      for i in range(self.n_tasks):
            task_outputs.append() for expert in self.task_experts])
      shared_outputs =

      # Task-specific gate outputs
      next_task_inputs = []
      for i in range(self.n_tasks):
            all_expert_outputs = task_outputs + shared_outputs
            stacked = torch.stack(all_expert_outputs, dim=1)#
            weights = self.task_gates(task_inputs)      #
            fused = torch.sum(stacked * weights, dim=1)       #
            next_task_inputs.append(fused)

      # Shared gate output (for next layer's shared input)
      flat_all_experts = sum(task_outputs, []) + shared_outputs
      stacked_shared = torch.stack(flat_all_experts, dim=1)
      shared_weights = self.shared_gate(shared_input)
      next_shared_input = torch.sum(stacked_shared * shared_weights, dim=1)#

      return next_task_inputs, next_shared_input

class PLE(nn.Module):
    # 正确处理多层维度
    def __init__(self, input_dim, expert_dim, n_tasks=3, n_layers=2,
               n_task_experts=2, n_shared_experts=1):
      super(PLE, self).__init__()
      self.n_tasks = n_tasks
      self.ple_layers = nn.ModuleList()
      
      # 为每一层设置正确的输入维度
      for layer_idx in range(n_layers):
            if layer_idx == 0:
                # 第一层:使用原始输入维度
                current_input_dim = input_dim
            else:
                # 后续层:使用expert输出维度作为输入
                current_input_dim = expert_dim
               
            self.ple_layers.append(
                PLELayer(
                  input_dim=current_input_dim,# 动态设置输入维度
                  expert_dim=expert_dim,
                  n_tasks=n_tasks,
                  n_task_experts=n_task_experts,
                  n_shared_experts=n_shared_experts
                )
            )

    def forward(self, x):
      # Initial input: shared across all tasks and shared experts
      task_inputs =
      shared_input = x

      for layer in self.ple_layers:
            task_inputs, shared_input = layer(task_inputs, shared_input)

      return task_inputs# final task-specific vectors
来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除
页: [1]
查看完整版本: PLE模型简洁解读