fromtypingimportAny,Dict,List,OptionalfrompydanticimportBaseModelfromgrutopia.core.utilimportlogclassMetaActionData(BaseModel):""" action status in grutopia """controller:strdata:AnyclassActionData(BaseModel):""" action status in grutopia """robot:strcontrollers:List[MetaActionData]class_IsaacData(BaseModel):""" isaac status in grutopia """actions:Optional[Dict[str,Dict[str,Any]]]={}obs:Optional[Dict[str,Dict[str,Any]]]={}task_idx_counter:Optional[int]=0finished_tasks:Optional[List[str]]=[]
[docs]classIsaacData:""" isaac status in grutopia There are three types of isaac status: * Action * Observation * Episode structure of isaac status like this:: { actions: { "task_0":{ robot_1: { cap: param, controller: param, } } }, observations: { "task_0":{ robot_1: { obs_1: data, obs_2: data } } }, task_idx_counter: 0 finished_tasks: [] } """data:_IsaacData=_IsaacData(actions={},obs={})def__init__(self)->None:pass@classmethoddefget_all(cls)->_IsaacData:returncls.data# Observation
[docs]@classmethoddefset_obs_data(cls,obs:Dict[str,Dict[str,Any]])->None:""" Set isaac observations data Args: obs (Dict[str, Dict[str, Any]]): obs data with task_name key. """fortask_id,obs_datainobs.items():cls.data.obs[task_id]=obs_data
[docs]@classmethoddefset_obs_data_by_task_name(cls,task_name:str,obs:Dict[str,Dict[str,Any]])->None:""" Set isaac observations data by task name Args: task_name: isaac task name obs (Dict[str, Dict[str, Any]]): obs data with robot_name key. """forrobot_name,obs_datainobs.items():cls.data.obs[task_name][robot_name]=obs_data
[docs]@classmethoddefset_obs_by_task_name_and_robot_name(cls,task_name:str,robot_name:str,obs:Dict[str,Dict[str,Any]])->None:""" Set isaac observation by task name and robot name Args: task_name: isaac task name robot_name: isaac robot name obs (Dict[str, Dict[str, Any]]): obs data with task_name key. """cls.data.obs[task_name][robot_name]=obs
[docs]@classmethoddefget_obs(cls)->Dict[str,Dict]:""" Get isaac observation by task name Args: task_name: isaac task name Returns: isaac observation data """returncls.data.obs
[docs]@classmethoddefget_obs_by_task_name(cls,task_name:str)->Dict[str,Dict]:""" Get isaac observation by task name Args: task_name: isaac task name Returns: isaac observation data """iftask_nameincls.data.obs:returncls.data.obs[task_name]return{}
[docs]@classmethoddefget_obs_by_task_name_and_robot_name(cls,task_name:str,robot_name:str)->Dict[str,Dict]:""" Get isaac observation by task name and robot name Args: task_name: isaac task name robot_name: isaac robot name Returns: isaac observation data """iftask_nameincls.data.obs:ifrobot_nameincls.data.obs[task_name]:returncls.data.obs[task_name][robot_name]return{}
# Action
[docs]@classmethoddefset_actions(cls,actions:Dict[str,ActionData])->None:""" Set action by task_name Args: actions (Dict[str, ActionData]): action data with task_name key """fortask_name,actioninactions.items():ifcls.data.actions.get(task_name)isNone:cls.data.actions[task_name]={}cls.data.actions[task_name].update(action.model_dump())
[docs]@classmethoddefget_action_by_task_name(cls,task_name:str)->Dict[str,Dict]:""" Get action by task name Returns: action(dict like {robot_name: {controller_name: param}}) """iftask_nameincls.data.actions:returncls.data.actions[task_name]return{}
[docs]@classmethoddefget_action_by_task_name_and_robot_name(cls,task_name:str,robot_name:str)->Dict[str,Dict]:""" Get action by task name and robot name Returns: action(dict like {robot_name: {controller_name: param}}) """iftask_nameincls.data.actions:ifrobot_nameincls.data.actions[task_name]:returncls.data.actions[task_name][robot_name]return{}
[docs]@classmethoddefgen_task_idx(cls)->str:""" Generate a id for isaac task Returns: task id """task_idx=cls.data.task_idx_countercls.data.task_idx_counter+=1returnstr(task_idx)
[docs]@classmethoddefset_episode_finished(cls,task_name:str):""" Set isaac task {task_name} finished Args: task_name (str): isaac task_name """iftask_namenotincls.data.finished_tasks:cls.data.finished_tasks.append(task_name)log.info(f'Set isaac task {task_name} finished')return