"""GridWorld implementation

This file implements the basics of a rectangular grid world, and the objects
that might populate it, including a simple agent that can move in four
directions.
"""

from enum import Enum
from inspect import cleandoc
import random
import textwrap
from typing import Set, NamedTuple, Iterable, Dict, Union, Any, List, Optional


def object_id(obj: Any, nbits: int = 32) -> str:
    """Build a string identifying the object using its class name and hash.

    The hash is reduced to its ``nbits`` least significant bits when ``nbits``
    is greater than zero; for zero, negative or None the full (possibly
    negative) hash is used.
    """
    hash_id = hash(obj)
    if nbits and nbits > 0:
        # Python's modulo is never negative for a positive modulus, so this
        # also maps negative hashes into [0, 2**nbits).
        hash_id %= 1 << nbits
    return '{}_{:x}'.format(type(obj).__name__, hash_id)


class Coordinate(NamedTuple):
    """Cartesian 2D coordinates."""
    x: int
    y: int


def coord(x: int, y: int) -> Coordinate:
    """Return a Coordinate named tuple; first argument is horizontal, second vertical."""
    return Coordinate(x=x, y=y)


class WorldObject(object):
    """An object, agent, or any other element that might be placed in a GridWorld."""

    def __init__(self):
        self._world: Optional['GridWorld'] = None

    def _setWorld(self, world: Optional['GridWorld']):
        """Record the world the object belongs to (called by GridWorld only)."""
        self._world = world

    @property
    def location(self) -> Optional[Coordinate]:
        """Return the location of the object in the world or None if it's not inside a world."""
        world = self.world
        return world.location_of(self) if world is not None else None

    @property
    def world(self) -> Optional['GridWorld']:
        """Return the world in which the object is or None if it's not inside a world."""
        # getattr keeps subclasses that forget to call super().__init__() working.
        return getattr(self, '_world', None)

    @property
    def name(self) -> str:
        """Return the name of the object."""
        return object_id(self)

    def charSymbol(self) -> str:
        """Return the character for the object in the textual representation of the world."""
        raise NotImplementedError


class Agent(WorldObject):
    """A special kind of world object that performs actions."""

    class Actions(Enum):
        """The actions that the agent can perform (empty here, extended by subclasses)."""
        pass

    @classmethod
    def actions(cls) -> Iterable['Agent.Actions']:
        """Return the actions that the agent can execute as an iterable object."""
        return cls.Actions

    @property
    def reward(self) -> int:
        """The current accumulated reward."""
        raise NotImplementedError

    @property
    def isAlive(self) -> bool:
        """Return true if the agent can still execute actions."""
        return True

    def percept(self) -> Any:
        """Return the perception of the environment. None by default."""
        return None

    def do(self, action: 'Agent.Actions') -> int:
        """Execute an action and return the reward of the action."""
        raise NotImplementedError

    def on_done(self):
        """Called when the episode terminates."""
        pass

    def success(self) -> bool:
        """Return true once the goal of the agent has been achieved."""
        return False


class GridWorldException(Exception):
    """Root of the exceptions raised by the GridWorld code."""
    pass


class OutOfBounds(GridWorldException):
    """Raised when an object is placed outside the bounds of the world."""
    pass


class Collision(GridWorldException):
    """Raised when an object cannot be placed because it collides with another object or block."""
    pass


class GridWorld(object):
    """A rectangular grid containing blocks (walls) and world objects.

    Coordinates have their origin in the bottom-left corner. Several objects
    may share a cell, but no object can be placed on a block.
    """

    def __init__(self, size: Coordinate, blocks: Iterable[Coordinate]):
        """Create a world of the given size (width, height) with the given blocks."""
        self._size = size
        self._blocks = set(blocks)
        # object -> cell, and the reverse index cell -> objects in that cell;
        # the reverse index only holds cells with at least one object.
        self._objects: Dict[WorldObject, Coordinate] = {}
        self._location: Dict[Coordinate, List[WorldObject]] = {}

    @classmethod
    def from_string(cls, world_desc: str) -> 'GridWorld':
        """Create a new grid world from a string describing the layout.

        Each line corresponds to a different row, and #s represent the
        position of a block, while any other character is interpreted as an
        empty square. The size of the world is the number of lines (height)
        and the size of the longest line (width). An empty description
        produces a world of size 0x0.

        E.g.:
            ....#
            #....
            #....
            #....
            .....
        """
        BLOCK_STR = '#'
        rows = cleandoc(world_desc).splitlines()
        width = max((len(r) for r in rows), default=0)
        size = Coordinate(x=width, y=len(rows))
        return cls(size, blocks=cls.find_coordinates(BLOCK_STR, rows))

    @staticmethod
    def find_coordinates(items: str,
                         world_desc: Union[str, Iterable[str]]) -> List[Coordinate]:
        """Return all the coordinates in which any of the characters appears.

        The description can be a multiline string or an iterable of lines.
        The last line of the description is the row with y == 0.
        """
        if isinstance(world_desc, str):
            rows = world_desc.splitlines()
        else:
            # materialise so that generators can be reversed too
            rows = list(world_desc)
        return [Coordinate(x=x, y=y)
                for y, row in enumerate(reversed(rows))
                for x, char in enumerate(row)
                if char in items]

    @property
    def size(self) -> Coordinate:
        """Return the size of the world."""
        return self._size

    @property
    def blocks(self) -> Set[Coordinate]:
        """Return the set of coordinates where blocks are placed."""
        return self._blocks

    def isBlock(self, pos: Coordinate) -> bool:
        """Return true if in the coordinate there's a block."""
        return pos in self.blocks

    def isInside(self, pos: Coordinate) -> bool:
        """Return true if the coordinate is inside the world."""
        return (0 <= pos.x < self.size.x) and (0 <= pos.y < self.size.y)

    def objects_at(self, pos: Coordinate) -> Iterable[WorldObject]:
        """Return the objects at the given coordinate.

        A new list is returned, so callers can add or remove objects from the
        world while iterating over it.
        """
        return list(self._location.get(pos, ()))

    def location_of(self, obj: WorldObject) -> Optional[Coordinate]:
        """Return the coordinate of the object within the world, or None if it's not in it."""
        return self._objects.get(obj, None)

    def empty_cells(self, count_objects=False) -> Iterable[Coordinate]:
        """Return the set of cells without blocks.

        If count_objects is true then cells holding other objects are
        excluded as well.
        """
        cells = {coord(x, y) for x in range(self.size.x) for y in range(self.size.y)}
        cells.difference_update(self.blocks)
        if count_objects:
            cells.difference_update(
                pos for pos, occupants in self._location.items() if occupants)
        return cells

    @property
    def objects(self) -> Iterable[WorldObject]:
        """Return an iterable over the objects within the world."""
        return self._objects.keys()

    def removeBlock(self, pos: Coordinate):
        """Remove the block at the given coordinate, if any."""
        self._blocks.discard(pos)

    def addBlock(self, pos: Coordinate):
        """Place a block at the given coordinate."""
        self._blocks.add(pos)

    def _check_free(self, verb: str, obj: WorldObject, pos: Coordinate):
        """Raise OutOfBounds or Collision if obj cannot be put at pos."""
        if not self.isInside(pos):
            raise OutOfBounds('{} {} outside the world at {}'.format(verb, obj, pos))
        if self.isBlock(pos):
            raise Collision('{} {} inside a block {}'.format(verb, obj, pos))

    def _detach(self, obj: WorldObject) -> Optional[Coordinate]:
        """Drop obj from both indexes and return its old cell (None if absent)."""
        old_pos = self._objects.pop(obj, None)
        occupants = self._location.get(old_pos)
        if occupants is not None:
            if obj in occupants:
                occupants.remove(obj)
            if not occupants:
                # keep the reverse index free of empty cells
                del self._location[old_pos]
        return old_pos

    def _place(self, obj: WorldObject, pos: Coordinate):
        """Put obj at pos, detaching it from its previous cell if needed."""
        self._detach(obj)
        self._location.setdefault(pos, []).append(obj)
        self._objects[obj] = pos
        obj._setWorld(self)

    def addObject(self, obj: WorldObject, pos: Coordinate):
        """Place the object at the given coordinate.

        If the object is already in this world it is moved to the new
        coordinate. Raise OutOfBounds if pos is outside the world and
        Collision if there's a block at pos.
        """
        self._check_free('Placing', obj, pos)
        self._place(obj, pos)

    def removeObject(self, obj: WorldObject):
        """Remove the object from the world; do nothing if it's not in it."""
        if self._detach(obj) is not None:
            obj._setWorld(None)

    def moveObject(self, obj: WorldObject, pos: Coordinate):
        """Move the object to the given coordinate (adding it if not yet in the world).

        Raise OutOfBounds if pos is outside the world and Collision if
        there's a block at pos; in both cases the object is left untouched.
        """
        self._check_free('Moving', obj, pos)
        self._place(obj, pos)

    def __str__(self):
        """Return a framed textual picture of the world, top row first."""
        CELL_WIDTH = 2
        BLANK = '.'.rjust(CELL_WIDTH)
        BLOCK = '█' * CELL_WIDTH
        width, height = self.size.x, self.size.y
        cells = [[BLANK] * width for _ in range(height)]
        for pos in self.blocks:
            # blocks may have been added outside the bounds: they're not drawn
            if self.isInside(pos):
                cells[pos.y][pos.x] = BLOCK
        for obj, pos in self._objects.items():
            cells[pos.y][pos.x] = obj.charSymbol().ljust(CELL_WIDTH)
        horizontal = '─' * CELL_WIDTH * width
        side_frame = '│'
        top_frame = '┌' + horizontal + '┐' + '\n'
        bottom_frame = '\n' + '└' + horizontal + '┘'
        # y grows upwards, so the last row is printed first
        body = '\n'.join(side_frame + ''.join(row) + side_frame for row in reversed(cells))
        return top_frame + body + bottom_frame

    def run_episode(self, agent: Agent, player: 'Player',
                    horizon: Optional[int] = 0, show: bool = True):
        """Run an episode on the world using the player to control the agent.

        The horizon specifies the maximum number of steps, 0 or None means no
        limit. If show is true then the world is printed at each iteration
        before the player's turn. At the end of the episode the player's
        end_episode and the agent's on_done hooks are called.

        Raise GridWorldException if the agent is not in the world.
        """
        if agent not in self.objects:
            raise GridWorldException('Missing agent {}, cannot run the episode'.format(agent))

        # inform the player of the start of the episode
        player.start_episode()

        step = 0
        while not horizon or step < horizon:
            if agent.success():
                print('The agent {} succeded!'.format(agent.name))
                break
            if not agent.isAlive:
                print('The agent {} died!'.format(agent.name))
                break
            if show:
                print(self)
            action = player.play(step, agent.percept(), agent.actions())
            if action is None:
                print('Episode terminated by the player {}.'.format(player.name))
                break
            print('Step {}: agent {} executing {}'.format(step, agent.name, action.name))
            reward = agent.do(action)
            player.feedback(action, reward, agent.percept())
            step += 1
        else:
            # reached only when the loop ends without a break
            print('Episode terminated by maximun number of steps ({}).'.format(horizon))

        player.end_episode()
        # the documented end-of-episode hook of the agent
        agent.on_done()

        print(self)
        print('Episode terminated with a reward of {} for agent {}'.format(
            agent.reward, agent.name))


class Player(object):
    """A player for a given agent.

    It implements the play method which should return one of the actions for
    the agent or None to give up.
    """

    def start_episode(self):
        """Method called at the beginning of the episode."""
        pass

    def end_episode(self):
        """Method called when an episode is completed."""
        pass

    def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
        """Return the action to play at the given turn, or None to stop the episode.

        The state is a percept, which might differ according to the specific
        problem; the returned action must be one of the given actions.
        """
        raise NotImplementedError

    def feedback(self, action: Agent.Actions, reward: int, state):
        """Receive the reward of the last action and the resulting state.

        The function is called right after the execution of the action.
        """
        pass

    @property
    def name(self) -> str:
        """Return the name of the player or a default value based on its type and hash."""
        try:
            return self._name
        except AttributeError:
            return object_id(self)

    @property
    def world(self) -> Optional[GridWorld]:
        """Return the world the player is playing on, or None if it's not known."""
        return getattr(self, '_world', None)

    @property
    def agent(self) -> Optional[Agent]:
        """Return the agent the player is controlling, or None if it's not known."""
        return getattr(self, '_agent', None)

    @classmethod
    def player(cls, name: Optional[str] = None, world: Optional[GridWorld] = None,
               agent: Optional[Agent] = None, **args) -> 'Player':
        """Create a new player with a name, the world it plays on and the agent it controls.

        Additional args are passed to the class constructor.
        """
        ply = cls(**args)
        if name is not None:
            ply._name = name
        if world is not None:
            ply._world = world
        if agent is not None:
            ply._agent = agent
        return ply


############################################################################
#
# Examples of the use of the API

#######################################
# Trivial players
#

class RandomPlayer(Player):
    """This player selects randomly one of the available actions."""

    def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
        """Return a random action, or None (give up) when there are no actions."""
        # materialise first: the actions may be any iterable, not only a sequence
        choices = list(actions)
        if not choices:
            return None
        return random.choice(choices)

    def feedback(self, action: Agent.Actions, reward: int, state):
        """Print the reward obtained by the last action."""
        print('{}: action {} reward is {}'.format(self.name, action.name, reward))


class UserPlayer(Player):
    """This player asks the user to type the action to execute."""

    def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
        """Show the percept and read an action name from standard input.

        The answer can be the full name of an action or an unambiguous
        case-insensitive prefix of it. An empty answer, or the end of the
        input, stops the episode (None is returned).
        """
        actions_dict = {a.name: a for a in actions}
        print('{} percept:'.format(self.name))
        print(textwrap.indent(str(state), ' '))
        prompt = '{}: select an action {} and press enter, or empty to stop: '.format(
            self.name, list(actions_dict.keys()))
        while True:
            try:
                answer = input(prompt).strip()
            except EOFError:
                # no more input available: same as giving up
                return None
            if not answer:
                return None
            if answer in actions_dict:
                return actions_dict[answer]
            options = [k for k in actions_dict if k.lower().startswith(answer.lower())]
            if len(options) == 1:
                return actions_dict[options[0]]
            print('Canot understand <{}>'.format(answer))

    def feedback(self, action: Agent.Actions, reward: int, state):
        """Print the reward obtained by the last action."""
        print('{}: action {} reward is {}'.format(self.name, action.name, reward))


#######################################
# Simple agent moving in four directions that eats on the way.
# Its goal is to consume all the food

class Food(WorldObject):
    """A piece of food that a SimpleEater consumes when entering its cell."""

    def charSymbol(self):
        """Return the symbol used to draw the food."""
        return '🍌'


class SimpleEater(Agent):
    """Agent moving in the four cardinal directions and eating the food it finds."""

    class Actions(Agent.Actions):
        """The moves of the agent; the value is the (dx, dy) displacement."""
        N = (0, 1)
        S = (0, -1)
        E = (1, 0)
        W = (-1, 0)

    def __init__(self):
        # initialise the WorldObject state (the world reference) as well
        super().__init__()
        self._foodcount = 0
        self._reward = 0
        self.FOOD_BONUS = 10

    def charSymbol(self):
        """Return the symbol used to draw the agent."""
        return '🐒'

    @property
    def reward(self) -> int:
        """The current accumulated reward."""
        return self._reward

    def do(self, action: Agent.Actions) -> int:
        """Move in the direction of the action, eat the food found and return the reward.

        Every action costs 1, even when the move is blocked by a wall or by
        the border of the world; each food item eaten gives FOOD_BONUS.

        Raise GridWorldException if the agent is not in a world.
        """
        here = self.location
        if here is None:
            raise GridWorldException(
                'Agent {} is not in a world, cannot execute {}'.format(self.name, action))
        dx, dy = action.value
        try:
            self.world.moveObject(self, coord(here.x + dx, here.y + dy))
        except (OutOfBounds, Collision):
            # blocked move: the agent stays where it is
            pass

        # every action costs 1
        cost = -1
        # iterate over a copy: eating removes objects from the cell
        for obj in list(self.world.objects_at(self.location)):
            if isinstance(obj, Food):
                # eat the food
                self.world.removeObject(obj)
                self._foodcount += 1
                # food gives bonus!
                cost += self.FOOD_BONUS
        self._reward += cost
        return cost

    def on_done(self):
        """Print how many food items have been eaten."""
        print('{} agent: Got food {} times.'.format(type(self).__name__, self._foodcount))

    def success(self) -> bool:
        """Return true once all the food has been consumed."""
        return not any(isinstance(o, Food) for o in self.world.objects)


def eaterWorld(map_desc: str, foods: Iterable[Coordinate] = (),
               food_amount: float = .1) -> GridWorld:
    """Create a new world from the description and place food in it.

    If foods is not empty, food is placed at the given coordinates (the ones
    on a block are skipped). Otherwise food is placed randomly on cells
    without blocks: food_amount is the number of food items when it's at
    least 1, and the fraction of the free cells otherwise. No more food than
    free cells is ever placed.
    """
    world = GridWorld.from_string(map_desc)
    # materialise: foods may be any iterable, e.g. a generator
    food_cells = list(foods)
    if food_cells:
        for pos in food_cells:
            if not world.isBlock(pos):
                world.addObject(Food(), pos)
    else:
        free_cells = list(world.empty_cells())
        random.shuffle(free_cells)
        if food_amount >= 1:
            food_count = int(food_amount)
        else:
            food_count = int(len(free_cells) * food_amount)
        for _ in range(min(food_count, len(free_cells))):
            world.addObject(Food(), free_cells.pop())
    return world


def simpleEaterTest(player_class=RandomPlayer, horizon=20):
    """Run an episode with a SimpleEater placed randomly in a three-room world."""
    # NOTE(review): the row layout of this map was reconstructed (three open
    # rooms connected by a gap in the dividing walls) - confirm against the
    # intended map.
    MAP_STR = """
    ################
    #              #
    #              #
    #              #
    #              #
    #              #
    #              #
    #              #
    ############# ##
    #              #
    #              #
    #              #
    #              #
    #              #
    #              #
    #              #
    ############# ##
    #              #
    #              #
    #              #
    #              #
    #              #
    #              #
    #              #
    ################
    """

    world = eaterWorld(MAP_STR, food_amount=0.1)
    free_cells = list(world.empty_cells(count_objects=True))
    random.shuffle(free_cells)
    # place the agent in a random empty place
    agent = SimpleEater()
    world.addObject(agent, free_cells.pop())
    player = player_class.player()
    world.run_episode(agent, player, horizon=horizon)


############################################################################
#
# Testing the API

if __name__ == "__main__":
    simpleEaterTest()
    simpleEaterTest(player_class=UserPlayer)