Învățare prin Recompensă - rezolvarea proceselor de decizie Markov prin tehnici de programare dinamică (Value Iteration, Policy Iteration)
Autori:
- Tudor Berariu - 2018
- Alexandru Sorici - 2020
1. Scopul laboratorului
Scopul laboratorului îl reprezintă înțelegerea conceptelor de proces de decizie Markov (MDP), politică și valoare de stare, precum și implementarea unor metode de programare dinamică pentru rezolvarea problemei de control a unui MDP.
În cadrul laboratorului veți:
- implementa algoritmul de iterare a politicilor
- implementa algoritmul de iterare a valorilor de stare
2. Workspace setup
Câteva biblioteci de care vom avea nevoie
Python
# Standard-library imports used by the lab code below.
import os.path
import sys
from argparse import ArgumentParser
from copy import copy
from random import choice
from typing import Dict, List, Tuple

# Notebook section heading: "Definirea unui labirint" (Defining a maze)
Python
class Maze:
    """Grid-world Markov decision process loaded from a text map.

    The map file is read from the ``maps`` directory. Lines containing ``:``
    are parsed as ``name:value`` reward entries; every other non-blank line
    is one grid row, one character per cell. ``"x"`` is a wall, ``" "`` is an
    empty cell, and any other character is a final (terminal) cell whose
    reward is looked up under that character. The reward for entering an
    empty cell is stored under the key ``"default"``.

    States are ``(row, col)`` tuples of all non-wall cells.
    """

    NORTH, EAST, SOUTH, WEST = 0, 1, 2, 3  # actions

    DYNAMICS = {  # the stochastic effects of actions: (d_row, d_col) -> probability
        NORTH: {(0, -1): 0.1, (-1, 0): .8, (0, 1): .1},
        EAST: {(-1, 0): 0.1, (0, 1): .8, (1, 0): .1},
        SOUTH: {(0, 1): 0.1, (1, 0): .8, (0, -1): .1},
        WEST: {(1, 0): 0.1, (0, -1): .8, (-1, 0): .1},
    }

    WALL, EMPTY = "x", " "

    # Wall glyphs are keyed by (west, north, east, south) wall-neighbour flags;
    # policy arrows are keyed by the action constant.
    VISUALS = {
        (0, 0, 1, 1): "\N{BOX DRAWINGS HEAVY DOWN AND RIGHT}",
        (1, 0, 0, 1): "\N{BOX DRAWINGS HEAVY DOWN AND LEFT}",
        (1, 0, 1, 0): "\N{BOX DRAWINGS HEAVY HORIZONTAL}",
        (0, 1, 1, 0): "\N{BOX DRAWINGS HEAVY UP AND RIGHT}",
        (1, 1, 0, 0): "\N{BOX DRAWINGS HEAVY UP AND LEFT}",
        (0, 1, 0, 1): "\N{BOX DRAWINGS HEAVY VERTICAL}",
        (1, 1, 1, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND HORIZONTAL}",
        (1, 1, 1, 0): "\N{BOX DRAWINGS HEAVY UP AND HORIZONTAL}",
        (1, 1, 0, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND LEFT}",
        (1, 0, 1, 1): "\N{BOX DRAWINGS HEAVY DOWN AND HORIZONTAL}",
        (0, 1, 1, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND RIGHT}",
        (1, 0, 0, 0): "\N{BOX DRAWINGS HEAVY LEFT}",
        (0, 1, 0, 0): "\N{BOX DRAWINGS HEAVY UP}",
        (0, 0, 1, 0): "\N{BOX DRAWINGS HEAVY RIGHT}",
        (0, 0, 0, 1): "\N{BOX DRAWINGS HEAVY DOWN}",
        (0, 0, 0, 0): "\N{BOX DRAWINGS HEAVY VERTICAL AND HORIZONTAL}",
        WEST: "\N{LEFTWARDS ARROW}",
        NORTH: "\N{UPWARDS ARROW}",
        EAST: "\N{RIGHTWARDS ARROW}",
        SOUTH: "\N{DOWNWARDS ARROW}",
    }

    def __init__(self, map_name):
        """Load the rewards and the grid from ``maps/<map_name>``.

        Raises:
            OSError: if the map file cannot be opened.
            ValueError: if a reward line is not of the form ``name:value``
                with a numeric value.
        """
        self._rewards, self._cells = {}, []
        with open(os.path.join("maps", map_name), "r", encoding="utf-8") as map_file:
            for line in map_file:
                # NOTE: strip() also removes leading/trailing spaces, so grid
                # rows are expected to start and end with a non-space cell.
                content = line.strip()
                if ":" in content:
                    name, value = content.split(":")
                    self._rewards[name] = float(value)
                elif content:
                    # Blank lines are skipped: an empty row would make
                    # neighbour lookups fail later on.
                    self._cells.append(list(content))
        self._states = [
            (i, j)
            for i, row in enumerate(self._cells)
            for j, cell in enumerate(row)
            if cell != Maze.WALL
        ]

    @property
    def actions(self):
        """Return the list of available actions."""
        return [Maze.NORTH, Maze.EAST, Maze.SOUTH, Maze.WEST]

    @property
    def states(self):
        """Return a shallow copy of the list of non-wall ``(row, col)`` states."""
        return copy(self._states)

    def _in_bounds(self, row, col):
        """Return True if ``(row, col)`` addresses an existing grid cell."""
        return 0 <= row < len(self._cells) and 0 <= col < len(self._cells[row])

    def _is_wall(self, row, col):
        """Return True if ``(row, col)`` is inside the grid and is a wall."""
        return self._in_bounds(row, col) and self._cells[row][col] == Maze.WALL

    def is_final(self, state):
        """Return True if the cell at ``state`` is anything but an empty cell."""
        row, col = state
        return self._cells[row][col] != Maze.EMPTY

    def effects(self, state, action):
        """Return the possible outcomes of taking ``action`` in ``state``.

        Returns:
            A list of ``(next_state, probability, reward)`` tuples; empty for
            final states. Moves that would hit a wall or leave the grid keep
            the agent in place, and probabilities of outcomes that end in the
            same cell are summed.
        """
        if self.is_final(state):
            return []
        row, col = state
        next_states = {}
        for (d_row, d_col), prob in Maze.DYNAMICS[action].items():
            next_row, next_col = row + d_row, col + d_col
            # Off-grid targets are treated like walls; without the bounds
            # check a negative index would silently wrap to the opposite edge.
            if not self._in_bounds(next_row, next_col) or self._is_wall(next_row, next_col):
                next_row, next_col = row, col
            target = (next_row, next_col)
            if target in next_states:
                prob += next_states[target][0]
            cell = self._cells[next_row][next_col]
            reward = self._rewards["default" if cell == Maze.EMPTY else cell]
            next_states[target] = (prob, reward)
        return [(s, p, r) for s, (p, r) in next_states.items()]

    def print_policy(self, policy):
        """Write the maze to stdout with walls as box glyphs and the policy as arrows.

        Final cells are printed as their map character. ``policy`` must map
        every non-final state to an action.
        """
        for row, row_cells in enumerate(self._cells):
            glyphs = []
            for col, cell in enumerate(row_cells):
                if cell == Maze.WALL:
                    # Neighbours outside the grid count as "no wall", which
                    # also covers rows of different lengths.
                    key = (
                        self._is_wall(row, col - 1),
                        self._is_wall(row - 1, col),
                        self._is_wall(row, col + 1),
                        self._is_wall(row + 1, col),
                    )
                    glyphs.append(Maze.VISUALS[key])
                elif self.is_final((row, col)):
                    glyphs.append(cell)
                else:
                    glyphs.append(Maze.VISUALS[policy[(row, col)]])
            sys.stdout.write("".join(glyphs) + "\n")
        sys.stdout.flush()

    def print_values(self, v):
        """Print the value of every empty cell as a ``|``-separated table."""
        # The placeholder has the same width as "%5.2f" so columns line up.
        blank = " " * 5
        for r, row_cells in enumerate(self._cells):
            print(" | ".join(
                "%5.2f" % v[(r, c)] if cell == Maze.EMPTY else blank
                for c, cell in enumerate(row_cells)
            ))

# Notebook section heading: "Cerințe" (Requirements)
Python
# Lab hyper-parameters (the "#@param" markers are Colab form annotations).
MAP_NAME = 'complex'  #@param ['simple', 'complex', 'be_careful', 'suffer']
gamma = 0.9  #@param {type: "slider", min: 0.0, max: 1.0, step: 0.1}
max_delta = 1e-8  #@param {type:"float"}

# Notebook section heading: "Cerința 1" (Requirement 1)
Implementați funcția policy_iteration pentru calculul politicii optime și al tabelului de utilitate așteptată pentru fiecare stare (celulă din grid) a labirintului.
Python
def policy_iteration(
    game: "Maze", discount=None, tolerance=None
) -> Tuple[Dict[Tuple[int, int], int], Dict[Tuple[int, int], float]]:
    """Solve the maze MDP with policy iteration.

    Alternates in-place policy evaluation with greedy policy improvement
    until no state changes its action.

    Args:
        game: object exposing ``states``, ``actions``, ``is_final(state)``
            and ``effects(state, action)`` returning
            ``(next_state, probability, reward)`` tuples.
        discount: discount factor; defaults to the module-level ``gamma``.
        tolerance: evaluation stops once the largest value change in a sweep
            drops below this; defaults to the module-level ``max_delta``.

    Returns:
        ``(policy, v)`` where ``policy`` maps every non-final state to an
        action and ``v`` maps every state to its value (final states stay 0).

    Note:
        With ``discount == 1`` the evaluation loop may not terminate for
        policies that never reach a final state.
    """
    if discount is None:
        discount = gamma
    if tolerance is None:
        tolerance = max_delta

    v = {s: 0.0 for s in game.states}
    free_states = [s for s in game.states if not game.is_final(s)]
    policy = {s: choice(game.actions) for s in free_states}

    def q_value(state, action):
        # Expected one-step return of `action` in `state` under the current v.
        return sum(p * (r + discount * v[nxt]) for nxt, p, r in game.effects(state, action))

    while True:
        # Policy evaluation: sweep until the values stop changing.
        while True:
            delta = 0.0
            for s in free_states:
                updated = q_value(s, policy[s])
                delta = max(delta, abs(updated - v[s]))
                v[s] = updated
            if delta < tolerance:
                break

        # Policy improvement: switch only to strictly better actions so that
        # ties cannot make the policy oscillate forever.
        stable = True
        for s in free_states:
            best = max(game.actions, key=lambda a: q_value(s, a))
            if best != policy[s] and q_value(s, best) > q_value(s, policy[s]):
                policy[s] = best
                stable = False
        if stable:
            return policy, v

# Notebook section heading: "Cerința 2" (Requirement 2)
Implementați funcția value_iteration pentru calculul politicii optime și al tabelului de utilitate așteptată pentru fiecare stare (celulă din grid) a labirintului.
Python
def value_iteration(
    game: "Maze", discount=None, tolerance=None
) -> Tuple[Dict[Tuple[int, int], int], Dict[Tuple[int, int], float]]:
    """Solve the maze MDP with value iteration.

    Repeatedly applies the Bellman optimality update in place until the
    largest change in a sweep drops below ``tolerance``, then extracts the
    greedy policy from the resulting values.

    Args:
        game: object exposing ``states``, ``actions``, ``is_final(state)``
            and ``effects(state, action)`` returning
            ``(next_state, probability, reward)`` tuples.
        discount: discount factor; defaults to the module-level ``gamma``.
        tolerance: convergence threshold; defaults to the module-level
            ``max_delta``.

    Returns:
        ``(policy, v)`` where ``policy`` maps every non-final state to a
        greedy action and ``v`` maps every state to its value (final states
        stay 0).
    """
    if discount is None:
        discount = gamma
    if tolerance is None:
        tolerance = max_delta

    v = {s: 0.0 for s in game.states}
    free_states = [s for s in game.states if not game.is_final(s)]

    def q_value(state, action):
        # Expected one-step return of `action` in `state` under the current v.
        return sum(p * (r + discount * v[nxt]) for nxt, p, r in game.effects(state, action))

    while True:
        delta = 0.0
        for s in free_states:
            best_value = max(q_value(s, a) for a in game.actions)
            delta = max(delta, abs(best_value - v[s]))
            v[s] = best_value
        if delta < tolerance:
            break

    policy = {s: max(game.actions, key=lambda a: q_value(s, a)) for s in free_states}
    return policy, v

# Notebook section heading: "Evaluare" (Evaluation)
Python
#@title
# Run both solvers on the selected map and print the resulting value tables
# and policies.
game = Maze(MAP_NAME)

print("Policy iteration:")
policy, v = policy_iteration(game)
game.print_values(v)
game.print_policy(policy)

print("Value iteration:")
policy, v = value_iteration(game)
game.print_values(v)
game.print_policy(policy)

# Notebook section heading: "Output:"
Text
Policy iteration: | | | | | | | | | | | | | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | | | | | | | | | | | | | 0.00 | 0.00 | | | | | | | | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | | | | | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | | | | | | | 0.00 | | | | | | | | | | | 0.00 | | | | | | | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | | | | | | | | | | | | | | | | | | | ┏━━━━━━━━━━━┓┃→←←↓↓→↑←↑←↑┗┓┃A╺━━━━┳┳┳┓←↑┗━━━━┓┃B→←→←→┣╋╋┫↑→↑↑←↓↓┃┣┳┳┳┳┓↑┗┻┻┻━━━━━╸→┃┣╋╋╋╋┫↓←↓↑→←←↑↓↓←→┃┗┻┻┻┻┻━━━━━━━━━━━━┛Value iteration: | | | | | | | | | | | | | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | | | | | | | | | | | | | 0.00 | 0.00 | | | | | | | | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | | | | | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | | | | | | | 0.00 | | | | | | | | | | | 0.00 | | | | | | | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | 0.00 | | | | | | | | | | | | | | | | | | | ┏━━━━━━━━━━━┓┃↓↓↓↓↑↓→→↓←→┗┓┃A╺━━━━┳┳┳┓↑←┗━━━━┓┃B→↑↓←→┣╋╋┫→→↑→↓↑↑┃┣┳┳┳┳┓↓┗┻┻┻━━━━━╸↓┃┣╋╋╋╋┫←→↓←↓↑←↑↓↑←↑┃┗┻┻┻┻┻━━━━━━━━━━━━┛Policy Iteration Visual Demo
Python
import os
import sys
from copy import copy
from functools import reduce  # not used in this section; kept in case other code relies on it
from random import choice

import pygame

# Constants
CELL_SIZE = 160
FONT_SIZE = 18
ARROW_WIDTH = 6  # Thicker arrow line
ARROW_HEAD = 10  # Arrowhead size
VALUE_COLOR = (0, 0, 180)
REWARD_COLOR = (0, 0, 0)
POS_FINAL_COLOR = (100, 255, 100)
NEG_FINAL_COLOR = (255, 100, 100)
ARROW_COLOR = (0, 0, 0)
BG_COLOR = (0, 0, 0)
WALL_COLOR = (50, 50, 50)
FPS = 30
gamma = 0.99  # discount factor used by the evaluation/improvement steps
max_delta = 1e-3  # full evaluation stops once a sweep changes values by less than this
EQUATION_FONT_SIZE = 22
EQUATION_COLOR_BELLMAN = (255, 255, 255)  # White
EQUATION_COLOR_POLICY = (255, 255, 0)  # Yellow


class Maze:
    """Grid-world Markov decision process loaded from a text map.

    The map file is read from the ``maps`` directory. Lines containing ``:``
    are parsed as ``name:value`` reward entries; every other non-blank line
    is one grid row, one character per cell. ``"x"`` is a wall, ``" "`` is an
    empty cell, and any other character is a final (terminal) cell whose
    reward is looked up under that character. The reward for entering an
    empty cell is stored under the key ``"default"``.

    States are ``(row, col)`` tuples of all non-wall cells.
    """

    NORTH, EAST, SOUTH, WEST = 0, 1, 2, 3  # actions

    DYNAMICS = {  # the stochastic effects of actions: (d_row, d_col) -> probability
        NORTH: {(0, -1): 0.1, (-1, 0): .8, (0, 1): .1},
        EAST: {(-1, 0): 0.1, (0, 1): .8, (1, 0): .1},
        SOUTH: {(0, 1): 0.1, (1, 0): .8, (0, -1): .1},
        WEST: {(1, 0): 0.1, (0, -1): .8, (-1, 0): .1},
    }

    WALL, EMPTY = "x", " "

    # Wall glyphs are keyed by (west, north, east, south) wall-neighbour flags;
    # policy arrows are keyed by the action constant.
    VISUALS = {
        (0, 0, 1, 1): "\N{BOX DRAWINGS HEAVY DOWN AND RIGHT}",
        (1, 0, 0, 1): "\N{BOX DRAWINGS HEAVY DOWN AND LEFT}",
        (1, 0, 1, 0): "\N{BOX DRAWINGS HEAVY HORIZONTAL}",
        (0, 1, 1, 0): "\N{BOX DRAWINGS HEAVY UP AND RIGHT}",
        (1, 1, 0, 0): "\N{BOX DRAWINGS HEAVY UP AND LEFT}",
        (0, 1, 0, 1): "\N{BOX DRAWINGS HEAVY VERTICAL}",
        (1, 1, 1, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND HORIZONTAL}",
        (1, 1, 1, 0): "\N{BOX DRAWINGS HEAVY UP AND HORIZONTAL}",
        (1, 1, 0, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND LEFT}",
        (1, 0, 1, 1): "\N{BOX DRAWINGS HEAVY DOWN AND HORIZONTAL}",
        (0, 1, 1, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND RIGHT}",
        (1, 0, 0, 0): "\N{BOX DRAWINGS HEAVY LEFT}",
        (0, 1, 0, 0): "\N{BOX DRAWINGS HEAVY UP}",
        (0, 0, 1, 0): "\N{BOX DRAWINGS HEAVY RIGHT}",
        (0, 0, 0, 1): "\N{BOX DRAWINGS HEAVY DOWN}",
        (0, 0, 0, 0): "\N{BOX DRAWINGS HEAVY VERTICAL AND HORIZONTAL}",
        WEST: "\N{LEFTWARDS ARROW}",
        NORTH: "\N{UPWARDS ARROW}",
        EAST: "\N{RIGHTWARDS ARROW}",
        SOUTH: "\N{DOWNWARDS ARROW}",
    }

    def __init__(self, map_name):
        """Load the rewards and the grid from ``maps/<map_name>``.

        Raises:
            OSError: if the map file cannot be opened.
            ValueError: if a reward line is not of the form ``name:value``
                with a numeric value.
        """
        self._rewards, self._cells = {}, []
        with open(os.path.join("maps", map_name), "r", encoding="utf-8") as map_file:
            for line in map_file:
                # NOTE: strip() also removes leading/trailing spaces, so grid
                # rows are expected to start and end with a non-space cell.
                content = line.strip()
                if ":" in content:
                    name, value = content.split(":")
                    self._rewards[name] = float(value)
                elif content:
                    # Blank lines are skipped: an empty row would make
                    # neighbour lookups fail later on.
                    self._cells.append(list(content))
        self._states = [
            (i, j)
            for i, row in enumerate(self._cells)
            for j, cell in enumerate(row)
            if cell != Maze.WALL
        ]

    @property
    def actions(self):
        """Return the list of available actions."""
        return [Maze.NORTH, Maze.EAST, Maze.SOUTH, Maze.WEST]

    @property
    def states(self):
        """Return a shallow copy of the list of non-wall ``(row, col)`` states."""
        return copy(self._states)

    def _in_bounds(self, row, col):
        """Return True if ``(row, col)`` addresses an existing grid cell."""
        return 0 <= row < len(self._cells) and 0 <= col < len(self._cells[row])

    def _is_wall(self, row, col):
        """Return True if ``(row, col)`` is inside the grid and is a wall."""
        return self._in_bounds(row, col) and self._cells[row][col] == Maze.WALL

    def is_final(self, state):
        """Return True if the cell at ``state`` is anything but an empty cell."""
        row, col = state
        return self._cells[row][col] != Maze.EMPTY

    def effects(self, state, action):
        """Return the possible outcomes of taking ``action`` in ``state``.

        Returns:
            A list of ``(next_state, probability, reward)`` tuples; empty for
            final states. Moves that would hit a wall or leave the grid keep
            the agent in place, and probabilities of outcomes that end in the
            same cell are summed.
        """
        if self.is_final(state):
            return []
        row, col = state
        next_states = {}
        for (d_row, d_col), prob in Maze.DYNAMICS[action].items():
            next_row, next_col = row + d_row, col + d_col
            # Off-grid targets are treated like walls; without the bounds
            # check a negative index would silently wrap to the opposite edge.
            if not self._in_bounds(next_row, next_col) or self._is_wall(next_row, next_col):
                next_row, next_col = row, col
            target = (next_row, next_col)
            if target in next_states:
                prob += next_states[target][0]
            cell = self._cells[next_row][next_col]
            reward = self._rewards["default" if cell == Maze.EMPTY else cell]
            next_states[target] = (prob, reward)
        return [(s, p, r) for s, (p, r) in next_states.items()]

    def print_policy(self, policy):
        """Write the maze to stdout with walls as box glyphs and the policy as arrows.

        Final cells are printed as their map character. ``policy`` must map
        every non-final state to an action.
        """
        for row, row_cells in enumerate(self._cells):
            glyphs = []
            for col, cell in enumerate(row_cells):
                if cell == Maze.WALL:
                    # Neighbours outside the grid count as "no wall", which
                    # also covers rows of different lengths.
                    key = (
                        self._is_wall(row, col - 1),
                        self._is_wall(row - 1, col),
                        self._is_wall(row, col + 1),
                        self._is_wall(row + 1, col),
                    )
                    glyphs.append(Maze.VISUALS[key])
                elif self.is_final((row, col)):
                    glyphs.append(cell)
                else:
                    glyphs.append(Maze.VISUALS[policy[(row, col)]])
            sys.stdout.write("".join(glyphs) + "\n")
        sys.stdout.flush()

    def print_values(self, v):
        """Print the value of every empty cell as a ``|``-separated table."""
        # The placeholder has the same width as "%5.2f" so columns line up.
        blank = " " * 5
        for r, row_cells in enumerate(self._cells):
            print(" | ".join(
                "%5.2f" % v[(r, c)] if cell == Maze.EMPTY else blank
                for c, cell in enumerate(row_cells)
            ))


class VisualPolicyIteration:
    """Interactive pygame visualisation of policy iteration on a ``Maze``.

    Keys handled by ``run``: ``S`` performs one evaluation sweep, ``E``
    evaluates the current policy until convergence, ``U`` performs one policy
    improvement, and ``F`` runs full policy iteration.
    """

    # Unit screen direction (dx, dy) of the arrow drawn for each action.
    _ARROW_DIRECTIONS = {
        Maze.NORTH: (0, -1),
        Maze.SOUTH: (0, 1),
        Maze.EAST: (1, 0),
        Maze.WEST: (-1, 0),
    }

    def __init__(self, game: Maze):
        """Initialise pygame, a zero value table and a random initial policy."""
        pygame.init()
        self.game = game
        self.v = {s: 0 for s in game.states}
        self.policy = {s: choice(game.actions)
                       for s in game.states if not game.is_final(s)}
        # Use the widest row so rows of different lengths still fit the window.
        self.width = max((len(row) for row in game._cells), default=0)
        self.height = len(game._cells)
        total_height = self.height * CELL_SIZE + 80  # extra for equations
        total_width = self.width * CELL_SIZE
        self.screen = pygame.display.set_mode((total_width, total_height))
        pygame.display.set_caption("Policy Iteration Teaching Tool")
        self.font = pygame.font.SysFont("monospace", FONT_SIZE)
        self.clock = pygame.time.Clock()
        self.done = False
        self.iteration = 0
        self.equation_font = pygame.font.SysFont("Arial", EQUATION_FONT_SIZE)

    def draw_grid(self):
        """Redraw every cell, the two update equations, and flip the display."""
        self.screen.fill(BG_COLOR)
        for row, row_cells in enumerate(self.game._cells):
            for col, cell in enumerate(row_cells):
                x, y = col * CELL_SIZE, row * CELL_SIZE
                rect = pygame.Rect(x, y, CELL_SIZE, CELL_SIZE)
                if cell == Maze.WALL:
                    pygame.draw.rect(self.screen, WALL_COLOR, rect)
                else:
                    self._draw_cell((row, col), cell, rect)
        self._draw_equations()
        # Draw the screen
        pygame.display.flip()

    def _draw_cell(self, state, cell, rect):
        """Draw one non-wall cell: background, border, arrow, value and reward."""
        x, y = rect.x, rect.y
        is_final = self.game.is_final(state)

        # Determine background color
        if is_final:
            reward = self.game._rewards.get(cell, 0)
            color = POS_FINAL_COLOR if reward > 0 else NEG_FINAL_COLOR
            pygame.draw.rect(self.screen, color, rect)
        else:
            pygame.draw.rect(self.screen, (255, 255, 255), rect)

        # Border
        pygame.draw.rect(self.screen, (0, 0, 0), rect, 1)

        # Draw policy arrow
        if not is_final:
            self._draw_arrow(self.policy[state],
                             x + CELL_SIZE // 2, y + CELL_SIZE // 2)

        # Draw value
        if state in self.v:
            val_text = f"{self.v[state]:.2f}"
            val_surface = self.font.render(val_text, True, VALUE_COLOR)
            val_rect = val_surface.get_rect(
                center=(x + CELL_SIZE // 2, y + CELL_SIZE - 22))
            self.screen.blit(val_surface, val_rect)

        # Draw reward in final state
        if is_final and cell in self.game._rewards:
            r_text = f"R={self.game._rewards[cell]:.0f}"
            r_surface = self.font.render(r_text, True, REWARD_COLOR)
            r_rect = r_surface.get_rect(center=(x + CELL_SIZE // 2, y + 20))
            self.screen.blit(r_surface, r_rect)

    def _draw_equations(self):
        """Render the evaluation and improvement equations below the grid."""
        # Bellman update equation rendering
        bellman_eq = "V(s) <-- Σ_s' T(s, π(s), s') [ R(s, π(s), s') + γ·V(s') ]"
        bellman_surf = self.equation_font.render(bellman_eq, True, EQUATION_COLOR_BELLMAN)
        bellman_rect = bellman_surf.get_rect(
            midtop=(self.screen.get_width() // 2, self.height * CELL_SIZE + 10))
        self.screen.blit(bellman_surf, bellman_rect)

        # Policy update equation rendering
        policy_eq = "π(s) <-- argmax_a Σ_s' T(s,a,s') [ R(s,a,s') + γ·V(s') ]"
        policy_surf = self.equation_font.render(policy_eq, True, EQUATION_COLOR_POLICY)
        policy_rect = policy_surf.get_rect(
            midtop=(self.screen.get_width() // 2, bellman_rect.bottom + 10))
        self.screen.blit(policy_surf, policy_rect)

    def _draw_arrow(self, action, cx, cy):
        """Draw an arrow for ``action`` starting at the cell centre ``(cx, cy)``.

        Unknown actions are ignored instead of failing on unbound locals.
        """
        direction = self._ARROW_DIRECTIONS.get(action)
        if direction is None:
            return
        dx, dy = direction
        length = CELL_SIZE // 3
        end = (cx + dx * length, cy + dy * length)
        # The arrowhead base sits ARROW_HEAD behind the tip; its two corners
        # are offset perpendicular to the arrow direction.
        base_x, base_y = end[0] - dx * ARROW_HEAD, end[1] - dy * ARROW_HEAD
        off_x, off_y = abs(dy) * ARROW_HEAD, abs(dx) * ARROW_HEAD
        head1 = (base_x - off_x, base_y - off_y)
        head2 = (base_x + off_x, base_y + off_y)
        pygame.draw.line(self.screen, ARROW_COLOR, (cx, cy), end, ARROW_WIDTH)
        pygame.draw.polygon(self.screen, ARROW_COLOR, [end, head1, head2])

    def _non_final_states(self):
        """Return the states for which a policy action is defined."""
        return [s for s in self.game.states if not self.game.is_final(s)]

    def _q_value(self, state, action):
        """Return the expected one-step return of ``action`` in ``state``."""
        return sum(p * (r + gamma * self.v[nxt])
                   for nxt, p, r in self.game.effects(state, action))

    def step_evaluation(self):
        """Run one synchronous evaluation sweep of the current policy.

        All new values are computed from the old table and applied at once.

        Returns:
            The largest absolute value change over all non-final states.
        """
        delta = 0
        new_v = {}
        for s in self._non_final_states():
            new_v[s] = self._q_value(s, self.policy[s])
            delta = max(delta, abs(new_v[s] - self.v[s]))
        self.v.update(new_v)
        return delta

    def run_full_evaluation(self):
        """Repeat evaluation sweeps until the change drops below ``max_delta``."""
        while self.step_evaluation() >= max_delta:
            pass

    def improve_policy(self):
        """Make the policy greedy with respect to the current values.

        An action is replaced only by a strictly better one, so ties cannot
        make the policy flip back and forth between equally good actions.

        Returns:
            True if no state changed its action, False otherwise.
        """
        stable = True
        for s in self._non_final_states():
            current = self.policy[s]
            best = max(self.game.actions, key=lambda a: self._q_value(s, a))
            if best != current and self._q_value(s, best) > self._q_value(s, current):
                self.policy[s] = best
                stable = False
        return stable

    def run(self):
        """Run the event loop until the window is closed, then shut pygame down."""
        running = True
        try:
            while running:
                self.clock.tick(FPS)
                self.draw_grid()
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        running = False
                    elif event.type == pygame.KEYDOWN:
                        if event.key == pygame.K_s:  # Step evaluation
                            self.step_evaluation()
                        elif event.key == pygame.K_e:  # Full evaluation
                            self.run_full_evaluation()
                        elif event.key == pygame.K_u:  # Policy improvement
                            self.improve_policy()
                        elif event.key == pygame.K_f:  # Full policy iteration
                            self.run_full_evaluation()
                            while not self.improve_policy():
                                self.run_full_evaluation()
        finally:
            # Release the display even if drawing or an update step fails.
            pygame.quit()


if __name__ == "__main__":
    game = Maze("volcano-crossing")  # replace with your actual maze filename
    vis = VisualPolicyIteration(game)
    vis.run()