Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""GridWorld implementation
This file implements the basics of a rectangular grid world and the objects
that might populate it, including a simple agent that can move in four directions.
"""
from enum import Enum
from inspect import cleandoc
import random
import textwrap
from typing import Set, NamedTuple, Iterable, Dict, Union, Any
def object_id(obj: Any, nbits: int = 32) -> str:
    """Build an identifier for the object from its class name and its hash.

    The result has the form ``<ClassName>_<hex hash>``. When ``nbits`` is
    non-zero only the lowest ``nbits`` bits of the hash are kept, which also
    turns negative hashes into non-negative values; when it is zero the hash
    is used unchanged.
    """
    digest = hash(obj)
    if nbits:
        # Keep just the low-order bits of the hash.
        digest &= (1 << nbits) - 1
    return '{}_{:x}'.format(type(obj).__name__, digest)
class Coordinate(NamedTuple):
    """Position (or size) on a 2D Cartesian grid, stored as an ``(x, y)`` tuple."""

    # Horizontal component.
    x: int
    # Vertical component.
    y: int
def coord(x: int, y: int) -> Coordinate:
    """Shorthand constructor for a Coordinate: ``x`` is the horizontal and ``y`` the vertical component."""
    return Coordinate(x, y)
class WorldObject(object):
    """Base class for anything (item, agent, ...) that can be placed in a GridWorld."""

    def __init__(self):
        # The object belongs to no world until one registers it.
        self._world: GridWorld = None

    def _setWorld(self, world: 'GridWorld'):
        """Record the world the object belongs to (None detaches it)."""
        self._world = world

    @property
    def location(self) -> Coordinate:
        """Coordinate of the object in its world, or None when it is in no world."""
        if self.world is None:
            return None
        return self.world.location_of(self)

    @property
    def world(self) -> 'GridWorld':
        """World containing the object, or None when it is in no world."""
        return self._world

    @property
    def name(self) -> str:
        """Name of the object, derived from its class name and hash."""
        return object_id(self)

    def charSymbol(self) -> str:
        """Character used to draw the object in the textual view of the world.

        Must be overridden by subclasses; the base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError
class Agent(WorldObject):
    """A world object that can perform actions.

    This base class only defines the interface: ``reward`` and ``do`` raise
    NotImplementedError, while the remaining members provide neutral defaults.
    """

    class Actions(Enum):
        """Actions available to the agent; empty in the base class."""

    @classmethod
    def actions(cls) -> Iterable['Agent.Actions']:
        """Return the executable actions as an iterable (the ``Actions`` enumeration itself)."""
        return cls.Actions

    @property
    def reward(self) -> int:
        """Reward accumulated so far."""
        raise NotImplementedError

    @property
    def isAlive(self):
        """Whether the agent can still execute actions; always True by default."""
        return True

    def percept(self) -> Any:
        """Return what the agent perceives of the environment; None by default."""
        return None

    def do(self, action: 'Agent.Actions') -> int:
        """Execute the action and return the reward it produced."""
        raise NotImplementedError

    def on_done(self):
        """Hook meant to be called when the episode terminates; does nothing by default."""

    def success(self) -> bool:
        """Whether the goal of the agent has been achieved; always False by default."""
        return False
class GridWorldException(Exception):
    """Base class of every exception raised by the GridWorld code."""
class OutOfBounds(GridWorldException):
    """Signals that an object would end up outside the bounds of the world."""
class Collision(GridWorldException):
    """Signals that an object cannot be placed because it collides with another object or a block."""
class GridWorld(object):
    """Rectangular grid containing impassable blocks and world objects.

    Coordinates are zero-based; ``x`` grows to the right and ``y`` grows
    upwards, so ``(0, 0)`` is the bottom-left cell of the textual view.
    Several objects may share a cell, but no object can be placed on a block
    or outside the grid.
    """

    def __init__(self, size: Coordinate, blocks: Iterable[Coordinate]):
        """Create a world of the given size (``x`` is the width, ``y`` the height) with blocks at the given coordinates."""
        self._size = size
        self._blocks = set(blocks)
        # Two indexes that must always agree: object -> cell and cell -> objects.
        self._objects: Dict[WorldObject, Coordinate] = {}
        self._location: Dict[Coordinate, Iterable[WorldObject]] = {}

    @classmethod
    def from_string(cls, world_desc: str) -> 'GridWorld':
        """Create a new grid world from a string describing the layout.

        Each line corresponds to a different row; a ``#`` marks a block and
        any other character is an empty square. The height of the world is
        the number of lines and the width is the length of the longest line.
        The last line is the row with ``y == 0``. E.g.:

            ....#
            #....
            #....
            #....
            .....

        An empty description produces a world of size (0, 0).
        """
        BLOCK_STR = '#'
        rows = cleandoc(world_desc).splitlines()
        # default=0 keeps an empty description from crashing max().
        width = max((len(r) for r in rows), default=0)
        size = Coordinate(x=width, y=len(rows))
        return cls(size, blocks=cls.find_coordinates(BLOCK_STR, rows))

    @staticmethod
    def find_coordinates(items: str, world_desc: Union[str, Iterable[str]]):
        """Return the coordinates at which any of the characters in ``items`` appears.

        The description can be a multiline string or an iterable of lines.
        The last line is row ``y == 0``; results are ordered by increasing
        ``y`` and then by increasing ``x``.
        """
        # Materialise the rows: reversed() needs a sequence, not just an iterable.
        rows = world_desc.splitlines() if isinstance(world_desc, str) else list(world_desc)
        return [
            Coordinate(x=x, y=y)
            for y, row in enumerate(reversed(rows))
            for x, char in enumerate(row)
            if char in items
        ]

    @property
    def size(self) -> Coordinate:
        """Return the size of the world."""
        return self._size

    @property
    def blocks(self) -> Set[Coordinate]:
        """Return the set of coordinates where blocks are placed."""
        return self._blocks

    @property
    def object_locations(self) -> Dict[WorldObject, Coordinate]:
        """Return the dictionary associating each object to its coordinate."""
        return self._objects

    @property
    def location_objects(self) -> Dict[Coordinate, Iterable[WorldObject]]:
        """Return the dictionary associating each occupied coordinate to the objects placed there."""
        return self._location

    def isBlock(self, pos: Coordinate) -> bool:
        """Return true if there is a block at the coordinate."""
        return pos in self.blocks

    def isInside(self, pos: Coordinate) -> bool:
        """Return true if the coordinate is inside the world."""
        return (0 <= pos.x < self.size.x) and (0 <= pos.y < self.size.y)

    def objects_at(self, pos: Coordinate) -> Iterable[WorldObject]:
        """Return the objects at the given coordinate.

        The result is a snapshot, so the caller can add or remove objects
        while iterating over it.
        """
        return list(self.location_objects.get(pos, ()))

    def location_of(self, obj: WorldObject) -> Coordinate:
        """Return the coordinate of the object within the world, or None if it is not in it."""
        return self.object_locations.get(obj, None)

    def empty_cells(self, count_objects=False) -> Iterable[Coordinate]:
        """Return the set of cells without blocks.

        If ``count_objects`` is true, cells currently holding at least one
        object are excluded as well.
        """
        cells = {coord(x, y) for x in range(self.size.x) for y in range(self.size.y)}
        cells.difference_update(self.blocks)
        if count_objects:
            # Only cells that still hold something count as occupied.
            cells.difference_update(
                pos for pos, residents in self.location_objects.items() if residents)
        return cells

    @property
    def objects(self) -> Iterable[WorldObject]:
        """Return an iterable over the objects within the world."""
        return self.object_locations.keys()

    def removeBlock(self, pos: Coordinate):
        """Remove the block at the coordinate, if any."""
        self._blocks.discard(pos)

    def addBlock(self, pos: Coordinate):
        """Place a block at the coordinate."""
        self._blocks.add(pos)

    def _detach(self, obj: WorldObject) -> bool:
        """Drop the object from both indexes; return whether it was in the world."""
        if obj not in self.object_locations:
            return False
        old_pos = self.object_locations.pop(obj)
        residents = self.location_objects.get(old_pos)
        if residents is not None:
            if obj in residents:
                residents.remove(obj)
            if not residents:
                # Do not leave empty entries behind: the keys of this index
                # are what marks a cell as occupied.
                del self.location_objects[old_pos]
        return True

    def _place(self, obj: WorldObject, pos: Coordinate):
        """Put the object at an already validated position, next to whatever is there."""
        self._detach(obj)
        self.location_objects.setdefault(pos, []).append(obj)
        self.object_locations[obj] = pos
        obj._setWorld(self)

    def addObject(self, obj: WorldObject, pos: Coordinate):
        """Place the object at the coordinate, alongside any object already there.

        An object that is already in the world is relocated.

        Raises OutOfBounds if the coordinate is outside the world and
        Collision if it holds a block; in both cases nothing is changed.
        """
        if not self.isInside(pos):
            raise OutOfBounds('Placing {} outside the world at {}'.format(obj, pos))
        if self.isBlock(pos):
            raise Collision('Placing {} inside a block {}'.format(obj, pos))
        self._place(obj, pos)

    def removeObject(self, obj: WorldObject):
        """Remove the object from the world; do nothing if it is not in it."""
        if self._detach(obj):
            obj._setWorld(None)

    def moveObject(self, obj: WorldObject, pos: Coordinate):
        """Move the object to the coordinate, alongside any object already there.

        An object that is not yet in the world is simply placed.

        Raises OutOfBounds if the coordinate is outside the world and
        Collision if it holds a block; in both cases the object stays where
        it was.
        """
        if not self.isInside(pos):
            raise OutOfBounds('Moving {} outside the world at {}'.format(obj, pos))
        if self.isBlock(pos):
            raise Collision('Moving {} inside a block {}'.format(obj, pos))
        self._place(obj, pos)

    def __str__(self):
        """Return a framed textual view of the world, with row ``y == 0`` at the bottom."""
        CELL_WIDTH = 2
        BLANK = '.'.rjust(CELL_WIDTH)
        BLOCK = '█' * CELL_WIDTH
        grid = [[BLANK] * self.size.x for _ in range(self.size.y)]
        for pos in self.blocks:
            # addBlock accepts any coordinate: skip the ones that cannot be drawn.
            if self.isInside(pos):
                grid[pos.y][pos.x] = BLOCK
        for obj, pos in self.object_locations.items():
            grid[pos.y][pos.x] = obj.charSymbol().ljust(CELL_WIDTH)
        horizontal = '─' * CELL_WIDTH * self.size.x
        side_frame = '│'
        top_frame = '┌' + horizontal + '┐' + '\n'
        bottom_frame = '\n' + '└' + horizontal + '┘'
        # Rows are stored bottom-up, so they are emitted in reverse order.
        body = '\n'.join(side_frame + ''.join(row) + side_frame for row in reversed(grid))
        return top_frame + body + bottom_frame

    def run_episode(self, agent: Agent, player: 'Player', horizon: int = 0, show=True):
        """Run an episode on the world using the player to control the agent.

        The horizon is the maximum number of steps; 0 or None means no limit.
        If ``show`` is true the world is printed at each iteration before the
        player's turn. The episode stops when the agent succeeds, when it is
        no longer alive, when the player returns None, or when the horizon is
        reached.

        Raises GridWorldException if the agent is not in the world.
        """
        if agent not in self.objects:
            raise GridWorldException('Missing agent {}, cannot run the episode'.format(agent))

        # inform the player of the start of the episode
        player.start_episode()

        step = 0
        while not horizon or step < horizon:
            if agent.success():
                print('The agent {} succeded!'.format(agent.name))
                break
            if not agent.isAlive:
                print('The agent {} died!'.format(agent.name))
                break
            if show:
                print(self)
            action = player.play(step, agent.percept(), agent.actions())
            if action is None:
                print('Episode terminated by the player {}.'.format(player.name))
                break
            print('Step {}: agent {} executing {}'.format(step, agent.name, action.name))
            reward = agent.do(action)
            player.feedback(action, reward, agent.percept())
            step += 1
        else:
            # Reached only when the loop ends without a break, i.e. on horizon exhaustion.
            print('Episode terminated by maximun number of steps ({}).'.format(horizon))

        player.end_episode()

        print(self)
        print('Episode terminated with a reward of {} for agent {}'.format(agent.reward, agent.name))
class Player(object):
    """Controller that chooses the actions of an agent.

    Subclasses implement ``play``, which returns one of the agent's actions
    or None to give up; the other methods are optional hooks.
    """

    def start_episode(self):
        """Hook called at the beginning of an episode."""

    def end_episode(self):
        """Hook called when an episode is completed."""

    def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
        """Choose the action to play at the given turn.

        ``turn`` is an integer, ``state`` is the percept (whose shape depends
        on the specific problem) and ``actions`` are the possible choices.
        Return one of them, or None to stop the episode.
        """
        raise NotImplementedError

    def feedback(self, action: Agent.Actions, reward: int, state):
        """Receive the reward of the last action and the resulting state.

        Called right after the execution of the action.
        """

    @property
    def name(self) -> str:
        """Name of the player, or a default value based on its type and hash."""
        if hasattr(self, '_name'):
            return self._name
        return object_id(self)

    @property
    def world(self) -> GridWorld:
        """World the player is playing on, or None if it is not known."""
        return getattr(self, '_world', None)

    @property
    def agent(self) -> Agent:
        """Agent the player is controlling, or None if it is not known."""
        return getattr(self, '_agent', None)

    @classmethod
    def player(cls, name: str = None, world: GridWorld = None, agent: Agent = None, **args) -> 'Player':
        """Create a new player with an optional name, world and controlled agent.

        Additional keyword arguments are passed to the class constructor.
        Only the values that are not None are stored on the new player.
        """
        ply = cls(**args)
        for attribute, value in (('_name', name), ('_world', world), ('_agent', agent)):
            if value is not None:
                setattr(ply, attribute, value)
        return ply
############################################################################
#
# Examples of the use of the API
#######################################
# Trivial players
#
class RandomPlayer(Player):
    """This player selects randomly one of the available actions."""

    def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
        """Return a randomly chosen action, or None when there is no action to choose from."""
        # Materialise first: ``actions`` is only guaranteed to be iterable,
        # so its length must be taken from the list and not from the argument.
        choices = list(actions)
        if not choices:
            # Nothing can be played: give up instead of crashing.
            return None
        return random.choice(choices)

    def feedback(self, action: Agent.Actions, reward: int, state):
        """Print the reward obtained by the last action."""
        print('{}: action {} reward is {}'.format(self.name, action.name, reward))
class UserPlayer(Player):
    """This player asks the user for the next move.

    The answer can be the exact name of an action, its name ignoring the
    case, or any case-insensitive prefix that identifies a single action.
    """

    def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
        """Show the percept and prompt the user until an action is recognised.

        Return the selected action, or None when the user enters an empty
        line or the input stream is closed.
        """
        actions_dict = {a.name: a for a in actions}
        print('{} percept:'.format(self.name))
        print(textwrap.indent(str(state), ' '))
        prompt = '{}: select an action {} and press enter, or empty to stop: '.format(
            self.name, list(actions_dict.keys()))
        while True:
            try:
                answer = input(prompt).strip()
            except EOFError:
                # No more input available: same as the user giving up.
                return None
            if len(answer) < 1:
                return None
            if answer in actions_dict:
                return actions_dict[answer]
            lowered = answer.lower()
            # A full name typed in a different case wins over prefix matching,
            # otherwise it would be rejected as ambiguous whenever it is also
            # the prefix of another action name.
            options = [k for k in actions_dict if k.lower() == lowered]
            if len(options) != 1:
                options = [k for k in actions_dict if k.lower().startswith(lowered)]
            if len(options) == 1:
                return actions_dict[options[0]]
            print('Canot understand <{}>'.format(answer))

    def feedback(self, action: Agent.Actions, reward: int, state):
        """Print the reward obtained by the last action."""
        print('{}: action {} reward is {}'.format(self.name, action.name, reward))
#######################################
# Simple agent moving in four directions that eats on the way.
# Its goal is to consume all the food
class Food(WorldObject):
    """Passive world object representing a piece of food."""

    def charSymbol(self):
        """Return the symbol drawn for food in the textual view of the world."""
        symbol = '🍌'
        return symbol
class SimpleEater(Agent):
    """Agent moving one cell at a time in four directions and eating the food it steps on.

    Every action costs 1 and every piece of food eaten gives ``FOOD_BONUS``.
    The goal is reached when no food is left in the world.
    """

    class Actions(Agent.Actions):
        """Moves, each expressed as the (dx, dy) displacement it applies."""
        N = (0, 1)
        S = (0, -1)
        E = (1, 0)
        W = (-1, 0)

    def __init__(self):
        # Initialise the base class so that ``world`` and ``location`` are
        # usable (and None) before the agent is placed in a world.
        super().__init__()
        self._foodcount = 0
        self._reward = 0
        self.FOOD_BONUS = 10

    def charSymbol(self):
        """Return the symbol drawn for the agent in the textual view of the world."""
        return '🐒'

    @property
    def reward(self) -> int:
        """The current accumulated reward"""
        return self._reward

    def do(self, action: Agent.Actions) -> int:
        """Try to move in the direction of the action, eat the food found and return the reward.

        A move that would leave the world or enter a block leaves the agent
        where it is; the cost of the action is paid anyway.
        """
        delta = action.value
        new_pos = coord(self.location.x + delta[0], self.location.y + delta[1])
        try:
            self.world.moveObject(self, new_pos)
        except (OutOfBounds, Collision):
            # Bumped into the border or a block: stay put.
            pass
        # every action costs 1
        cost = -1
        # Iterate over a snapshot: eating removes objects from the cell, and
        # mutating a list while looping over it would skip elements.
        for obj in list(self.world.objects_at(self.location)):
            if isinstance(obj, Food):
                # eat the food
                self.world.removeObject(obj)
                self._foodcount += 1
                # food give bonus!
                cost += self.FOOD_BONUS
        self._reward += cost
        return cost

    def on_done(self):
        """Print how many pieces of food have been eaten."""
        print('{} agent: Got food {} times.'.format(type(self).__name__, self._foodcount))

    def success(self) -> bool:
        """Return true once all the food has been consumed."""
        return not any(isinstance(o, Food) for o in self.world.objects)
def eaterWorld(map_desc: str, foods: Iterable[Coordinate] = (), food_amount: float = .1) -> GridWorld:
    """Create a new world from the description and put food in it.

    If ``foods`` contains any coordinate, a piece of food is placed at each
    of them, skipping the ones occupied by a block. Otherwise food is placed
    on randomly chosen cells without blocks: a ``food_amount`` of 1 or more
    is the number of pieces, while a smaller value is the fraction of such
    cells to fill. The number of pieces never exceeds the available cells.
    """
    world = GridWorld.from_string(map_desc)
    # Materialise the argument: any iterable (or None) is accepted.
    food_positions = [] if foods is None else list(foods)
    if food_positions:
        for pos in food_positions:
            if not world.isBlock(pos):
                world.addObject(Food(), pos)
    else:
        free_cells = list(world.empty_cells())
        random.shuffle(free_cells)
        food_count = int(food_amount) if food_amount >= 1 else int(len(free_cells) * food_amount)
        # Never ask for more cells than there are, or pop() would fail.
        for _ in range(min(food_count, len(free_cells))):
            world.addObject(Food(), free_cells.pop())
    return world
def simpleEaterTest(player_class=RandomPlayer, horizon=20):
    """Run a demo episode of a SimpleEater on a fixed map.

    The world gets food on a tenth of its free cells, the agent starts from a
    random cell holding neither a block nor an object, and it is controlled
    by a new player of class ``player_class`` for at most ``horizon`` steps.
    """
    # NOTE(review): the rows of this map have uneven lengths, which suggests
    # that runs of spaces were collapsed at some point — verify the layout.
    MAP_STR = """
################
# # # #
# # #
# # #
# # # #
############# ##
# # # #
# # # #
# # # #
# #
############# ##
# # # #
# # #
# # #
# # # #
################
"""
    arena = eaterWorld(MAP_STR, food_amount=0.1)

    # place the agent in a random empty place
    vacant = list(arena.empty_cells(count_objects=True))
    random.shuffle(vacant)
    eater = SimpleEater()
    arena.addObject(eater, vacant.pop())

    arena.run_episode(eater, player_class.player(), horizon=horizon)
############################################################################
#
# Testing the API
if __name__ == "__main__":
    # Demo run: first with the default player, then with a player driven by the user.
    for overrides in ({}, {'player_class': UserPlayer}):
        simpleEaterTest(**overrides)