您的位置:首页 > 新闻 > 热点要闻 > 哪里有好看的网站_有限责任公司怎么注册_品牌广告和效果广告_百度搜索链接

哪里有好看的网站_有限责任公司怎么注册_品牌广告和效果广告_百度搜索链接

2024/12/15 6:31:43 来源:https://blog.csdn.net/weixin_41369892/article/details/143643272  浏览:    关键词:哪里有好看的网站_有限责任公司怎么注册_品牌广告和效果广告_百度搜索链接
哪里有好看的网站_有限责任公司怎么注册_品牌广告和效果广告_百度搜索链接

持续更新常用的强化学习算法,采用单python文件实现,简单易读

  • 2024.11.09 更新:PPO(GAE); SAC
  • 2024.11.12 更新:OptionCritic(PPOC)
"PPO"
import copy
import time
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as Fimport gymnasium as gym
import matplotlib.pyplot as pltfrom tqdm import trange
from torch.distributions import Normalclass Actor(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.mu = nn.Linear(128, action_size)self.sigma = nn.Linear(128, action_size)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))mu = F.tanh(self.mu(x))sigma = F.softplus(self.sigma(x))return mu, sigmaclass Critic(nn.Module):def __init__(self, state_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.fc3 = nn.Linear(128, 1)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))return self.fc3(x)def ppo_training(trajectory, actor, critic, actor_optimizer, critic_optimizer,clip=0.2, k_epochs=10, gamma=0.99, lam=0.95, device='cpu', T=1e-2):states, actions, log_probs, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)).to(device),zip(*trajectory))rewards = rewards.view(-1, 1)dones = dones.view(-1, 1).int()with torch.no_grad():next_values = critic(next_states.float())td_target = rewards + gamma * next_values * (1 - dones)td_value = critic(states.float())td_delta = td_target - td_valuetd_delta = td_delta.detach().cpu().numpy()adv = 0.0advantages = []for delta in td_delta[::-1]:adv = gamma * lam * adv + deltaadvantages.append(adv)advantages.reverse()advantages = torch.from_numpy(np.array(advantages)).float().to(device)advantages = (advantages - advantages.mean()) / advantages.std()for k in range(k_epochs):mu, sigma = actor(states.float())dist = Normal(mu, sigma)new_log_probs = dist.log_prob(actions)entropy = dist.entropy()ratio = torch.exp(new_log_probs - log_probs.detach())surr1 = ratio * advantagessurr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantagesactor_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * Tcritic_loss = F.mse_loss(critic(states.float()), td_target.float().detach())actor_optimizer.zero_grad()critic_optimizer.zero_grad()actor_loss.backward()actor_optimizer.step()critic_loss.backward()critic_optimizer.step()if __name__ == '__main__':device = torch.device("cpu")env = gym.make('Walker2d')episodes = 1000train_timesteps = 1024clip = 0.2k_epochs = 40gamma = 0.9lam = 0.95T = 1e-2lr = 1e-4actor = Actor(env.observation_space.shape[0], env.action_space.shape[0]).to(device)critic = Critic(env.observation_space.shape[0]).to(device)actor_optimizer = torch.optim.Adam(actor.parameters(), lr=lr)critic_optimizer = torch.optim.Adam(critic.parameters(), lr=lr)trajectory = []timestep = 0pbar = trange(1, episodes+1)score_list = []for e in pbar:state, _ = env.reset()scores = 0.0while True:timestep += 1s = torch.from_numpy(state).float().to(device)mu, sigma = actor(s)dist = Normal(mu, sigma)a = dist.sample()log_prob = dist.log_prob(a).detach().cpu().numpy()action = a.detach().cpu().numpy()next_state, reward, done, _, _ = env.step(action)scores += rewardtrajectory.append([state, action, log_prob, reward, next_state, done])if timestep % train_timesteps == 0:ppo_training(trajectory,actor,critic,actor_optimizer,critic_optimizer,clip,k_epochs,gamma,lam,device,T)trajectory = []state = copy.deepcopy(next_state)if done: breakscore_list.append(scores)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}".format(e, episodes, scores, timestep))
"SAC"
from torch.distributions import Normal
from collections import deque
from tqdm import trangeimport torch
import torch.nn as nn
import torch.nn.functional as Fimport copy
import time
import random
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as pltclass ActorNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.mu = nn.Linear(128, action_size)self.sigma = nn.Linear(128, action_size)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))mu = self.mu(x)sigma = F.softplus(self.sigma(x))return mu, sigmaclass QNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size + action_size, 256)self.fc2 = nn.Linear(256, 128)self.fc3 = nn.Linear(128, 1)def forward(self, s, a):x = torch.cat((s, a), dim=-1)x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))return self.fc3(x)class ReplayBuffer:def __init__(self, capacity):self.memory = deque(maxlen=capacity)def __len__(self):return len(self.memory)def save_memory(self, state, action, reward, next_state, done):self.memory.append([state, action, reward, next_state, done])def sample(self, batch_size):sample_size = min(len(self), batch_size)experiences = random.sample(self.memory, sample_size)return experiencesdef soft_update(target, source, tau=0.05):for param, target_param in zip(source.parameters(), target.parameters()):target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)def choice_action(actor, state):mu, sigma = actor(state)normal_dist = Normal(torch.zeros_like(mu), torch.ones_like(sigma))epsilon = normal_dist.sample()action = torch.tanh(mu + sigma * epsilon)log_prob = normal_dist.log_prob(epsilon)log_prob -= torch.log(1 - action.pow(2) + 1e-6)log_prob = log_prob.sum(-1, keepdim=True)return action, log_probdef training(gamma, replay_buffer, models, log_alpha, target_entropy, optimizers, batch_size, tau):(actor,q1_net,target_q1_net,q2_net,target_q2_net) = models(actor_optimizer,q1_optimizer,q2_optimizer,alpha_optimizer) = optimizersbatch_data = replay_buffer.sample(batch_size)states, actions, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)).float().to(device),zip(*batch_data))with torch.no_grad():alpha = torch.exp(log_alpha)with torch.no_grad():next_state_actions, next_state_log_probs = choice_action(actor, next_states)target_q1_next = target_q1_net(next_states, next_state_actions)target_q2_next = target_q2_net(next_states, next_state_actions)min_q_next_target = torch.min(target_q1_next, target_q2_next) - alpha * next_state_log_probstd_target_value = rewards.view(-1, 1) + (1 - dones.view(-1, 1)) * gamma * min_q_next_targetq1 = q1_net(states, actions)q2 = q2_net(states, actions)q1_loss = F.mse_loss(q1, td_target_value)q2_loss = F.mse_loss(q2, td_target_value)q1_optimizer.zero_grad()q2_optimizer.zero_grad()q1_loss.backward()q2_loss.backward()q1_optimizer.step()q2_optimizer.step()state_actions, state_log_probs = choice_action(actor, states)q = torch.min(q1_net(states, state_actions), q2_net(states, state_actions))actor_loss = torch.mean((alpha * state_log_probs) - q)actor_optimizer.zero_grad()actor_loss.backward()actor_optimizer.step()with torch.no_grad():_, log_prob = choice_action(actor, states)alpha_loss = torch.mean(- log_alpha.exp() * (log_prob + target_entropy))alpha_optimizer.zero_grad()alpha_loss.backward()alpha_optimizer.step()soft_update(target_q1_net, q1_net, tau)soft_update(target_q2_net, q2_net, tau)if __name__ == '__main__':device = torch.device("cpu")env = gym.make('Walker2d')episodes = 1000train_timesteps = 4policy_lr = 1e-4q_lr = 1e-4alpha_lr = 1e-2tau = 0.05buffer_capacity = int(1e6)batch_size = 64gamma = 0.9state_size = env.observation_space.shape[0]action_size = env.action_space.shape[0]target_entropy = - torch.prod(torch.tensor(env.observation_space.shape, device=device))actor = ActorNetwork(state_size, action_size).to(device)q1_net = QNetwork(state_size, action_size).to(device)target_q1_net = QNetwork(state_size, action_size).to(device)q2_net = QNetwork(state_size, action_size).to(device)target_q2_net = QNetwork(state_size, action_size).to(device)target_q1_net.load_state_dict(q1_net.state_dict())target_q2_net.load_state_dict(q2_net.state_dict())log_alpha = torch.tensor(0.0, requires_grad=True, device=device)actor_optimizer = torch.optim.Adam(actor.parameters(), lr=policy_lr)q1_optimizer = torch.optim.Adam(q1_net.parameters(), lr=q_lr)q2_optimizer = torch.optim.Adam(q2_net.parameters(), lr=q_lr)alpha_optimizer = torch.optim.Adam([log_alpha], lr=alpha_lr)replay_buffer = ReplayBuffer(buffer_capacity)pbar = trange(1, episodes+1)timestep = 0score_list = []for episode in pbar:state, _ = env.reset()scores = 0.0while True:timestep += 1if timestep % train_timesteps == 0:training(gamma,replay_buffer,(actor,q1_net,target_q1_net,q2_net,target_q2_net),log_alpha,target_entropy,(actor_optimizer,q1_optimizer,q2_optimizer,alpha_optimizer),batch_size,tau)action, _ = choice_action(actor, torch.from_numpy(state).float().to(device))action = action.detach().cpu().numpy()next_state, reward, done, _, _ = env.step(action)scores += rewardreplay_buffer.save_memory(state, action, reward, next_state, done)state = copy.deepcopy(next_state)if done: breakscore_list.append(scores)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Log Alpha: {:.2f}".format(episode, episodes, scores, timestep, log_alpha.item()))
"OptionCritic(PPOC)"import torch
import torch.nn as nn
import torch.nn.functional as Ffrom torch.distributions import Bernoulli, Normal
from torch import optimfrom tqdm import trangeimport matplotlib.pyplot as plt
import gymnasium as gym
import numpy as np
import random
import copyclass QNetwork(nn.Module):def __init__(self, state_size, num_options):super().__init__()self.nn = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),nn.ReLU(),nn.Linear(128, num_options))def forward(self, x):return self.nn(x)class ActorNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),)self.mu = nn.Sequential(nn.ReLU(),nn.Linear(128, action_size),nn.Tanh())self.sigma = nn.Sequential(nn.ReLU(),nn.Linear(128, action_size),nn.Softplus())def forward(self, x):x = self.fc(x)return self.mu(x), self.sigma(x)class TerminationNetwork(nn.Module):def __init__(self, state_size, num_options):super().__init__()self.nn = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),nn.ReLU(),nn.Linear(128, num_options),nn.Sigmoid())def forward(self, x):return self.nn(x)class OptionCritic(nn.Module):def __init__(self, state_size, action_size, num_options):super().__init__()self.upper_policy_q_net = QNetwork(state_size, num_options)self.termination_network = TerminationNetwork(state_size, num_options)self.options = nn.ModuleList([ActorNetwork(state_size, action_size)for _ in range(num_options)])self.num_options = num_optionsdef get_option_id(self, state, epsilon):if np.random.rand() > epsilon:return torch.argmax(self.upper_policy_q_net(state),dim=-1).detach().cpu().numpy().item()else:return random.sample(range(self.num_options), 1)[0]def is_option_terminated(self, state, option_id):option_termination_prob = self.termination_network(state)[option_id]option_termination = Bernoulli(option_termination_prob).sample()return bool(option_termination.item())def select_action(self, state, epsilon, option_id):if self.is_option_terminated(state, option_id):option_id = self.get_option_id(state, epsilon)else: option_id = option_idmu, sigma = self.options[option_id](state)normal_dist = Normal(mu, sigma)action = normal_dist.sample()log_prob = normal_dist.log_prob(action)action = action.detach().cpu().numpy()log_prob = log_prob.detach().cpu().numpy()return action, log_prob, option_iddef training(agent, optimizer, trajectory, gamma, k_epochs, clip, lam, T):states, actions, log_probs, option_id, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)), zip(*trajectory))option_id = option_id.view(-1, 1)rewards = rewards.view(-1, 1)dones = dones.view(-1, 1).float()with torch.no_grad():option_terminated_prob = agent.termination_network(next_states.float()).gather(-1, option_id)next_q = agent.upper_policy_q_net(next_states.float())q_target = rewards + gamma * (1 - dones) * ((1 - option_terminated_prob) * next_q.gather(-1, option_id)+ option_terminated_prob * next_q.max(dim=-1, keepdim=True)[0])td_delta = q_target - agent.upper_policy_q_net(states.float()).gather(-1, option_id)td_delta = td_delta.detach().cpu().numpy()adv = 0.0advantages = []for delta in td_delta[::-1]:adv = gamma * lam * adv + deltaadvantages.append(adv)advantages.reverse()advantages = torch.from_numpy(np.array(advantages)).float()advantages = ((advantages - advantages.mean())/ (1e-6 + advantages.std()))for k in range(k_epochs):mus, sigmas = [], []for i in range(states.shape[0]):mu, sigma = agent.options[option_id[i]](states[i].float())mus.append(mu), sigmas.append(sigma)mu = torch.stack(mus, 0)sigma = torch.stack(sigmas, 0)normal_dist = Normal(mu, sigma)new_log_probs = normal_dist.log_prob(actions)entropy = normal_dist.entropy()ratio = torch.exp(new_log_probs - log_probs.detach())surr1 = ratio * advantagessurr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantagespolicy_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * Tcritic_loss = F.mse_loss(agent.upper_policy_q_net(states.float()).gather(-1, option_id),q_target.float())termination_loss = agent.termination_network(states.float()).gather(-1, option_id) * (agent.upper_policy_q_net(states.float()).gather(-1, option_id)- agent.upper_policy_q_net(states.float()).max(dim=-1, keepdim=True)[0]).detach()losses = policy_loss + critic_loss + termination_loss.mean()optimizer.zero_grad()losses.backward()optimizer.step()if __name__ == '__main__':env = gym.make('Walker2d')episodes = 1000train_timesteps = 1024clip = 0.2k_epochs = 10gamma = 0.9lam = 0.95T = 1e-2lr = 1e-4epsilon = 1.0epsilon_decay = 0.995mini_epsilon = 0.1state_size = env.observation_space.shape[0]action_size = env.action_space.shape[0]num_options = 4agent = OptionCritic(state_size, action_size, num_options)optimizer = optim.Adam(agent.parameters(), lr=lr)trajectory = []timestep = 0pbar = trange(1, episodes + 1)scores_list = []for e in pbar:state, _ = env.reset()scores = 0.0option_id = agent.get_option_id(torch.from_numpy(state).float(), epsilon)options = [option_id]while True:timestep += 1if timestep % train_timesteps == 0:training(agent, optimizer, trajectory, gamma, k_epochs, clip, lam, T)trajectory = []action, log_prob, option_id = agent.select_action(torch.from_numpy(state).float(), epsilon, option_id)options.append(option_id)next_state, reward, done, _, _ = env.step(action)scores += rewardtrajectory.append([state, action, log_prob, option_id, reward, next_state, done])state = copy.deepcopy(next_state)if done: breakscores_list.append(scores)epsilon = max(mini_epsilon, epsilon * epsilon_decay)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Epsilon: {:.2f}".format(e, episodes, scores, timestep, epsilon))plt.plot(scores_list)plt.show()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com