Skip to content
Snippets Groups Projects
Commit 411e015e authored by Tessaris Sergio's avatar Tessaris Sergio
Browse files

Initial commit

parents
No related branches found
No related tags found
No related merge requests found
MIT License
Copyright (c) [2019] [Sergio Tessaris]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
# Wumpus World Simulator
This package implements a Python version of the [Hunt the Wumpus](https://en.wikipedia.org/wiki/Hunt_the_Wumpus) game as described in the book [Artificial Intelligence: A Modern Approach by Russell and Norvig](http://aima.cs.berkeley.edu/).
The package has been written to be used in the master course of AI taught at the Faculty of Computer Science of the Free University of Bozen-Bolzano.
## Install
You can download the source code from <https://gitlab.inf.unibz.it/tessaris/wumpus> and use `pip install .` or installing directly from the repository with
```
pip install git+https://gitlab.inf.unibz.it/tessaris/wumpus.git
```
## Usage
To write your own player you should create a subclass of [Player]() and then use an instance of it as a parameter of the `run_episode` method of `GridWorld` class. Instances of the `Player` subclasses should be created using its `player` class method, where you can decide whether to give the player access to the underlying environment or to rely on the state information provided by the `play` method.
Examples of the usage of the package can be found in the implementation of two players `RandomPlayer` and `UserPlayer`, and in the file [`wumpus-usage.py`]() in the `examples` directory of the repository.
\ No newline at end of file
#!/usr/bin/env python
# Examples demonstrating the use of the Wumpus package
import json
import random
import wumpus as wws
class MyPlayer(wws.UserPlayer):
"""Player demonstrating the use of the start episode method to inspect the world."""
def start_episode(self):
"""Print the description of the world and the agent before starting (if known)."""
if self.world is not None:
# I know the world
world_info = {k: [] for k in ('Hunter', 'Pits', 'Wumpus', 'Gold', 'Exits')}
world_info['Size'] = (self.world.size.x, self.world.size.y)
world_info['Blocks'] = [(c.x, c.y) for c in self.world.blocks]
for obj in self.world.objects:
if isinstance(obj, wws.Hunter):
world_info['Hunter'].append((obj.location.x, obj.location.y))
elif isinstance(obj, wws.Pit):
world_info['Pits'].append((obj.location.x, obj.location.y))
elif isinstance(obj, wws.Wumpus):
world_info['Wumpus'].append((obj.location.x, obj.location.y))
elif isinstance(obj, wws.Exit):
world_info['Exits'].append((obj.location.x, obj.location.y))
elif isinstance(obj, wws.Gold):
world_info['Gold'].append((obj.location.x, obj.location.y))
print('World details:')
for k in ('Size', 'Pits', 'Wumpus', 'Gold', 'Exits', 'Blocks'):
print(' {}: {}'.format(k, world_info.get(k, None)))
if self.agent is not None and isinstance(self.agent, wws.Hunter):
print('Controlling hunter in position ({}, {}) with direction {}'.format(self.agent.location.x, self.agent.location.y, self.agent.orientation))
def play_classic(size: int = 0):
"""Play the classic version of the wumpus."""
# create the world
world = wws.WumpusWorld.classic(size=size if size > 3 else random.randint(4, 8))
# get the hunter agent
hunter = next(iter(o for o in world.objects if isinstance(o, wws.Hunter)), None)
# Run a player without any knowledge about the world
world.run_episode(hunter, wws.UserPlayer.player())
def play_classic_informed(size: int = 0):
"""Play the classic version of the wumpus with a player knowing the world and the agent."""
# create the world
world = wws.WumpusWorld.classic(size=size if size > 3 else random.randint(4, 8))
# get the hunter agent
hunter = next(iter(o for o in world.objects if isinstance(o, wws.Hunter)), None)
# Run a player with knowledge about the world
world.run_episode(hunter, MyPlayer.player(world=world, agent=hunter))
WUMPUS_WORLD = '''
{
"id": "simple wumpus world",
"size": [7, 7],
"hunters": [[0, 0]],
"pits": [[4, 0], [3, 1], [2, 2], [6, 2], [4, 4], [3, 5], [4, 6], [5, 6]],
"wumpuses": [[1, 2]],
"exits": [[0, 0]],
"golds": [[6, 3]],
"blocks": []
}
'''
def play_fixed(world_json: str = WUMPUS_WORLD):
"""Play on a given world described in JSON format."""
# create the world
world = wws.WumpusWorld.from_JSON(json.loads(world_json))
# get the hunter agent
hunter = next(iter(o for o in world.objects if isinstance(o, wws.Hunter)), None)
# Run a player with knowledge about the world
world.run_episode(hunter, MyPlayer.player(world=world, agent=hunter))
EXAMPLES = (play_classic, play_classic_informed, play_fixed)
def main(*args):
# Randomly play one of the examples
ex = random.choice(EXAMPLES)
print('Example {}:'.format(ex.__name__))
print(' ' + ex.__doc__)
ex()
if __name__ == "__main__":
main()
setup.py 0 → 100644
from setuptools import setup, find_packages
import re
import codecs
import os
import subprocess
# Hack to allow non-normalised versions
# see <https://github.com/pypa/setuptools/issues/308>
from setuptools.extern.packaging import version
version.Version = version.LegacyVersion
_INCLUDE_GIT_REV_ = False
# see <https://stackoverflow.com/a/39671214> and
# <https://packaging.python.org/guides/single-sourcing-package-version>
def find_version(*pkg_path):
pkg_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), *pkg_path)
version_file = codecs.open(os.path.join(pkg_dir, '__init__.py'), 'r').read()
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
version_file, re.M)
_git_revision_ = None
if _INCLUDE_GIT_REV_:
try:
_git_revision_ = subprocess.check_output(['git', 'describe', '--always', '--dirty'], encoding='utf-8').strip()
except subprocess.CalledProcessError:
pass
if version_match:
return version_match.group(1) + ('' if _git_revision_ is None else '+' + _git_revision_)
elif _git_revision_:
return _git_revision_
raise RuntimeError("Unable to find version string.")
def long_description_md(fname='README.md'):
this_directory = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(this_directory, fname), encoding='utf-8') as f:
long_description = f.read()
return long_description
setup(
name='wumpus',
version=find_version('wumpus'),
description='Wumpus world simulator',
long_description=long_description_md(),
long_description_content_type='text/markdown',
author='Sergio Tessaris',
author_email='tessaris@inf.unibz.it',
packages=find_packages(),
include_package_data=True,
install_requires=[
],
exclude_package_data={'': ['.gitignore']},
)
from wumpus.wumpus import WumpusWorld, Hunter, Wumpus, Pit, Gold, Exit
from wumpus.gridworld import Player, UserPlayer, RandomPlayer
__version__ = '0.1.0'
\ No newline at end of file
"""GridWorld implementation
This file implements the basics of a rectangular grid world, and the objects
that might populate it. Including a simple agent that can move in four directions.
"""
from enum import Enum
from inspect import cleandoc
import random
import textwrap
from typing import Set, NamedTuple, Iterable, Dict, Union, Any
def object_id(obj: Any, nbits: int = 32) -> str:
"""Build a string identifying the object using hashing anc class name. The object hash is reduced to the given number of bits nbits if it's greater than zero."""
# hash_id = hash(obj)
hash_id = (hash(obj) + (1 << nbits)) % (1 << nbits) if nbits else hash(obj)
return '{}_{:x}'.format(type(obj).__name__, hash_id)
class Coordinate(NamedTuple):
"""Cartesian 2D coordinates."""
x: int
y: int
def coord(x: int, y: int) -> Coordinate:
"""Return a Coordinates named tuple, first argument is horizontal and second vertical."""
return Coordinate(x=x, y=y)
class WorldObject(object):
"""An object, agent, or any other element that might be placed in a GridWorld."""
def __init__(self):
self._world: GridWorld = None
def _setWorld(self, world: 'GridWorld'):
self._world = world
@property
def location(self) -> Coordinate:
"""Return the location of the object in the world or None if it's not inside a world."""
return self.world.location_of(self) if self.world is not None else None
@property
def world(self) -> 'GridWorld':
"""Return the world in which the object is or None if it's not inside a world."""
return self._world
@property
def name(self) -> str:
"""Return the name of the object."""
return object_id(self)
def charSymbol(self) -> str:
"""Return the character for the object in the textual representation of the world."""
raise NotImplementedError
class Agent(WorldObject):
"""Is a special kind of world object that perform actions."""
class Actions(Enum):
"""The actions that the agent can perform."""
pass
@classmethod
def actions(cls) -> Iterable['Agent.Actions']:
"""Return the actions that the agent can execute as an iterable object."""
return cls.Actions
@property
def reward(self) -> int:
"""The current accumulated reward"""
raise NotImplementedError
@property
def isAlive(self):
"""Return true is the agent can still execute actions."""
return True
def percept(self) -> Any:
"""Return the perception of the environment. None by default."""
return None
def do(self, action: 'Agent.Actions') -> int:
"""Execute an action and return the reward of the action."""
raise NotImplementedError
def on_done(self):
"""Called when the episode terminate."""
pass
def success(self) -> bool:
"""Return true once the goal of the agent has been achieved."""
return False
class GridWorldException(Exception):
"""Root of the exceptions raised by the GridWorld code."""
pass
class OutOfBounds(GridWorldException):
"""Raised when an object is placed outside the bounds of the world."""
pass
class Collision(GridWorldException):
"""Raised when an object cannot be placed because is colliding with another object or block."""
pass
class GridWorld(object):
def __init__(self, size: Coordinate, blocks: Iterable[Coordinate]):
self._size = size
self._blocks = set(blocks)
self._objects: Dict[WorldObject, Coordinate] = {}
self._location: Dict[Coordinate, Iterable[Coordinate]] = {}
@classmethod
def from_string(cls, world_desc: str) -> 'GridWorld':
"""Create a new grid world from a string describing the layout.
Each line corresponds to a different row, and #s represent the position of a block, while any othe character is interpreted as an empty square. The size of the world is the number of lines (height) and the size of the longest line (width). E.g.:
....#
#....
#....
#....
.....
"""
BLOCK_STR = '#'
rows = cleandoc(world_desc).splitlines()
size = Coordinate(x=max(len(r) for r in rows), y=len(rows))
return cls(size, blocks=cls.find_coordinates(BLOCK_STR, rows))
@staticmethod
def find_coordinates(items: str, world_desc: Union[str, Iterable[str]]):
"""Return all the coordinates in which any of the characters appears in the world description. The decription can be a multiline string or the list of lines."""
coordinates = []
rows = world_desc.splitlines() if isinstance(world_desc, str) else world_desc
y = -1
for row in reversed(rows):
y += 1
for x in range(len(row)):
if row[x] in items:
coordinates.append(Coordinate(x=x, y=y))
return coordinates
@property
def size(self) -> Coordinate:
"""Return the size of the world."""
return self._size
@property
def blocks(self) -> Set[Coordinate]:
"""Return the set of coordinates where blocks are placed."""
return self._blocks
def isBlock(self, pos: Coordinate) -> bool:
"""Return true if in the coordinate there's a block."""
return pos in self.blocks
def isInside(self, pos: Coordinate) -> bool:
"""Return true if the coordinate is inside the world."""
return (0 <= pos.x < self.size.x) and (0 <= pos.y < self.size.y)
def objects_at(self, pos: Coordinate) -> Iterable[WorldObject]:
"""Return an iterable over the objects at the given coordinate."""
return self._location.get(pos, [])
def location_of(self, obj: WorldObject) -> Coordinate:
"""Return the coordinate of the object within the world, or none if it's not in it."""
return self._objects.get(obj, None)
def empty_cells(self, count_objects=False) -> Iterable[Coordinate]:
"""Return an iterable object over the cells without blocks. If count_objects is not False then also other objects are taken into account."""
all_cells = set([coord(x, y) for x in range(0, self.size.x) for y in range(0, self.size.y)])
all_cells.difference_update(self.blocks)
if count_objects:
all_cells.difference_update(self._location.keys())
return all_cells
@property
def objects(self) -> Iterable[WorldObject]:
"""Return an iterable over the objects within the world."""
return self._objects.keys()
def removeBlock(self, pos: Coordinate):
self._blocks.discard(pos)
def addBlock(self, pos: Coordinate):
self._blocks.add(pos)
def addObject(self, obj: WorldObject, pos: Coordinate):
if not self.isInside(pos):
raise OutOfBounds('Placing {} outside the world at {}'.format(obj, pos))
if self.isBlock(pos):
raise Collision('Placing {} inside a block {}'.format(obj, pos))
if pos in self._location:
self._location[pos].append(obj)
else:
self._location[pos] = [obj]
self._objects[obj] = pos
obj._setWorld(self)
def removeObject(self, obj: WorldObject):
try:
self._location[self.location_of(obj)].remove(obj)
del self._objects[obj]
obj._setWorld(None)
except KeyError:
pass
except ValueError:
pass
def moveObject(self, obj: WorldObject, pos: Coordinate):
if not self.isInside(pos):
raise OutOfBounds('Moving {} outside the world at {}'.format(obj, pos))
if self.isBlock(pos):
raise Collision('Moving {} inside a block {}'.format(obj, pos))
old_pos = self._objects.get(obj, None)
if old_pos in self._location:
self._location[old_pos].remove(obj)
if pos in self._location:
self._location[pos].append(obj)
else:
self._location[pos] = [obj]
self._objects[obj] = pos
obj._setWorld(self)
def __str__(self):
CELL_WIDTH = 2
BLANK = '.'.rjust(CELL_WIDTH)
BLOCK = '' * CELL_WIDTH
maze_strs = [[BLANK for j in range(self.size.x)] for i in range(self.size.y)]
for pos in self.blocks:
maze_strs[pos.y][pos.x] = BLOCK
for obj, pos in self._objects.items():
maze_strs[pos.y][pos.x] = obj.charSymbol().ljust(CELL_WIDTH)
top_frame = '' + '' * CELL_WIDTH * self.size.x + '' + '\n'
bottom_frame = '\n' + '' + '' * CELL_WIDTH * self.size.x + ''
side_frame = ''
return top_frame + "\n".join(reversed([side_frame + ''.join(maze_strs[i]) + side_frame for i in range(self.size.y)])) + bottom_frame
def run_episode(self, agent: Agent, player: 'Player', horizon: int = 0, show=True):
"""Run an episode on the world using the player to control the agent. The horizon specifies the maximun number of steps, 0 or None means no limit. If show is true then the world is printed ad each iteration before the player's turn.
Raise the exception GridWorldException is the agent is not in the world."""
if agent not in self.objects:
raise GridWorldException('Missing agent {}, cannot run the episode'.format(agent))
# inform the player of the start of the episode
player.start_episode()
step = 0
while not horizon or step < horizon:
if agent.success():
print('The agent {} succeded!'.format(agent.name))
break
if not agent.isAlive:
print('The agent {} died!'.format(agent.name))
break
if show:
print(self)
action = player.play(step, agent.percept(), agent.actions())
if action is None:
print('Episode terminated by the player {}.'.format(player.name))
break
print('Step {}: agent {} executing {}'.format(step, agent.name, action.name))
reward = agent.do(action)
player.feedback(action, reward, agent.percept())
step += 1
else:
print('Episode terminated by maximun number of steps ({}).'.format(horizon))
player.end_episode()
print(self)
print('Episode terminated with a reward of {} for agent {}'.format(agent.reward, agent.name))
class Player(object):
"""A player for a given agent. It implements the play method which should
return one of the actions for the agent or None to give up.
"""
def start_episode(self):
"""Method called at the beginning of the episode."""
pass
def end_episode(self):
"""Method called at the when an episode is completed."""
pass
def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
"""Given a turn (integer) and a percept, which might differ according to the specific problem, returns an action, among the given list of possible actions, to play at the given turn or None to stop the episode."""
raise NotImplementedError
def feedback(self, action: Agent.Actions, reward: int, state):
"""Receive in input the reward of the last action and the resulting state. The function is called right after the execution of the action."""
pass
@property
def name(self) -> str:
"""Return the name of the player or a default value based on its type and hash."""
try:
return self._name
except AttributeError:
return object_id(self)
@property
def world(self) -> GridWorld:
"""Return the world the player is playing on. If it's not known then None is returned."""
try:
return self._world
except AttributeError:
return None
@property
def agent(self) -> Agent:
"""Return the agent the player is controlling. If it's not known then None is returned."""
try:
return self._agent
except AttributeError:
return None
@classmethod
def player(cls, name: str = None, world: GridWorld = None, agent: Agent = None, **args) -> 'Player':
"""Create a new player with a name, world is playing on and the agent is controlling. Additional args are passed to the default object constructor."""
ply = cls(**args)
if name is not None:
ply._name = name
if world is not None:
ply._world = world
if agent is not None:
ply._agent = agent
return ply
############################################################################
#
# Examples of the use of the API
#######################################
# Trivial players
#
class RandomPlayer(Player):
"""This player selects randomly one of the available actions."""
def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
actions_lst = list(actions)
return actions_lst[random.randint(0, len(actions) - 1)]
def feedback(self, action: Agent.Actions, reward: int, state):
print('{}: action {} reward is {}'.format(self.name, action.name, reward))
class UserPlayer(Player):
def play(self, turn: int, state, actions: Iterable[Agent.Actions]) -> Agent.Actions:
actions_dict = {a.name: a for a in actions}
print('{} percept:'.format(self.name))
print(textwrap.indent(str(state), ' '))
while True:
answer = input('{}: select an action {} and press enter, or empty to stop: '.format(self.name, list(actions_dict.keys()))).strip()
if len(answer) < 1:
return None
elif answer in actions_dict:
return actions_dict[answer]
else:
options = [k for k in actions_dict.keys() if k.lower().startswith(answer.lower())]
if len(options) == 1:
return actions_dict[options[0]]
else:
print('Canot understand <{}>'.format(answer))
def feedback(self, action: Agent.Actions, reward: int, state):
print('{}: action {} reward is {}'.format(self.name, action.name, reward))
#######################################
# Simple agent moving in four directions that eats on the way.
# Its goal is to consume all the food
class Food(WorldObject):
def charSymbol(self):
return '🍌'
class SimpleEater(Agent):
class Actions(Agent.Actions):
N = (0, 1)
S = (0, -1)
E = (1, 0)
W = (-1, 0)
def __init__(self):
self._foodcount = 0
self._reward = 0
self.FOOD_BONUS = 10
def charSymbol(self):
return '🐒'
@property
def reward(self) -> int:
"""The current accumulated reward"""
return self._reward
def do(self, action: Agent.Actions) -> int:
delta = action.value
new_pos = coord(self.location.x + delta[0], self.location.y + delta[1])
try:
self.world.moveObject(self, new_pos)
except (OutOfBounds, Collision):
pass
# every action costs 1
cost = -1
for obj in self.world.objects_at(self.location):
if isinstance(obj, Food):
# eat the food
self.world.removeObject(obj)
self._foodcount += 1
# food give bonus!
cost += self.FOOD_BONUS
self._reward += cost
return cost
def on_done(self):
print('{} agent: Got food {} times.'.format(type(self).__name__, self._foodcount))
def success(self) -> bool:
"""Return true once all the food has been consumed."""
food = [o for o in self.world.objects if isinstance(o, Food)]
return len(food) == 0
def eaterWorld(map_desc: str, foods: Iterable[Coordinate] = [], food_amount: float = .1) -> GridWorld:
"""Create a new world using the decription and placing food in the given list foods or randomly placing a food_amount (if greater than zero) or the percentage of free cells otherwise number foods."""
world = GridWorld.from_string(map_desc)
if len(foods) > 0:
for pos in foods:
if not world.isBlock(pos):
world.addObject(Food(), pos)
else:
free_cells = list(world.empty_cells())
random.shuffle(free_cells)
food_count = int(food_amount) if food_amount >= 1 else int(len(free_cells) * food_amount)
for i in range(food_count):
world.addObject(Food(), free_cells.pop())
return world
def simpleEaterTest(player_class=RandomPlayer, horizon=20):
MAP_STR = """
################
# # # #
# # #
# # #
# # # #
############# ##
# # # #
# # # #
# # # #
# #
############# ##
# # # #
# # #
# # #
# # # #
################
"""
world = eaterWorld(MAP_STR, food_amount=0.1)
free_cells = list(world.empty_cells(count_objects=True))
random.shuffle(free_cells)
# place the agent in a random empty place
agent = SimpleEater()
world.addObject(agent, free_cells.pop())
player = player_class.player()
world.run_episode(agent, player, horizon=horizon)
############################################################################
#
# Testing the API
if __name__ == "__main__":
simpleEaterTest()
simpleEaterTest(player_class=UserPlayer)
from enum import Enum
import json
import random
from typing import Iterable, NamedTuple, Dict, Sequence, Tuple
from .gridworld import Agent, WorldObject, GridWorld, Coordinate, coord, GridWorldException, UserPlayer
class WumpusWorldObject(WorldObject):
def _setWorld(self, world: 'WumpusWorld'):
self._world = world
@property
def world(self) -> 'WumpusWorld':
return self._world
class Wumpus(WumpusWorldObject):
def charSymbol(self):
return 'W'
class Pit(WumpusWorldObject):
def charSymbol(self):
return 'P'
class Gold(WumpusWorldObject):
def charSymbol(self):
return 'G'
class Exit(WumpusWorldObject):
def charSymbol(self):
return ' '
class Hunter(Agent):
class Actions(Agent.Actions):
MOVE = 0
RIGHT = 1
LEFT = 2
SHOOT = 3
GRAB = 4
CLIMB = 5
class Orientation(Enum):
N = (0, 1)
S = (0, -1)
E = (1, 0)
W = (-1, 0)
class Percept(NamedTuple):
stench: bool
breeze: bool
bump: bool
scream: bool
glitter: bool
def __str__(self):
result = []
if self.stench:
result.append('Stench')
if self.breeze:
result.append('Breeze')
if self.bump:
result.append('Bump')
if self.scream:
result.append('Scream')
if self.glitter:
result.append('Glitter')
return ', '.join(result) if len(result) else 'none'
ROT_RIGHT = {
Orientation.N: Orientation.E,
Orientation.E: Orientation.S,
Orientation.S: Orientation.W,
Orientation.W: Orientation.N
}
ROT_LEFT = {v: k for k, v in ROT_RIGHT.items()}
def __init__(self, orientation: 'Hunter.Orientation' = None):
super().__init__()
self._orientation: self.Orientation = orientation if orientation is not None else self.Orientation.N
self._reward: int = 0
self._alive: bool = True
self._scream: bool = False
self._arrow: bool = True
self._done: bool = False
self._bump: bool = False
self._has_gold: bool = False
@property
def world(self) -> 'WumpusWorld':
return self._world
def success(self) -> bool:
"""Return true once the goal of the agent has been achieved."""
return self._done
def percept(self):
return self.Percept(
stench=self.stench,
breeze=self.breeze,
bump=self.bump,
scream=self.scream,
glitter=self.glitter
)
def _delta(self, direction: 'Hunter.Orientation') -> Coordinate:
return Coordinate(
x=self.location.x + direction.value[0],
y=self.location.y + direction.value[1])
def _rotate(self, direction: str) -> 'Hunter.Orientation':
new_dir = (self.ROT_RIGHT if direction[0].lower() == 'r' else self.ROT_LEFT)[self.orientation]
return new_dir
def _aligned(self, pos: Coordinate) -> bool:
if self.orientation == self.Orientation.N:
return self.location.x == pos.x and self.location.y < pos.y
elif self.orientation == self.Orientation.S:
return self.location.x == pos.x and self.location.y > pos.y
elif self.orientation == self.Orientation.W:
return self.location.x > pos.x and self.location.y == pos.y
else:
return self.location.x < pos.x and self.location.y == pos.y
@property
def orientation(self):
return self._orientation
@property
def reward(self) -> int:
"""The current accumulated reward"""
return self._reward
@property
def isAlive(self):
"""Return true is the hunter is still alive."""
return self._alive
@property
def scream(self) -> bool:
return self._scream
@property
def arrow(self) -> bool:
return self._arrow
@property
def stench(self):
return any(
self.world.isWumpus(self._delta(d)) for d in self.Orientation
)
@property
def breeze(self):
return any(
self.world.isPit(self._delta(d)) for d in self.Orientation
)
@property
def glitter(self):
return self.world.isGold(self.location)
@property
def bump(self):
return self._bump
def charSymbol(self):
return '@'
def do(self, action: 'Hunter.Actions') -> int:
"""Execute an action and return the reward of the action."""
if not self.isAlive or self._done:
raise ValueError('Cannot do anything while dead or task is completed')
reward = 0
self._scream = False
self._bump = False
if action == self.Actions.MOVE:
reward -= 1
new_pos = self._delta(self.orientation)
try:
self.world.moveObject(self, new_pos)
except GridWorldException:
self._bump = True
if self.world.isWumpus(self.location) or self.world.isPit(self.location):
self._alive = False
reward -= 1000
elif action == self.Actions.RIGHT:
reward -= 1
self._orientation = self._rotate('r')
elif action == self.Actions.LEFT:
reward -= 1
self._orientation = self._rotate('l')
elif action == self.Actions.SHOOT:
if self.arrow:
self._arrow = False
reward -= 10
for wumpus in [w for w in self.world.objects if isinstance(w, Wumpus)]:
if self._aligned(wumpus.location):
self._scream = True
self.world.removeObject(wumpus)
else:
reward -= 1
elif action == self.Actions.GRAB:
reward -= 1
for gold in self.world.isGold(self.location):
self._has_gold = True
self.world.removeObject(gold)
elif action == self.Actions.CLIMB:
reward -= 1
if self.world.isExit(self.location):
self._done = True
if self._has_gold:
reward += 1000
else:
raise ValueError('Unrecognised action {}'.format(action))
self._reward += reward
return reward
class WumpusWorld(GridWorld):
@classmethod
def classic(cls, size: int = 4, seed=None, pitProb: float = .2):
"""Create a classic wumpus world problem of the given size. The agent is placed in (0,0) facing north and there's exactly one wumpus and a gold ingot. The seed is used to initialise the random number generation and pits are placed with pitProb probability."""
world = cls(coord(size, size), [])
agentPos = coord(0, 0)
world.addObject(Hunter(orientation=Hunter.Orientation.N), agentPos)
world.addObject(Exit(), agentPos)
world.random_populate(pitProb=pitProb, seed=seed, wumpus=1, gold=1)
return world
@classmethod
def from_JSON(cls, json_obj: Dict):
"""Create a wumpus world from a JSON object"""
def getCoord(lst: Sequence[int]) -> Coordinate:
return coord(lst[0], lst[1])
def coordLst(key: str) -> Iterable[Coordinate]:
data = json_obj.get(key, [])
if len(data) < 1:
return []
elif isinstance(data[0], Sequence):
# list of coordinates
return [getCoord(c) for c in data]
else:
return [getCoord(data)]
size = coordLst('size')[0]
blocks = coordLst('blocks')
hunters = coordLst('hunters')
pits = coordLst('pits')
wumpuses = coordLst('wumpuses')
exits = coordLst('exits')
golds = coordLst('golds')
world = cls(size, blocks)
for pos in hunters:
world.addObject(Hunter(orientation=Hunter.Orientation.N), pos)
for pos in (exits or hunters or [coord(0, 0)]):
world.addObject(Exit(), pos)
for pos in wumpuses:
world.addObject(Wumpus(), pos)
for pos in pits:
world.addObject(Pit(), pos)
for pos in golds:
world.addObject(Gold(), pos)
return world
@classmethod
def randomWorld(cls, size: int = 4, pitProb: float = .2, seed=None, blockProb: float = 0, world_desc: str = None) -> 'WumpusWorld':
def randomPlace(world: GridWorld) -> Coordinate:
pos = coord(random.randint(0, world.size.x - 1), random.randint(0, world.size.y - 1))
while world.isBlock(pos):
pos = coord(random.randint(0, world.size.x - 1), random.randint(0, world.size.y - 1))
return pos
# initialise random generator
random.seed(seed)
world = cls(coord(size, size), []) if world_desc is None else cls.from_string(world_desc)
agentPos = coord(0, 0)
assert not world.isBlock(agentPos)
if blockProb > 0 and world_desc is None:
# randomly place blocks
for pos in [coord(x, y) for x in range(world.size.x) for y in range(world.size.y)]:
if pos != agentPos and random.random() < blockProb:
world.addBlock(pos)
world.addObject(Hunter(), agentPos)
world.addObject(Exit(), agentPos)
goldPos = randomPlace(world)
world.addObject(Gold(), goldPos)
wumpusPos = randomPlace(world)
while wumpusPos == agentPos:
wumpusPos = randomPlace(world)
world.addObject(Wumpus(), wumpusPos)
# place the pits
for row in range(world.size.y):
for col in range(world.size.x):
pos = coord(col, row)
# place a pit with probability pitProb
# but not in the same place as the agent
if pos != agentPos and not world.isBlock(pos) and random.random() < pitProb:
world.addObject(Pit(), pos)
return world
def random_populate(self, pitProb: float = .2, seed=None, wumpus: int = 1, gold: int = 1):
"""Randomly popolate a wumpus world with pits, gold, and wumpus. Pits and wumpus are not placed in the same place where the agent is."""
def randomPlace(skipPos) -> Coordinate:
pos = None
while pos is None or self.isBlock(pos) or pos in skipPos:
pos = coord(random.randint(0, self.size.x - 1), random.randint(0, self.size.y - 1))
return pos
agentPos = (o.location for o in self.objects if isinstance(o, Hunter))
random.seed(seed)
# place the wumpus
for i in range(wumpus):
self.addObject(Wumpus(), randomPlace(agentPos))
# place the gold
for i in range(gold):
self.addObject(Gold(), randomPlace(agentPos))
# place the pits
for row in range(self.size.y):
for col in range(self.size.x):
pos = coord(col, row)
# place a pit with probability pitProb
# but not in the same place as the agent
if pos not in agentPos and not self.isBlock(pos) and random.random() < pitProb:
self.addObject(Pit(), pos)
def _objAt(self, cls, pos: Coordinate) -> Iterable[WumpusWorldObject]:
"""Return an iterable over the objects at the coordinate filtered by the given class cls (accepts the same class specification as the isinstance function)."""
return [o for o in self.objects_at(pos) if isinstance(o, cls)]
def isWumpus(self, pos: Coordinate) -> Wumpus:
"""Return a wumpus if it's at the coordinate or None."""
return next(iter(self._objAt(Wumpus, pos)), None)
def isHunter(self, pos: Coordinate) -> Hunter:
"""Return a hunter if it's at the coordinate or None."""
return next(iter(self._objAt(Hunter, pos)), None)
def isPit(self, pos: Coordinate) -> Pit:
"""Return a pit if it's at the coordinate or None."""
return next(iter(self._objAt(Pit, pos)), None)
def isGold(self, pos: Coordinate) -> Gold:
"""Return a gold if it's at the coordinate or None."""
return next(iter(self._objAt(Gold, pos)), None)
def isExit(self, pos: Coordinate) -> Iterable[Exit]:
"""Return a exit if it's at the coordinate or None."""
return next(iter(self._objAt(Exit, pos)), None)
def to_JSON(self) -> Dict:
"""Return a JSON serialisable object with the description of the world."""
def coord_tuple(c: Coordinate) -> Tuple[int]:
return (c.x, c.y)
def add_to_json(obj: Dict, key: str, lst: Sequence):
if len(lst) > 0:
obj[key] = lst
size = coord_tuple(self.size)
blocks = [coord_tuple(c) for c in self.blocks]
hunters = []
pits = []
wumpuses = []
exits = []
golds = []
for obj in self.objects:
if isinstance(obj, Hunter):
hunters.append(coord_tuple(obj.location))
elif isinstance(obj, Pit):
pits.append(coord_tuple(obj.location))
elif isinstance(obj, Wumpus):
wumpuses.append(coord_tuple(obj.location))
elif isinstance(obj, Exit):
exits.append(coord_tuple(obj.location))
elif isinstance(obj, Gold):
golds.append(coord_tuple(obj.location))
json_obj = {'size': size}
add_to_json(json_obj, 'blocks', blocks)
add_to_json(json_obj, 'hunters', hunters)
add_to_json(json_obj, 'pits', pits)
add_to_json(json_obj, 'wumpuses', wumpuses)
add_to_json(json_obj, 'exits', exits)
add_to_json(json_obj, 'golds', golds)
return json_obj
def __str__(self):
def objChar(obj: WumpusWorldObject) -> str:
return obj.charSymbol() if obj is not None else ' '
BLOCK_W = 3
L_BORDER = R_BORDER = ''
DIRECTION = {
Hunter.Orientation.N: '^',
Hunter.Orientation.S: 'V',
Hunter.Orientation.W: '<',
Hunter.Orientation.E: '>'
}
def top_line() -> str:
return '' + ''.join('' * BLOCK_W for i in range(self.size.x)) + ''
def bottom_line() -> str:
return '' + ''.join('' * BLOCK_W for i in range(self.size.x)) + ''
def mid_line():
return '' + ''.join('' * BLOCK_W for i in range(self.size.x)) + ''
lines = [top_line()]
for row in range(self.size.y - 1, -1, -1):
cell = [L_BORDER, L_BORDER]
for col in range(self.size.x):
pos = coord(col, row)
if self.isBlock(pos):
cell[0] += '' * BLOCK_W
cell[1] += '' * BLOCK_W
else:
cell[0] += objChar(self.isWumpus(pos))
cell[0] += objChar(self.isGold(pos))
cell[0] += objChar(self.isPit(pos))
hunter = self.isHunter(pos)
if hunter is not None:
cell[1] += ' ' + objChar(hunter)
cell[1] += DIRECTION[hunter.orientation]
else:
cell[1] += ' ' * BLOCK_W
cell[0] += R_BORDER
cell[1] += R_BORDER
lines.extend(cell)
if row > 0:
lines.append(mid_line())
lines.append(bottom_line())
return '\n'.join(lines)
if __name__ == "__main__":
WORLD_MAP = '\n'.join([
'....#',
'.....',
'.....',
'.....',
'.....',
])
# world = WumpusWorld.randomWorld(size=7, blockProb=0.1, world_desc=WORLD_MAP)
world = WumpusWorld.classic(size=7)
hunter = next(iter(o for o in world.objects if isinstance(o, Hunter)), None)
world.run_episode(hunter, UserPlayer.player())
print(json.dumps(world.to_JSON()))
JSON_STRING = '{"size": [7, 7], "hunters": [[0, 0]], "pits": [[4, 0], [3, 1], [2, 2], [6, 2], [4, 4], [3, 5], [4, 6], [5, 6]], "wumpuses": [[1, 2]], "exits": [[0, 0]], "golds": [[6, 3]]}'
world = WumpusWorld.from_JSON(json.loads(JSON_STRING))
print(world)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment