import gymnasium as gym
import numpy as np
from tqdm import tqdm
from gymnasium.wrappers import RecordEpisodeStatistics, RecordVideo
import matplotlib.pyplot as plt
import copy
import seaborn as sns
from mpl_toolkits.mplot3d import Axes3D
from IPython.display import YouTubeVideo, Video, Image
The Under-powered Mountain Car¶
07 April 2025
rng = np.random.default_rng()
How it all starts¶
Video("Videos/agent-tiny-alpha-episode-0.mp4", embed=True)
The Goal¶
vid = YouTubeVideo('Ot-0cz7ji6Y', width=800, height=500, start=87)
display(vid)
Progress in 2000 episodes¶
Video("Videos/agent-episode-2000.mp4", embed=True)
8000 episodes: Peak behavior¶
Video("Videos/agent-episode-8000.mp4", embed=True)
ENV = gym.make("MountainCar-v0", render_mode="rgb_array", max_episode_steps=1000, goal_velocity=0.07)
ENV = RecordEpisodeStatistics(ENV)
Problem¶
State: position $x$, velocity $\dot{x}$
Action: reverse throttle ($0$), no throttle ($1$), forward throttle ($2$); Gymnasium encodes the throttle values $\{-1, 0, 1\}$ as the discrete actions $\{0, 1, 2\}$
A continuous state space, with real numbers instead of just a grid!
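To make that concrete, here is a quick illustrative peek at what the environment hands back (the exact reset position varies from run to run):

obs, info = ENV.reset()
print(obs)               # [position, velocity], e.g. [-0.52  0.  ]
print(ENV.action_space)  # Discrete(3)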
Solution¶
Construct a feature space that maps from the continuous state space, and use a weight vector $\mathbf{w}$ to weight the features
$q: S \times A \times \mathbb{R}^d \rightarrow \mathbb{R}$
The action is included because I use SARSA (State Action Reward State Action) and thus must measure value by action, not just by state
obs_space = ENV.observation_space
x_pos_bounds = obs_space.low[0], obs_space.high[0]
x_vel_bounds = obs_space.low[1], obs_space.high[1]
Tile Coding: Defining the tiles in our feature vector $\mathbf{x}$¶
Image(filename='tilings.png')
We have a $k = 2$ dimensional state space.
The recommended number of tilings is a power of two at least $4k$: $2^m \ge 4k = 8$, so $m \ge \log_2 8 = 3$, meaning at least $2^3 = 8$ tilings. Example 10.1 uses $n = 8$, which we will do.
That means each tiling is an $8 \times 8$ grid. With $8$ of them there are $d = 8 \times 8 \times 8 = 512$ tiles, or features, per action.
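A tiny sketch of that rule of thumb (the variable names here are mine, not part of the original code):

k = 2  # state-space dimensions
num_tilings = 2 ** int(np.ceil(np.log2(4 * k)))  # smallest power of two >= 4k
print(num_tilings)  # -> 8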
These are not square tiles, because the continuous spaces are bounded differently. We will say $s_0 = x$ and $s_1 = \dot{x}$. With that
$$-1.2 \le s_0 \le 0.6$$ $$-0.07 \le s_1 \le 0.07$$
Each tile covers 1/8th the bounded distance, so the widths are: $$w_0 = \frac{0.6-(-1.2)}{8} = 0.225$$ $$w_1 = \frac{0.07-(-0.07)}{8} = 0.0175$$
How to Offset Tiling¶
The fundamental units of the feature space (dimensions of non-overlapping tile space) are: $$\frac{w_0}{n} = \frac{0.225}{8} = 0.028125$$ $$\frac{w_1}{n} = \frac{0.0175}{8} = 0.0021875$$
Every state now lights up a unique set of tiles at the resolution of a $(\frac{w_0}{n},\frac{w_1}{n})$ unit, such that moving the state by at least one unit in any single cartesian direction produces a different feature vector, and thus a different action-value.
But how do you actually offset the tiles?
Image(filename='./offsets.png')
Finally, we can calculate the displacement vector for each tiling. Because k=2, we take the first 2 odd integers, $(1,3)$ as our "unit" displacement. Then we calculate our final displacement vector with the unit $(\frac{w_0}{n},\frac{w_1}{n})$: $$(1,3) \times (\frac{w_0}{n},\frac{w_1}{n})$$ $$(1,3) \times (0.028125,0.0021875)$$ $$(0.028125,0.0065625)$$
That is our displacement vector with which to calculate all tilings.
x_pos_tile_width = (x_pos_bounds[1] - x_pos_bounds[0])/8
print(f"x_pos_tile_width \n {x_pos_tile_width}")
x_vel_tile_width = (x_vel_bounds[1] - x_vel_bounds[0])/8
print(f"x_vel_tile_width \n {x_vel_tile_width}")
x_pos_tile_width 
 0.22500000894069672
x_vel_tile_width 
 0.017500000074505806
First we start with the base tiling: "base" because it begins exactly at the lower bounds of the state space.
x_pos_tiling = np.arange(x_pos_bounds[0], x_pos_bounds[1], x_pos_tile_width)
x_vel_tiling = np.arange(x_vel_bounds[0], x_vel_bounds[1], x_vel_tile_width)
print(f"x_pos_tiling \n {x_pos_tiling}")
print(f"y_vel_tiling \n {x_vel_tiling}")
x_pos_tiling 
 [-1.20000005 -0.97500002 -0.75       -0.52499998 -0.29999995 -0.07499993
  0.1500001   0.37500012]
x_vel_tiling 
 [-7.00000003e-02 -5.25000021e-02 -3.50000039e-02 -1.75000057e-02
 -7.45058060e-09  1.74999908e-02  3.49999890e-02  5.24999872e-02]
Our first 2d tiling:
tiling = np.stack((x_pos_tiling,x_vel_tiling), axis=1)
tiling
array([[-1.20000005e+00, -7.00000003e-02],
       [-9.75000024e-01, -5.25000021e-02],
       [-7.50000000e-01, -3.50000039e-02],
       [-5.24999976e-01, -1.75000057e-02],
       [-2.99999952e-01, -7.45058060e-09],
       [-7.49999285e-02,  1.74999908e-02],
       [ 1.50000095e-01,  3.49999890e-02],
       [ 3.75000119e-01,  5.24999872e-02]])
Now we calculate our displacement:
k = 2
n = 8
unit_displacement = np.array([1,3])
displacement = unit_displacement * np.array([x_pos_tile_width/n, x_vel_tile_width/n])
displacement
array([0.028125 , 0.0065625])
Then create all tilings. While conceptually there are 8 tilings of 64 tiles (8×8) each, we really are just storing the start coordinates of every tile. Whether a tile is actually "on" or "off" for a given state is handled in a bit.
multipliers = np.arange(0,n)
tilings = tiling + multipliers[:, None, None] * displacement
print(tilings.shape)
(8, 8, 2)
Tiling interval starts for all 8 tilings are now:
[tiling[0] for tiling in tilings]
[array([-1.20000005, -0.07      ]),
 array([-1.17187505, -0.0634375 ]),
 array([-1.14375005, -0.056875  ]),
 array([-1.11562504, -0.0503125 ]),
 array([-1.08750004, -0.04375   ]),
 array([-1.05937504, -0.0371875 ]),
 array([-1.03125004, -0.030625  ]),
 array([-1.00312504, -0.0240625 ])]
The full start-coordinate array for all 8 tilings:
tilings
array([[[-1.20000005e+00, -7.00000003e-02], [-9.75000024e-01, -5.25000021e-02], [-7.50000000e-01, -3.50000039e-02], [-5.24999976e-01, -1.75000057e-02], [-2.99999952e-01, -7.45058060e-09], [-7.49999285e-02,  1.74999908e-02], [ 1.50000095e-01,  3.49999890e-02], [ 3.75000119e-01,  5.24999872e-02]],
       [[-1.17187505e+00, -6.34375003e-02], [-9.46875023e-01, -4.59375021e-02], [-7.21874999e-01, -2.84375038e-02], [-4.96874975e-01, -1.09375056e-02], [-2.71874951e-01,  6.56249258e-03], [-4.68749274e-02,  2.40624908e-02], [ 1.78125096e-01,  4.15624890e-02], [ 4.03125120e-01,  5.90624872e-02]],
       [[-1.14375005e+00, -5.68750002e-02], [-9.18750022e-01, -3.93750020e-02], [-6.93749998e-01, -2.18750038e-02], [-4.68749974e-01, -4.37500561e-03], [-2.43749950e-01,  1.31249926e-02], [-1.87499262e-02,  3.06249908e-02], [ 2.06250098e-01,  4.81249890e-02], [ 4.31250121e-01,  6.56249872e-02]],
       [[-1.11562504e+00, -5.03125002e-02], [-8.90625020e-01, -3.28125020e-02], [-6.65624997e-01, -1.53125038e-02], [-4.40624973e-01,  2.18749442e-03], [-2.15624949e-01,  1.96874926e-02], [ 9.37507488e-03,  3.71874908e-02], [ 2.34375099e-01,  5.46874891e-02], [ 4.59375123e-01,  7.21874873e-02]],
       [[-1.08750004e+00, -4.37500002e-02], [-8.62500019e-01, -2.62500020e-02], [-6.37499996e-01, -8.75000376e-03], [-4.12499972e-01,  8.74999445e-03], [-1.87499948e-01,  2.62499927e-02], [ 3.75000760e-02,  4.37499909e-02], [ 2.62500100e-01,  6.12499891e-02], [ 4.87500124e-01,  7.87499873e-02]],
       [[-1.05937504e+00, -3.71875002e-02], [-8.34375018e-01, -1.96875019e-02], [-6.09374994e-01, -2.18750373e-03], [-3.84374971e-01,  1.53124945e-02], [-1.59374947e-01,  3.28124927e-02], [ 6.56250771e-02,  5.03124909e-02], [ 2.90625101e-01,  6.78124891e-02], [ 5.15625125e-01,  8.53124873e-02]],
       [[-1.03125004e+00, -3.06250001e-02], [-8.06250017e-01, -1.31250019e-02], [-5.81249993e-01,  4.37499629e-03], [-3.56249969e-01,  2.18749945e-02], [-1.31249946e-01,  3.93749927e-02], [ 9.37500782e-02,  5.68749909e-02], [ 3.18750102e-01,  7.43749891e-02], [ 5.43750126e-01,  9.18749874e-02]],
       [[-1.00312504e+00, -2.40625001e-02], [-7.78125016e-01, -6.56250189e-03], [-5.53124992e-01,  1.09374963e-02], [-3.28124968e-01,  2.84374945e-02], [-1.03124944e-01,  4.59374927e-02], [ 1.21875079e-01,  6.34374910e-02], [ 3.46875103e-01,  8.09374892e-02], [ 5.71875127e-01,  9.84374874e-02]]])
Tiling interval ends:
[tiling[len(tiling)-1] for tiling in tilings]
[array([0.37500012, 0.05249999]),
 array([0.40312512, 0.05906249]),
 array([0.43125012, 0.06562499]),
 array([0.45937512, 0.07218749]),
 array([0.48750012, 0.07874999]),
 array([0.51562512, 0.08531249]),
 array([0.54375013, 0.09187499]),
 array([0.57187513, 0.09843749])]
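Note what those interval starts imply: every offset tiling's first tile begins strictly above the environment's lower bounds, so states near the bottom of either range fall into a "tiling hole" that no tile of that tiling covers. This is the hole the feature function below has to patch. A quick illustrative check:

first_tile_starts = np.array([t[0] for t in tilings])  # shape (8, 2)
print(np.sum(first_tile_starts[:, 0] > x_pos_bounds[0]))  # -> 7: all but the base tiling start above the lower position bound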
Now we design a function that tells us what the feature vector looks like for a given state-action pair (a.k.a. which features are "on" or "off" given the total state).
def get_features(s, a):
    """
    Let it be known that this is an attempt at a possibly vectorized 2d tiling
    implementation that is BUGGY. It requires a much larger alpha to avoid
    getting stuck somewhere in training, due to an issue with tiling holes.
    """
    s0, s1 = s
    print(tilings.shape)
    # A tile is "on" along an axis when the state lies in [tile_start, tile_start + tile_width].
    s0_features = (s0 >= tilings[:, :, 0]) & ((s0 - x_pos_tile_width) <= tilings[:, :, 0])
    s0_features[np.where(np.all(s0_features == False, axis=1)), 0] = True  # ceiling application for too-far-offset tilings
    s0_features = np.where(s0_features)
    s1_features = (s1 >= tilings[:, :, 1]) & ((s1 - x_vel_tile_width) <= tilings[:, :, 1])
    s1_features[np.where(np.all(s1_features == False, axis=1)), 0] = True  # ceiling application for too-far-offset tilings
    s1_features = np.where(s1_features)
    print(s0_features)
    print(s1_features)
    # Flat index layout: a * 512 + tiling * 64 + s1_tile * 8 + s0_tile
    i = a * n*n*n + s1_features[0] * (n*n) + s1_features[1] * n + s0_features[1]
    print(i)
    x = np.zeros((n*n*n*3), dtype=int)
    x[i] = 1
    return x
Sample observation and response
s = np.array([0.6, 0.07])
print(f"State \n {s}")
get_features(s,0).shape
State 
 [0.6  0.07]
(8, 8, 2)
(array([0, 1, 2, 3, 4, 5, 6, 7]), array([7, 7, 7, 7, 7, 7, 7, 7]))
(array([0, 1, 2, 3, 4, 5, 6, 7]), array([0, 7, 7, 6, 6, 6, 5, 5]))
[  7 127 191 247 311 375 431 495]
(1536,)
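As a rough illustration of the generalization tile coding buys us (and a sanity check on the buggy implementation above, whose debug prints will also appear), two nearby states should share most of their 8 active tiles per action, while far-apart states should share none:

x_near_a = get_features(np.array([0.3, 0.02]), 0)
x_near_b = get_features(np.array([0.31, 0.02]), 0)  # one small nudge in position
x_far = get_features(np.array([-1.0, -0.05]), 0)    # opposite corner of the state space
print(np.sum(x_near_a & x_near_b))  # typically most of the 8 active tiles overlap
print(np.sum(x_near_a & x_far))     # 0: no shared tiles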
Actual Action-Value Equation¶
Now we create a linear action-value function of the car's state: $$q = \mathbf{w}^\top\mathbf{x}(s,a) = \sum_{i=1}^{d} w_i x_i(s,a)$$ We already have the vector $\mathbf{x}$ defined, so we just need a vector $\mathbf{w}$ holding the weight associated with each feature. This is trivial, as it is a 1D vector of the same size as $\mathbf{x}$.
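Because $\mathbf{x}$ is binary, the dot product collapses to summing the weights of the active tiles. A tiny made-up example (the values are illustrative only, not from the agent):

w_demo = np.array([0.5, -0.2, 0.9, 0.0])
x_demo = np.array([1, 0, 1, 0])   # two "on" features
print(w_demo @ x_demo)            # 1.4
print(w_demo[x_demo == 1].sum())  # 1.4 again: just the sum of the active weights

This is exactly why `_q` below can index into `w` with `np.where` instead of computing a full dot product.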
Now we build our agent that learns how to reach the goal. We will use semi-gradient SARSA (semi-gradient because the gradient is built off a bootstrapped, or estimated, understanding of the full return).
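Concretely, the per-step update the `learn` loop below applies is the episodic semi-gradient SARSA rule from Sutton & Barto, Chapter 10: $$\mathbf{w} \leftarrow \mathbf{w} + \alpha\left[R + \gamma\,\hat{q}(S', A', \mathbf{w}) - \hat{q}(S, A, \mathbf{w})\right]\nabla\hat{q}(S, A, \mathbf{w})$$ and because $\hat{q}$ is linear, the gradient is simply $\nabla\hat{q}(S, A, \mathbf{w}) = \mathbf{x}(S, A)$, with the $\gamma\,\hat{q}(S', A', \mathbf{w})$ term dropped on the terminal step.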
class Agent:
    def __init__(self, env):
        self.env = env
        self.epsilon = 0.1
        self.n = 8  # number of tilings, and tiles per dimension within each tiling
        self.alpha = 1 / (2 * self.n)  # i.e. 0.5/n, scaled down by the number of tilings
        self.gamma = 1.0
        self.data = []
        self.w = np.zeros(3 * self.n * self.n * self.n)
        obs_space = env.observation_space
        self.x_pos_bounds = obs_space.low[0], obs_space.high[0]
        self.x_vel_bounds = obs_space.low[1], obs_space.high[1]
        self.x_pos_tile_width = (self.x_pos_bounds[1] - self.x_pos_bounds[0]) / self.n
        self.x_vel_tile_width = (self.x_vel_bounds[1] - self.x_vel_bounds[0]) / self.n
        displacement = np.array([1, 3])  # first k odd integers
        self.tilings = []
        for i in range(self.n):
            offset = i * (displacement * np.array([self.x_pos_tile_width, self.x_vel_tile_width]) / self.n)
            x_offsets = np.linspace(self.x_pos_bounds[0] + offset[0],
                                    self.x_pos_bounds[1] + offset[0] - self.x_pos_tile_width,
                                    self.n)
            v_offsets = np.linspace(self.x_vel_bounds[0] + offset[1],
                                    self.x_vel_bounds[1] + offset[1] - self.x_vel_tile_width,
                                    self.n)
            grid = np.array(np.meshgrid(x_offsets, v_offsets)).T  # shape (n, n, 2)
            self.tilings.append(grid)
        self.tilings = np.array(self.tilings)  # shape (num_tilings, n, n, 2)

    def _policy(self, s):
        # epsilon-greedy over the three actions
        if rng.random() < self.epsilon:
            return self.env.action_space.sample()
        else:
            qs = np.array([self._q(s, a) for a in [0, 1, 2]])
            return np.argmax(qs)

    def _q(self, s, a):
        # w^T x reduces to summing the weights of the active tiles
        return np.sum(self.w[np.where(self._x(s, a))])

    def _x(self, s, a):
        """This solution works but is extremely slow.
        Please use the tiling provided at
        http://incompleteideas.net/tiles/tiles3.html
        for an extremely fast hashing solution.
        """
        x = np.zeros(self.n * self.n * self.n * 3, dtype=int)
        s0, s1 = s
        for f in range(self.n):
            tiling = self.tilings[f]  # shape (n, n, 2)
            for i in range(self.n):
                for j in range(self.n):
                    tile_pos = tiling[i, j]
                    if (tile_pos[0] <= s0 < tile_pos[0] + self.x_pos_tile_width and
                            tile_pos[1] <= s1 < tile_pos[1] + self.x_vel_tile_width):
                        idx = a * self.n * self.n * self.n + f * self.n * self.n + i * self.n + j
                        x[idx] = 1
                        break
                else:
                    continue
                break
        return x

    def learn(self, episodes=10000):
        episode_tracker = [0, 300, 1000, 5000, 9000]  # episodes whose value surface we snapshot
        self.epsilon = 0.3
        for ep in tqdm(range(0, episodes)):
            S = self.env.reset()[0]
            A = self._policy(S)
            truncated = False
            terminated = False
            while not truncated and not terminated:
                S_p, R, terminated, truncated, info = self.env.step(A)
                if terminated:
                    # terminal target is just R, since q(terminal, .) = 0
                    self.w += self.alpha * (R - self._q(S, A)) * self._x(S, A)
                else:
                    A_p = self._policy(S_p)
                    self.w += self.alpha * (R + self.gamma * self._q(S_p, A_p) - self._q(S, A)) * self._x(S, A)
                    S = S_p
                    A = A_p
                if truncated or terminated:
                    if ep in episode_tracker:
                        info["episode"]["q"] = self._cost_to_go()
                    self.data.append(info)

    def _cost_to_go(self):
        # _q is already greedy here (we take the max over actions below), so there is
        # no need to zero epsilon, which would permanently disable exploration.
        x_vals = np.linspace(-1.2, 0.6, num=50)       # position
        xdot_vals = np.linspace(-0.07, 0.07, num=50)  # velocity
        grid = np.array([[x, xdot] for x in x_vals for xdot in xdot_vals])
        z = np.array([
            max(self._q(state, a) for a in [0, 1, 2])
            for state in grid
        ])
        return z
EPISODES=10000
trigger = lambda t: t % (EPISODES // 5) == 0
#trigger = False
video_folder = "./Videos"
RecordedEnv = RecordVideo(ENV, video_folder=video_folder, name_prefix="agent", episode_trigger=trigger)
agent = Agent(RecordedEnv)
agent.learn(EPISODES)
100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10000/10000 [18:08<00:00, 9.19it/s]
zs = np.array([d["episode"]["q"] for d in agent.data if "q" in d["episode"]])
def plot_cost_to_go_surfaces(zs, x_range=(-1.2, 0.6), xdot_range=(-0.07, 0.07), resolution=50):
    x_vals = np.linspace(*x_range, resolution)
    xdot_vals = np.linspace(*xdot_range, resolution)
    X, Y = np.meshgrid(x_vals, xdot_vals)
    fig = plt.figure(figsize=(18, 10))
    episode_tracker = [0, 300, 1000, 5000, 9000]
    for idx in range(0, zs.shape[0]):
        Z = np.zeros_like(X)
        for i in range(resolution):
            for j in range(resolution):
                # grid in _cost_to_go is position-major, so the flat index of
                # (X[i, j], Y[i, j]) = (x_vals[j], xdot_vals[i]) is j * resolution + i
                Z[i, j] = -1 * zs[idx][j * resolution + i]
        ax = fig.add_subplot(2, 3, idx + 1, projection='3d')
        surf = ax.plot_surface(X, Y, Z, cmap='viridis', edgecolor='none')
        ax.set_xlabel('Position')
        ax.set_ylabel('Velocity')
        ax.set_zlabel('Cost-to-Go')
        ax.set_title(f'Episode {episode_tracker[idx]}')
    plt.tight_layout()
    plt.show()
Cost-to-go Over Training¶
plot_cost_to_go_surfaces(zs)
def plot_rewards():
rewards = [r["episode"]["r"] for r in agent.data]
    rewards = np.convolve(rewards, np.ones(10)/10, mode='valid')  # 10-episode moving average
    plt.plot(range(len(rewards)), rewards, label="Rewards (smoothed)")
# Labels
plt.xlabel("Episodes")
plt.ylabel("Rewards")
plt.title("Rewards per Episode")
plt.legend() # Show legend
# Show the plot
plt.show()
Rewards Per Episode¶
plot_rewards()