Planners API
- class layeredrl.planners.Planner(predictor: Predictor, action_space: Box, n_env_instances: int, horizon: int, policy: Callable[[Tensor], Tensor] | None = None, aux_rewards: List[Callable[[Batch], Tensor]] | None = None, aux_reward_weights: List[float] | None = None, device=device(type='cpu'))[source]
Bases: ABC
- __init__(predictor: Predictor, action_space: Box, n_env_instances: int, horizon: int, policy: Callable[[Tensor], Tensor] | None = None, aux_rewards: List[Callable[[Batch], Tensor]] | None = None, aux_reward_weights: List[float] | None = None, device=device(type='cpu'))[source]
Initialize the planner.
- Parameters:
predictor – The predictor to use for planning. This includes the dynamics and reward model and the value function.
action_space – The action space.
n_env_instances – The number of environment instances. Relevant for vectorization.
horizon – The horizon of the plan.
policy – A function that takes in a state and returns an action.
aux_rewards – A list of functions that take in a batch and return a tensor with cumulative auxiliary rewards for each environment instance. These auxiliary rewards can also make use of the epistemic and aleatoric uncertainties of trajectories.
aux_reward_weights – A list of weights for the auxiliary rewards.
device – The device to use.
- get_aux_rewards(trajectory: Batch) Tensor[source]
Get the auxiliary rewards for the given trajectory.
- Parameters:
trajectory – The trajectory to get the auxiliary rewards for.
- Returns:
The auxiliary rewards for each environment instance.
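The pairing of aux_rewards with aux_reward_weights can be illustrated with a small sketch. It assumes the weights scale each auxiliary reward before the results are summed, which the reference above does not state explicitly. A plain dict stands in for Batch, NumPy arrays stand in for tensors, and combine_aux_rewards, action_penalty and step_bonus are hypothetical names that are not part of layeredrl.

```python
import numpy as np

def combine_aux_rewards(trajectory, aux_rewards, aux_reward_weights):
    """Weighted sum of auxiliary rewards, one value per environment instance.

    Assumption: each weight scales its auxiliary reward before summation.
    """
    n_envs = trajectory["act"].shape[0]
    total = np.zeros(n_envs)
    for fn, weight in zip(aux_rewards, aux_reward_weights):
        total += weight * fn(trajectory)
    return total

# Hypothetical trajectory: 2 environment instances, horizon 3, 1-D actions.
trajectory = {
    "act": np.array([[[1.0], [2.0], [0.0]],
                     [[0.0], [0.0], [3.0]]]),
}

def action_penalty(batch):
    # Negated cumulative squared action magnitude over the horizon.
    return -(batch["act"] ** 2).sum(axis=(1, 2))

def step_bonus(batch):
    # Constant bonus equal to the number of planned steps.
    n_envs, horizon = batch["act"].shape[:2]
    return np.full(n_envs, float(horizon))

combined = combine_aux_rewards(
    trajectory, [action_penalty, step_bonus], [0.1, 1.0]
)
```

Each callable returns one cumulative value per environment instance, so the combined result keeps the shape (n_envs,).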
- abstractmethod plan(initial_obs: Tensor) Tensor[source]
Plan a trajectory from the given observation and return it.
Note that the observation has a batch dimension (for multiple environment instances).
- Parameters:
initial_obs – The initial observation of the environment(s).
- Returns:
The actions corresponding to the planned trajectory (a sequence of actions for each environment instance), and an info dictionary with additional information about the optimization.
- abstractmethod reset(initial_guess: Tensor, reset_instances: Tensor | None = None) None[source]
Reset the planner.
When doing MPC, this should be called at the beginning of each episode to reset the planner’s internal state.
- Parameters:
initial_guess – The initial guess for the optimal actions. Shape: (batch_size, horizon, action_dim)
reset_instances – A boolean tensor indicating which instances to reset.
- set_predictor(predictor: Predictor)[source]
Set the predictor for the planner.
- Parameters:
predictor – The new predictor to use for planning.
- abstractmethod shift_initialization(n_shift_steps: int, initial_guess: Tensor, active_instances: Tensor)[source]
Shift the initial action sequence by n_shift_steps and pad with initial_guess.
- Parameters:
n_shift_steps – The number of steps to shift the initial action sequence by.
initial_guess – The initial guess for the last n_shift_steps of the new action sequence. Shape: (n_envs, n_shift_steps, action_dim)
active_instances – A boolean tensor indicating which instances to shift.
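As a rough illustration of the shift described above, the sketch below drops the first n_shift_steps actions of a stored plan, appends initial_guess at the end, and applies the result only to active instances. It uses NumPy arrays in place of tensors, and shift_plan is a hypothetical helper; how a concrete planner stores and updates its plan may differ.

```python
import numpy as np

def shift_plan(plan, n_shift_steps, initial_guess, active_instances):
    """Shift a plan forward in time and pad its tail.

    plan:             (n_envs, horizon, action_dim)
    initial_guess:    (n_envs, n_shift_steps, action_dim)
    active_instances: (n_envs,) boolean mask; inactive rows stay unchanged.
    """
    # Drop the first n_shift_steps actions and append the guess at the end.
    shifted = np.concatenate([plan[:, n_shift_steps:], initial_guess], axis=1)
    new_plan = plan.copy()
    new_plan[active_instances] = shifted[active_instances]
    return new_plan

plan = np.arange(8, dtype=float).reshape(2, 4, 1)  # horizon 4, action_dim 1
guess = np.zeros((2, 1, 1))
active = np.array([True, False])
new_plan = shift_plan(plan, 1, guess, active)
```

In a receding-horizon loop this kind of shift lets the next planning step start from the remainder of the previous plan instead of from scratch.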
- class layeredrl.planners.BlackboxPlanner(optimizer: Optimizer, rollout_callback: Callable[[Batch], None] = None, deterministic: bool = False, use_value_func: bool = False, use_term_prob: bool = True, use_ensemble_disagreement: bool = False, **kwargs)[source]
Bases: Planner
Plans by optimizing a sequence of actions using a given optimizer.
- __init__(optimizer: Optimizer, rollout_callback: Callable[[Batch], None] = None, deterministic: bool = False, use_value_func: bool = False, use_term_prob: bool = True, use_ensemble_disagreement: bool = False, **kwargs)[source]
Initialize the planner.
Note that the class only supports predictors with value functions that do not depend on the action (no Q-functions). The reason for this is that the value function is used to calculate the value of the final state of the trajectory and the action is not known at that point.
The planner does not use the policy.
- Parameters:
optimizer – An instance of Optimizer that does the optimization of the action sequence.
rollout_callback – A function that is called after rolling out a new set of trajectories. Expects a batch with the trajectory data, the context, the cost and the actions as input.
deterministic – Whether to use the deterministic version of the model.
use_value_func – Whether to use the value function to calculate the terminal cost.
use_term_prob – Whether to use the termination probability to calculate the cost.
use_ensemble_disagreement – Whether to use the ensemble disagreement instead of the task reward.
**kwargs – Additional arguments to pass to the parent class.
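The restriction to state value functions can be made concrete with a sketch of a trajectory cost that bootstraps from the value of the final predicted state. This is an illustration under assumptions, not the planner's actual cost: the discount parameter and the trajectory_cost helper are hypothetical, NumPy arrays stand in for tensors, and the termination probability and ensemble disagreement options are left out.

```python
import numpy as np

def trajectory_cost(rewards, terminal_value=None, discount=1.0):
    """Negative discounted return, optionally bootstrapped with V(s_H).

    rewards:        (n_trajectories, horizon) predicted rewards
    terminal_value: (n_trajectories,) state value of the final predicted state
    discount:       hypothetical discount factor (an assumption of this sketch)
    """
    horizon = rewards.shape[1]
    discounts = discount ** np.arange(horizon)
    ret = (rewards * discounts).sum(axis=1)
    if terminal_value is not None:
        # Only the final state is needed here; no action is available
        # beyond the horizon, so a Q-function could not be evaluated.
        ret = ret + discount ** horizon * terminal_value
    return -ret

rewards = np.array([[1.0, 1.0],
                    [0.0, 2.0]])
values = np.array([10.0, 0.0])
cost_no_value = trajectory_cost(rewards)
cost_with_value = trajectory_cost(rewards, values, discount=0.5)
```

Without the terminal value both trajectories cost the same; with it, the first trajectory is preferred because it ends in a state with higher estimated value.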
- cost(initial_obs: Tensor, x: Tensor, horizon: int, batch_size: int) Tensor[source]
Compute the cost of the given actions.
Makes use of the predictor (containing the dynamics model, reward and value functions).
- Parameters:
initial_obs – The initial observation of the environment(s) and samples. Shape: (batch_size * n_samples, state_dim). The batch dimension is collapsed with the sample dimension by using repeat_interleave.
x – The actions to compute the cost of, in the shape expected by the blackbox optimizer: (batch_size, n_samples, horizon * action_dim)
horizon – The horizon of the plan.
batch_size – The batch size of the trajectory.
- Returns:
The cost of the actions (for each environment instance). Shape: (batch_size,)
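The shape bookkeeping for initial_obs and x can be sketched as follows. NumPy is used to keep the example light; np.repeat along axis 0 plays the role of repeat_interleave, and all sizes are arbitrary example values. The sketch only shows how the collapsed batch and sample dimensions line up, not how the cost itself is computed.

```python
import numpy as np

batch_size, n_samples, horizon, action_dim, state_dim = 2, 3, 4, 2, 5

obs = np.arange(batch_size * state_dim, dtype=float).reshape(batch_size, state_dim)

# Each observation is repeated n_samples times in consecutive rows,
# giving shape (batch_size * n_samples, state_dim).
obs_rep = np.repeat(obs, n_samples, axis=0)

# Flat action vectors as a blackbox optimizer would propose them.
x = np.zeros((batch_size, n_samples, horizon * action_dim))
x[1, 2] = 1.0  # mark sample 2 of environment instance 1

# Collapse batch and sample dimensions and unfold the action sequence.
actions = x.reshape(batch_size * n_samples, horizon, action_dim)

# Row index of (instance 1, sample 2) after collapsing.
row = 1 * n_samples + 2
```

Because the repeat is interleaved, row `row` of obs_rep holds the observation of instance 1 and row `row` of actions holds the marked sample, so observations and action sequences stay aligned.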
- plan(initial_obs: Tensor, active_instances: Tensor, verbose: bool = False) Tensor[source]
Plan a trajectory from the given observation and return it.
Note that the observation has a batch dimension (for multiple environment instances).
- Parameters:
initial_obs – The initial observation of the environment(s).
active_instances – A boolean tensor indicating which instances are active.
verbose – Whether to print out the cost during optimization.
- Returns:
The actions corresponding to the planned trajectory (a sequence of actions for each environment instance), and an info dictionary with additional information about the optimization.
- reset(initial_guess: Tensor, reset_instances: Tensor | None = None) None[source]
Reset the planner.
When doing MPC, this should be called at the beginning of each episode to reset the planner’s internal state.
- Parameters:
initial_guess – The initial guess for the optimal actions. Shape: (batch_size, horizon, action_dim)
reset_instances – A boolean tensor indicating which instances to reset.
- class layeredrl.planners.CEMPlanner(initial_sigma: Tensor | float, use_icem: bool = True, cem_params: dict = None, device: device = device(type='cpu'), *args, **kwargs)[source]
Bases: BlackboxPlanner
- __init__(initial_sigma: Tensor | float, use_icem: bool = True, cem_params: dict = None, device: device = device(type='cpu'), *args, **kwargs)[source]
Initialize the planner.
- Parameters:
initial_sigma – The initial standard deviation of the samples. Shape: (action_dim, )
use_icem – Whether to use ICEM instead of CEM.
cem_params – Parameters for the CEM optimizer.
device – The device to use.
*args – Arguments for the BlackboxPlanner.
**kwargs – Keyword arguments for the BlackboxPlanner and optimizer.
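For readers unfamiliar with the cross-entropy method, the following is a minimal generic CEM loop on a toy quadratic cost. It is not the optimizer used by CEMPlanner and includes none of the iCEM refinements; cem_minimize, its parameters and the cost function are hypothetical and chosen only to show the role of the initial standard deviation.

```python
import numpy as np

def cem_minimize(cost_fn, mean, sigma, n_samples=128, n_elites=16,
                 n_iterations=20, seed=0):
    """Plain cross-entropy method over an action sequence.

    mean:  (horizon, action_dim) initial guess
    sigma: float or (action_dim,) initial standard deviation
    """
    rng = np.random.default_rng(seed)
    mean = np.asarray(mean, dtype=float).copy()
    sigma = np.broadcast_to(np.asarray(sigma, dtype=float), mean.shape).copy()
    for _ in range(n_iterations):
        # Sample candidate action sequences around the current mean.
        noise = rng.standard_normal((n_samples,) + mean.shape)
        samples = mean + sigma * noise
        costs = cost_fn(samples)
        # Refit mean and standard deviation to the lowest-cost samples.
        elites = samples[np.argsort(costs)[:n_elites]]
        mean = elites.mean(axis=0)
        sigma = elites.std(axis=0) + 1e-6
    return mean

horizon, action_dim = 5, 2
target = np.full((horizon, action_dim), 0.5)

def quadratic_cost(samples):
    # Squared distance to a fixed target sequence, one cost per sample.
    return ((samples - target) ** 2).sum(axis=(1, 2))

start = np.zeros((horizon, action_dim))
best = cem_minimize(quadratic_cost, mean=start, sigma=1.0)
initial_cost = quadratic_cost(start[None])[0]
final_cost = quadratic_cost(best[None])[0]
```

A larger initial standard deviation explores more widely at the start, while a smaller one keeps samples close to the initial guess.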
- plan(initial_obs: Tensor, active_instances: Tensor, verbose: bool = False) Tensor[source]
Plan a trajectory from the given observation and return it.
Note that the observation has a batch dimension (for multiple environment instances).
- Parameters:
initial_obs – The initial observation of the environment(s).
active_instances – A boolean tensor indicating which instances are active.
verbose – Whether to print out the cost during optimization.
- Returns:
The actions corresponding to the planned trajectory (a sequence of actions for each environment instance), and an info dictionary with additional information about the optimization.
- shift_initialization(n_shift_steps: int, initial_guess: Tensor, active_instances: Tensor)[source]
Shift the initial action sequence by n_shift_steps and pad with initial_guess.
- Parameters:
n_shift_steps – The number of steps to shift the initial action sequence by.
initial_guess – The initial guess for the last n_shift_steps of the new action sequence. Shape: (n_envs, n_shift_steps, action_dim)
active_instances – A boolean tensor indicating which instances to shift.