packages = ['numpy']

Exercise 8
Exploration with ergodic control

In this exercise, we will use ergodic control as an exploration mechanism for a point mass agent.

The goal is to analyze how ergodic control can be used to find a small object with an unknown location but with a prior knowledge on the possible location of this object, in the form of a distribution. The location of the object is made visible to you, but is unknown to the robot. The time took by ergodic control to find this object is shown below the animation.

Ergodic control uses a prior information on the location of the object in the form of a probability distribution. In this exercise, a mixture of Gaussians is used as distribution (represented as pink ellipsoids to represent the contours of two standard deviations).

Ergodic control computes control commands that will generate a movement so that the agent will spend time in the different location of the workspace in proportion to the given target spatial distribution. Namely, if we would discretize the workspace and count the number of times the agent passed in each cell of this grid, the controller will ensure that over time, the resulting histogram will match the desired spatial distribution (optimal coverage).

  • Change the Gaussian parameters param.nbGaussian, param.Mu, param.Sigma and the initial position of the agent param.x0 (point in black in the animation). Observe the resulting search behavior.
  • Set the target position param.target to be a sample from the mixture of Gaussians (with equal mixing coefficients), the means param.Mu and the covariance matrices param.Sigma.
    Hint: to sample from a mixture of Gaussians, you can first uniformly sample between 1 and param.nbGaussian to first choose the Gaussian component to sample from. You can then draw a random sample from this Gaussian distibution.

param.x0 = np.array([.2, .3]) param.nbGaussian = 2 param.Mu = np.ones((param.nbVar,param.nbGaussian)) * .5 # Implement here param.Sigma = np.zeros((param.nbVar,param.nbVar,param.nbGaussian)) # Implement here # Sampling from GMM to define the target param.target = np.array([.5, .5]) # implement here update_ergodic_control()
param.x0 = np.array([.2, .3]) param.nbGaussian = 2 param.Mu = np.zeros((param.nbVar,param.nbGaussian)) param.Sigma = np.zeros((param.nbVar,param.nbVar,param.nbGaussian)) for i in range(param.nbGaussian): param.Mu[:,i] = np.random.uniform(0.1,0.9,param.nbVar) sigma_v = np.random.uniform(-1.0, 1.0, param.nbVar) sigma_scale = np.random.uniform(0, 0.1, 1) sigma_regularization = np.random.uniform(0, 0.01, 1) sigma_v = sigma_v / np.linalg.norm(sigma_v) param.Sigma[:,:,i] = np.outer(sigma_v,sigma_v) * sigma_scale + sigma_regularization # Sampling from GMM to define the target gaussian_id = np.random.choice(np.arange(0,param.nbGaussian)) param.target = np.random.multivariate_normal(param.Mu[:,gaussian_id],param.Sigma[:,:,gaussian_id]) param.target = np.clip(param.target, 0.01, 0.99) # Target within [0,1] update_ergodic_control()




def print(x): display(x, target='output') from pyodide.ffi import create_proxy from js import Path2D, document, console import numpy as np import asyncio from math import sqrt ## Parameters # =============================== param = lambda: None # Lazy way to define an empty class in python param.dt = 1e-2 # Time step length param.nbFct = 10 # Number of basis functions along x and y param.nbVar = 2 # Dimension of datapoints param.nbGaussian = 2 # Number of Gaussians to represent the spatial distribution param.target = np.array([.5, .5]) param.target_radius = .03 param.x0 = np.array([.2, .3]) # Initial point canvas = document.getElementById('canvas') ctx = canvas.getContext('2d') cost_el = document.getElementById('cost') def hadamard_matrix(n: int) -> np.ndarray: if n == 1: return np.array([[1]]) # Recursively construct a Hadamard matrix of size n/2 half_size = n // 2 h_half = hadamard_matrix(half_size) # Construct a matrix of ones with size n/2 ones_matrix = np.ones((half_size, half_size), dtype=int) # Construct a matrix of minus ones with size n/2 minus_ones_matrix = -1 * ones_matrix # Combine the four sub-matrices to form a Hadamard matrix of size n h = np.empty((n, n), dtype=int) for i in range(half_size): h[i] = np.concatenate((h_half[i], ones_matrix[i])) h[i + half_size] = np.concatenate((h_half[i], minus_ones_matrix[i])) return h def line_segment_and_circle_intersect(cx, cy, radius, x1, y1, x2, y2): # First, we find the equation of the line that passes through the two points (x1, y1) and (x2, y2) # The equation of a line in the form y = mx + b is given by: # y - y1 = m(x - x1) # We can solve for m as follows: m = (y2 - y1) / ((x2 - x1)+1e-30) # The equation of the line can then be written as: # y = mx - mx1 + y1 # We can solve for b as follows: b = y1 - m * x1 # The distance between a point (x0, y0) and a line y = mx + b is given by: # distance = abs(y0 - mx0 - b) / sqrt(m**2 + 1) distance = abs(cy - m * cx - b) / sqrt(m**2 + 1) # If the distance is greater than the radius of the circle, the line segment and the circle do not intersect if distance > radius: return False else: # If the distance is less than the radius, we need to check if one of the endpoints of the line segment is inside the circle d1 = sqrt((cx - x1)**2 + (cy - y1)**2) d2 = sqrt((cx - x2)**2 + (cy - y2)**2) return d1 <= radius or d2 <= radius def clear_screen(): ctx.setTransform(canvas.width, 0, 0, -canvas.height, 0, canvas.height) ctx.fillStyle = 'white' ctx.fillRect(0, 0, 1, 1) cost_el.textContent = '' def draw_Gaussian(id, param, color, color2): ctx.setTransform(canvas.width, 0, 0, -canvas.height, 0, canvas.height) ctx.translate(param.Mu[0,id], param.Mu[1,id]) s, U = np.linalg.eig(param.Sigma[:2, :2, id]) # Draw Gaussian al = np.linspace(-np.pi, np.pi, 50) D = np.diag(s) * 2 # Draw contours with two standard deviations R = np.real(U @ np.sqrt(D+0j)) msh = (R @ np.array([np.cos(al), np.sin(al)])).T #+ param.Mu[:2,id] ctx.lineWidth = '0.01' ctx.fillStyle = color ctx.strokeStyle = color2 ctx.beginPath() ctx.moveTo(msh[0,0], msh[0,1]) for i in range(msh.shape[0]-1): ctx.lineTo(msh[i+1,0], msh[i+1,1]) ctx.closePath() ctx.fill() ctx.stroke() def draw_scene(param): clear_screen() # Draw initial point ctx.setTransform(canvas.width, 0, 0, -canvas.height, 0, canvas.height) ctx.fillStyle = 'black' ctx.lineWidth = '0.01' ctx.beginPath() ctx.arc(param.x0[0], param.x0[1], 0.006, 0, 2*np.pi) ctx.fill() # Draw Gaussians for k in range(param.nbGaussian): draw_Gaussian(k, param, '#FF3399', '#DD1177') # Draw target object ctx.setTransform(canvas.width, 0, 0, -canvas.height, 0, canvas.height) obj = Path2D.new() obj.arc(param.target[0], param.target[1], param.target_radius, 0, 2*np.pi) ctx.fillStyle = '#3399FF' ctx.fill(obj) return obj def errorHandler(e): msg = 'Error: ' + str(e) console.error(msg) el = document.getElementById('errors') el.innerText = msg def ergodic_control_command(x, t, wt, param): # Depends on the current position only here, outputs: dphi, phix, phiy ang = x[:,np.newaxis] * rg * omega phi1 = np.cos(ang) #Eq.(18) dphi1 = -np.sin(ang) * np.tile(rg,(param.nbVar,1)) * omega phix = phi1[0,xx-1].flatten() phiy = phi1[1,yy-1].flatten() dphix = dphi1[0,xx-1].flatten() dphiy = dphi1[1,yy-1].flatten() dphi = np.vstack([[dphix * phiy], [phix * dphiy]]) # Depends on wt, wt starts with zeros, then updates wt = wt + (phix * phiy).T / (L**param.nbVar) # Depends on dphi, wt, w_hat, t u = -dphi @ (Lambda * (wt/(t+1) - w_hat)) # Eq.(24) u = u * u_max / (np.linalg.norm(u)+u_norm_reg) # Velocity command return u, wt def update_ergodic_control(): global w_hat, wt, obj, x, t, found_flag, param Alpha = np.ones(param.nbGaussian) / param.nbGaussian # mixing coeffs. Priors ## Compute Fourier series coefficients w_hat of desired spatial distribution w_hat = np.zeros(param.nbFct**param.nbVar) for j in range(param.nbGaussian): for n in range(op.shape[1]): MuTmp = np.diag(op[:,n]) @ param.Mu[:,j] SigmaTmp = np.diag(op[:,n]) @ param.Sigma[:,:,j] @ np.diag(op[:,n]).T cos_term = np.cos(kk.T @ MuTmp) exp_term = np.exp(np.diag(-.5 * kk.T @ SigmaTmp @ kk)) w_hat = w_hat + Alpha[j] * cos_term * exp_term w_hat = w_hat / (L**param.nbVar) / (op.shape[1]) t = 0 found_flag = 0 wt = np.zeros(param.nbFct**param.nbVar) param.x0 = np.clip(param.x0, 0.01, 0.99) # x0 should be within [0,1] x = param.x0.copy() obj = draw_scene(param) ######################################################################################### # Gaussian centers param.Mu = np.zeros((param.nbVar,param.nbGaussian)) param.Mu[:,0] = np.array([.5, .7]) param.Mu[:,1] = np.array([.6, .3]) # Gaussian covariances # direction vectors Sigma1_v = np.array([.3,.1]) Sigma2_v = np.array([.1,.2]) # scale Sigma1_scale = 5E-1 Sigma2_scale = 3E-1 # regularization Sigma1_regularization = np.eye(param.nbVar)*5E-3 Sigma2_regularization = np.eye(param.nbVar)*1E-2 param.Sigma = np.zeros((param.nbVar,param.nbVar,param.nbGaussian)) # construct the cov. matrix using the outer product param.Sigma[:,:,0] = np.outer(Sigma1_v,Sigma1_v) * Sigma1_scale + Sigma1_regularization param.Sigma[:,:,1] = np.outer(Sigma2_v,Sigma2_v) * Sigma2_scale + Sigma2_regularization # mixing coeffs (sums to one) Alpha = np.ones(param.nbGaussian) / param.nbGaussian # mixing coeffs. Priors # Domain limit for each dimension (considered to be 1 for each dimension in this implementation) xlim = [0, 1] L = (xlim[1] - xlim[0]) * 2 # Size of [-xlim(2),xlim(2)] omega = 2 * np.pi / L u_max = 1E1 # Maximum speed allowed u_norm_reg = 1E-1 # not sure what is this not to divide by zero? # Range rg = np.arange(0, param.nbFct, dtype=float) KX = np.zeros((param.nbVar, param.nbFct, param.nbFct)) KX[0,:,:], KX[1,:,:] = np.meshgrid(rg, rg) # Weighting vector (Eq.(16)) sp = (param.nbVar + 1) / 2 # Sobolev norm parameter Lambda = np.array(KX[0,:].flatten()**2 + KX[1,:].flatten()**2 + 1).T**(-sp) op = hadamard_matrix(2**(param.nbVar-1)) op = np.array(op) kk = KX.reshape(param.nbVar,param.nbFct**2)*omega wt = np.zeros(param.nbFct**param.nbVar) w_hat = np.zeros(param.nbFct**param.nbVar) xx, yy = np.meshgrid(np.arange(1,param.nbFct+1), np.arange(1,param.nbFct+1)) x = param.x0.copy() obj = draw_scene(param) update_ergodic_control() ctx.setTransform(canvas.width, 0, 0, -canvas.height, 0, canvas.height) async def main(): global t, wt, x, found_flag, cost_el t = 0 while True: t += 1 u, wt = ergodic_control_command(x, t, wt, param) x_prev = x.copy() x += u * param.dt # Update of position # Draw ergodic control path ctx.lineWidth = '0.005' ctx.strokeStyle = 'rgba(0, 0, 0, 0.3)' ctx.beginPath() ctx.moveTo(x_prev[0], x_prev[1]) ctx.lineTo(x[0], x[1]) ctx.stroke() if line_segment_and_circle_intersect(param.target[0], param.target[1], \ param.target_radius, x_prev[0], x_prev[1], x[0], x[1]) and found_flag==0: cost_el.textContent = 'Target found in ' + '%.1f' % (t*param.dt) + ' seconds' found_flag = 1 await asyncio.sleep(1E-6) pyscript.run_until_complete(main())