Submodule for general utilities.
This contains functions that are not essential for simulation but useful to specific analysis.
import networkx as nx
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import dijkstra

[docs] def generate_grid_network(W, imax, jmax, **kwargs): """ Generate a grid network with imax x jmax nodes. Parameters ---------- W : World The world object to which the network will be added. imax : int The number of nodes in the x direction. jmax : int The number of nodes in the y direction. **kwargs : dict Additional keyword arguments to be passed to the addLink function. """ # Define the scenario #deploy nodes as an imax x jmax grid nodes = dict() for i in range(imax): for j in range(jmax): nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6) #create links between neighborhood nodes links = dict() for i in range(imax): for j in range(jmax): if i != imax-1: links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], **kwargs) if i != 0: links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], **kwargs) if j != jmax-1: links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], **kwargs) if j != 0: links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], **kwargs) return nodes, links
[docs] def enumerate_k_shortest_routes(W, source, target, k=1, cost_function=lambda l: l.length/l.u, print_stats=0, return_cost=False): """ Enumerate the k shortest routes between two nodes in a network. By default, `enumerate_k_shortest_routes(W, "O", "D")` returns the shortest path from node "O" to "D" based on the free-flow travel time. Parameters ---------- W : World The world object containing the network. source : str | Node The source node. target : str | Node The target node. k : int The number of shortest routes to enumerate. Default is 1. cost_function : function A link cost function to compute shortest path. The argument is Link object. Default is the free-flow travel time, `lambda l: l.length/l.u`. print_stats : bool Print the statistics of the paths. return_cost : bool Return the cost of the paths. Returns ------- routes : list A list of k shortest routes. Each route is a list of link names. costs : list A list of costs of the routes if return_cost is True. """ G = nx.DiGraph() for l in W.LINKS: G.add_edge(,, weight=cost_function(l)) k_shortest_paths = list(nx.shortest_simple_paths(G, W.get_node(source).name, W.get_node(target).name, weight='weight'))[:k] routes = [] costs = [] for path in k_shortest_paths: route = [] for n in path[:-1]: for l in W.LINKS: if == n and == path[path.index(n)+1]: route.append( path_weight = sum([G[u][v]['weight'] for u, v in zip(path[:-1], path[1:])]) if print_stats: print(f"Route: {route}, Cost: {path_weight}") routes.append(route) costs.append(path_weight) if return_cost: return routes, costs else: return routes
[docs] def enumerate_k_shortest_routes_on_t(W, source, target, t, k=1, cost_function=lambda l, t: l.instant_travel_time(t), print_stats=0, return_cost=False): """ Enumerate the k shortest routes between two nodes in a network. By default, `enumerate_k_shortest_routes_on_t(W, "O", "D", t=t)` returns the shortest path from node "O" to "D" based on instantanious travel time on time t. Parameters ---------- W : World The world object containing the network. source : str | Node The source node. target : str | Node The target node. t : float The time point to compute shortest routes. k : int The number of shortest routes to enumerate. Default is 1. cost_function : function A link cost function to compute shortest path. The argument is Link object and time t. Default is the instantaneous travel time, `lambda l, t: l.instant_travel_time(t)`. print_stats : bool Print the statistics of the paths. return_cost : bool Return the cost of the paths. Returns ------- routes : list A list of k shortest routes. Each route is a list of link names. costs : list A list of costs of the routes if return_cost is True. """ G = nx.DiGraph() for l in W.LINKS: G.add_edge(,, weight=cost_function(l, t)) k_shortest_paths = list(nx.shortest_simple_paths(G, W.get_node(source).name, W.get_node(target).name, weight='weight'))[:k] routes = [] costs = [] for path in k_shortest_paths: route = [] for n in path[:-1]: for l in W.LINKS: if == n and == path[path.index(n)+1]: route.append( path_weight = sum([G[u][v]['weight'] for u, v in zip(path[:-1], path[1:])]) if print_stats: print(f"Route: {route}, Cost: {path_weight}") routes.append(route) costs.append(path_weight) if return_cost: return routes, costs else: return routes
[docs] def get_shortest_path_distance_between_all_nodes(W, return_matrix=False): """ Get the shortest distances (in meters) between all node pairs based on link lengths Parameters ---------- W : World The World object. return_matrix : bool, optional Whether to return the distance matrix as a numpy array. Default is False. Returns ------- dict or numpy array Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False. Returns a numpy array of distances between nodes whose index is pair if `return_matrix` is True. """ num_nodes = len(W.NODES) distances = np.full((num_nodes, num_nodes), np.inf) # Initialize with infinity # Fill in the distances based on the link lengths for link in W.LINKS: i = j = distances[i, j] = min(distances[i, j], link.length) # Use Dijkstra algorithm to compute shortest distances distances = dijkstra(csr_matrix(distances), directed=True, return_predecessors=False) if return_matrix == True: return distances else: distances_dict = dict() for node1 in W.NODES: for node2 in W.NODES: distances_dict[node1, node2] = distances[,] distances_dict[,] = distances[,] return distances_dict
[docs] def get_shortest_path_instantaneous_travel_time_between_all_nodes(W, return_matrix=False): """ Get the shortest instantaneous travel time (in seconds) between all node pairs based on the current instantaneous travel time of each link. Parameters ---------- W : World The World object. return_matrix : bool, optional Whether to return the distance matrix as a numpy array. Default is False. Returns ------- dict or numpy array Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False. Returns a numpy array of distances between nodes whose index is pair if `return_matrix` is True. """ distances = W.ROUTECHOICE.dist if return_matrix == True: return distances else: distances_dict = dict() for node1 in W.NODES: for node2 in W.NODES: distances_dict[node1, node2] = distances[,] distances_dict[,] = distances[,] return distances_dict
[docs] def get_shortest_path_instantaneous_travel_time_between_all_nodes_on_t(W, t, return_time=False, return_matrix=False): """ Get the shortest instantaneous travel time (in seconds) between all node pairs based on the instantaneous travel time of each link near time t (based on the route choice update interval). Parameters ---------- W : World The World object. t : float The time point to compute shortest travel time. return_time : bool, optional Whether to return the actual time of computing shortest path cost. Default is False. return_matrix : bool, optional Whether to return the distance matrix as a numpy array. Default is False. Returns ------- dict or numpy array or tuple Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False and `return_time` is False. Returns a numpy array of distances between nodes whose index is pair if `return_matrix` is True and `return_time` is False. Returns a tuple of the abode distances and the actual time of computing shortest path cost if `return_time` is True. """ if t >= W.TMAX: t = W.TMAX-W.DELTAT duo_t = t//W.DUO_UPDATE_TIME duo_timestep = int(duo_t*W.DUO_UPDATE_TIME/W.DELTAT) distances = W.ROUTECHOICE.dist_record[duo_timestep] if return_matrix == True: ret = distances else: distances_dict = dict() for node1 in W.NODES: for node2 in W.NODES: distances_dict[node1, node2] = distances[,] distances_dict[,] = distances[,] ret = distances_dict if return_time == True: return ret, duo_t else: return ret