grutopia.core.env¶
env¶
- class grutopia.core.gym_env.Env(simulator_runtime: SimulatorRuntime)[source]¶
Gym Env for a single environment with a single learning agent.¶
- get_observations() dict[Any, Any] | Any [source]¶
Get observations from the Isaac Sim environment.
- Returns:
observation
- Return type:
observation (gym.Space)
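A minimal usage sketch. The SimulatorRuntime import path and constructor arguments below are assumptions (the config path is a placeholder); adjust them to your GRUtopia version and configuration:

```python
from grutopia.core.gym_env import Env
from grutopia.core.runtime import SimulatorRuntime  # assumed import path

# Hypothetical config path and keyword arguments; the exact
# SimulatorRuntime signature depends on the GRUtopia version in use.
sim_runtime = SimulatorRuntime(config_path='./demo/configs/h1_locomotion.yaml', headless=True)
env = Env(sim_runtime)

obs = env.get_observations()  # raw observations from the Isaac Sim scene
```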
- render(mode='human')[source]¶
Compute the render frames as specified by render_mode during the initialization of the environment.
The environment's metadata render modes (env.metadata["render_modes"]) should contain the possible ways to implement the render modes. In addition, list versions for most render modes are achieved through gymnasium.make, which automatically applies a wrapper to collect rendered frames.
Note
As the render_mode is known during __init__, the objects used to render the environment state should be initialised in __init__.
By convention, if the render_mode is:
- None (default): no render is computed.
- "human": The environment is continuously rendered in the current display or terminal, usually for human consumption. This rendering should occur during step(), and render() doesn't need to be called. Returns None.
- "rgb_array": Return a single frame representing the current state of the environment. A frame is a np.ndarray with shape (x, y, 3) representing RGB values for an x-by-y pixel image.
- "ansi": Return a string (str) or StringIO.StringIO containing a terminal-style text representation for each time step. The text can include newlines and ANSI escape sequences (e.g. for colors).
- "rgb_array_list" and "ansi_list": List-based versions of render modes are possible (except "human") through the wrapper gymnasium.wrappers.RenderCollection, which is automatically applied during gymnasium.make(..., render_mode="rgb_array_list"). The collected frames are popped after render() or reset() is called.
Note
Make sure that your class's metadata "render_modes" key includes the list of supported modes.
Changed in version 0.25.0: The render function was changed to no longer accept parameters; rather, these parameters should be specified in the environment's initialisation, i.e., gymnasium.make("CartPole-v1", render_mode="human").
- reset(*, seed=None, options=None) tuple[gymnasium.spaces.space.Space, dict[str, Any]] [source]¶
Resets the environment to an initial internal state, returning an initial observation and info.
- Parameters:
seed (optional int) – The seed that is used to initialize the environment’s PRNG (np_random).
options (optional dict) – Additional information to specify how the environment is reset (optional, depending on the specific environment)
- Returns:
Observation of the initial state.
info (dictionary): Contains the key task_runtime if there is an unfinished task.
- Return type:
observation (ObsType)
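A short sketch of the reset contract, continuing from the env constructed above:

```python
obs, info = env.reset(seed=42)
if 'task_runtime' in info:  # an unfinished task is still active
    task_runtime = info['task_runtime']
```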
- property simulation_app¶
The simulation app instance.
- property simulation_runtime¶
The config of the simulation environment.
- step(action: Any) tuple[Any, float, bool, bool, dict[str, Any]] [source]¶
Run one step with the given action (includes an Isaac Sim step).
TODO: Implement the conversion between dict and action space/obs space
- Parameters:
action (Any) – an action provided by the agent to update the environment state.
- Returns:
An element of the environment's observation_space as the next observation due to the agent's actions.
reward (float): The reward as a result of taking the action.
terminated (bool): Whether the agent reaches the terminal state. If true, the user needs to call reset().
truncated (bool): Whether the truncation condition outside the scope of the MDP is satisfied. Typically, this is a time limit, but it could also be used to indicate that an agent has physically gone out of bounds. Can be used to end the episode prematurely before a terminal state is reached. If true, the user needs to call reset().
info (dict): Contains auxiliary diagnostic information (helpful for debugging, learning, and logging). Currently, it contains nothing.
- Return type:
observation (Any)
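A standard rollout loop over this interface (policy is a hypothetical callable; the action format is environment-specific):

```python
obs, info = env.reset()
done = False
while not done:
    action = policy(obs)  # hypothetical policy; action format depends on the robot's controllers
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
```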
runner¶
- class grutopia.core.runner.SimulatorRunner(simulator_runtime: SimulatorRuntime)[source]¶
- clear_single_task(task_name: str)[source]¶
Clear a single task specified by task_name.
- Parameters:
task_name (str) – Task name to clear.
- next_episode(task_name: str | None = None) TaskRuntime | None [source]¶
Switch to the next episode.
This method cleans up the finished task specified by task_name and then switches to the next task, if one exists.
- Parameters:
task_name (Optional[str]) – The task name of the finished task.
- Returns:
The new task runtime, or None if no next task exists.
- Return type:
TaskRuntime
- Raises:
RuntimeError – If the specified task_name is not found in the current tasks.
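A sketch of episode switching. The SimulatorRunner construction mirrors Env; the task name is a hypothetical placeholder:

```python
from grutopia.core.runner import SimulatorRunner

runner = SimulatorRunner(sim_runtime)  # sim_runtime as constructed earlier

new_runtime = runner.next_episode(task_name='task_0')  # clean up 'task_0' and advance
if new_runtime is None:
    print('No further episodes.')
```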
- reset(task: str | None = None) Tuple[Dict, TaskRuntime | None] [source]¶
Reset the task.
- Parameters:
task (str) – A task name to reset. If task is None, it always means the reset is invoked for the first time (before agents invoke step()).
- Returns:
A tuple of two values. The first is a dict of observations. The second is a TaskRuntime object representing the new task runtime.
- Return type:
Tuple[Dict, TaskRuntime | None]
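For the first-time reset, continuing from the runner constructed above:

```python
obs, task_runtime = runner.reset()  # task=None: first reset, before agents invoke step()
```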
- step(actions: Dict | None = None, render: bool = True) Tuple[Dict, Dict[str, bool], Dict[str, float]] [source]¶
Step function to advance the simulation environment by one time step.
This method processes actions for active tasks, steps the simulation world, collects observations, updates metrics, and determines task terminations. It also handles rendering based on specified intervals.
- Parameters:
actions (Union[Dict, None], optional) – A dictionary mapping task names to another dictionary of robot names and their respective actions. If None, no actions are applied. Defaults to None.
render (bool, optional) – Flag indicating whether to render the simulation at this step. True triggers rendering if the render interval is met. Defaults to True.
- Returns:
obs (Dict): A dictionary containing observations for each task, further divided by robot names and their observation data.
terminated_status (Dict[str, bool]): A dictionary mapping task names to boolean values indicating whether the task has terminated.
reward (Dict[str, float]): A dictionary intended to map task names to reward values; how rewards are computed is task-specific.
- Return type:
Tuple[Dict, Dict[str, bool], Dict[str, float]]
- Raises:
Exception – If an error occurs when applying an action to a robot, the exception is logged and re-raised, providing context about the task, the robot, and the state of the current tasks.
Notes
The _world.step() method advances the simulation, optionally rendering the environment based on the render flag and the render interval.
get_obs() collects observations from the simulation world.
Metrics for each task are updated, and upon task completion, results are saved to a JSON file, including a 'normally_end' flag set to True to indicate normal termination of the task.
Tasks that have been marked as finished receive no further action application or metric updates.
Caution
The implementation contains a TODO comment about a "Key optimization interval" that is not yet addressed in the code or this docstring.
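A sketch of the stepping loop at the runner level (task, robot, and controller names are hypothetical placeholders):

```python
# Actions are keyed by task name, then by robot name; the action payload
# format depends on each robot's controllers.
actions = {'task_0': {'h1': {'some_controller': [0.0, 0.0]}}}

obs, terminated_status, reward = runner.step(actions=actions, render=True)
for task_name, done in terminated_status.items():
    if done:
        runner.next_episode(task_name)  # clean up and advance
```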