Source code for uxsim.Utilities.Utilities

"""
Submodule for general utilities.
This contains functions that are not essential for simulation but useful to specific analysis.
"""
import networkx as nx
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import dijkstra

[docs] def generate_grid_network(W, imax, jmax, **kwargs): """ Generate a grid network with imax x jmax nodes. Parameters ---------- W : World The world object to which the network will be added. imax : int The number of nodes in the x direction. jmax : int The number of nodes in the y direction. **kwargs : dict Additional keyword arguments to be passed to the addLink function. """ # Define the scenario #deploy nodes as an imax x jmax grid nodes = dict() for i in range(imax): for j in range(jmax): nodes[i,j] = W.addNode(f"n{(i,j)}", i, j, flow_capacity=1.6) #create links between neighborhood nodes links = dict() for i in range(imax): for j in range(jmax): if i != imax-1: links[i,j,i+1,j] = W.addLink(f"l{(i,j,i+1,j)}", nodes[i,j], nodes[i+1,j], **kwargs) if i != 0: links[i,j,i-1,j] = W.addLink(f"l{(i,j,i-1,j)}", nodes[i,j], nodes[i-1,j], **kwargs) if j != jmax-1: links[i,j,i,j+1] = W.addLink(f"l{(i,j,i,j+1)}", nodes[i,j], nodes[i,j+1], **kwargs) if j != 0: links[i,j,i,j-1] = W.addLink(f"l{(i,j,i,j-1)}", nodes[i,j], nodes[i,j-1], **kwargs) return nodes, links
[docs] def enumerate_k_shortest_routes(W, source, target, k=1, cost_function=lambda l: l.length/l.u, print_stats=0, return_cost=False): """ Enumerate the k shortest routes between two nodes in a network. By default, `enumerate_k_shortest_routes(W, "O", "D")` returns the shortest path from node "O" to "D" based on the free-flow travel time. Parameters ---------- W : World The world object containing the network. source : str | Node The source node. target : str | Node The target node. k : int The number of shortest routes to enumerate. Default is 1. cost_function : function A link cost function to compute shortest path. The argument is Link object. Default is the free-flow travel time, `lambda l: l.length/l.u`. print_stats : bool Print the statistics of the paths. return_cost : bool Return the cost of the paths. Returns ------- routes : list A list of k shortest routes. Each route is a list of link names. costs : list A list of costs of the routes if return_cost is True. """ G = nx.DiGraph() for l in W.LINKS: G.add_edge(l.start_node.name, l.end_node.name, weight=cost_function(l)) k_shortest_paths = list(nx.shortest_simple_paths(G, W.get_node(source).name, W.get_node(target).name, weight='weight'))[:k] routes = [] costs = [] for path in k_shortest_paths: route = [] for n in path[:-1]: for l in W.LINKS: if l.start_node.name == n and l.end_node.name == path[path.index(n)+1]: route.append(l.name) path_weight = sum([G[u][v]['weight'] for u, v in zip(path[:-1], path[1:])]) if print_stats: print(f"Route: {route}, Cost: {path_weight}") routes.append(route) costs.append(path_weight) if return_cost: return routes, costs else: return routes
[docs] def enumerate_k_shortest_routes_on_t(W, source, target, t, k=1, cost_function=lambda l, t: l.instant_travel_time(t), print_stats=0, return_cost=False): """ Enumerate the k shortest routes between two nodes in a network. By default, `enumerate_k_shortest_routes_on_t(W, "O", "D", t=t)` returns the shortest path from node "O" to "D" based on instantanious travel time on time t. Parameters ---------- W : World The world object containing the network. source : str | Node The source node. target : str | Node The target node. t : float The time point to compute shortest routes. k : int The number of shortest routes to enumerate. Default is 1. cost_function : function A link cost function to compute shortest path. The argument is Link object and time t. Default is the instantaneous travel time, `lambda l, t: l.instant_travel_time(t)`. print_stats : bool Print the statistics of the paths. return_cost : bool Return the cost of the paths. Returns ------- routes : list A list of k shortest routes. Each route is a list of link names. costs : list A list of costs of the routes if return_cost is True. """ G = nx.DiGraph() for l in W.LINKS: G.add_edge(l.start_node.name, l.end_node.name, weight=cost_function(l, t)) k_shortest_paths = list(nx.shortest_simple_paths(G, W.get_node(source).name, W.get_node(target).name, weight='weight'))[:k] routes = [] costs = [] for path in k_shortest_paths: route = [] for n in path[:-1]: for l in W.LINKS: if l.start_node.name == n and l.end_node.name == path[path.index(n)+1]: route.append(l.name) path_weight = sum([G[u][v]['weight'] for u, v in zip(path[:-1], path[1:])]) if print_stats: print(f"Route: {route}, Cost: {path_weight}") routes.append(route) costs.append(path_weight) if return_cost: return routes, costs else: return routes
[docs] def get_shortest_path_distance_between_all_nodes(W, return_matrix=False): """ Get the shortest distances (in meters) between all node pairs based on link lengths Parameters ---------- W : World The World object. return_matrix : bool, optional Whether to return the distance matrix as a numpy array. Default is False. Returns ------- dict or numpy array Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False. Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True. """ num_nodes = len(W.NODES) distances = np.full((num_nodes, num_nodes), np.inf) # Initialize with infinity # Fill in the distances based on the link lengths for link in W.LINKS: i = link.start_node.id j = link.end_node.id distances[i, j] = min(distances[i, j], link.length) # Use Dijkstra algorithm to compute shortest distances distances = dijkstra(csr_matrix(distances), directed=True, return_predecessors=False) if return_matrix == True: return distances else: distances_dict = dict() for node1 in W.NODES: for node2 in W.NODES: distances_dict[node1, node2] = distances[node1.id, node2.id] distances_dict[node1.name, node2.name] = distances[node1.id, node2.id] return distances_dict
[docs] def get_shortest_path_instantaneous_travel_time_between_all_nodes(W, return_matrix=False): """ Get the shortest instantaneous travel time (in seconds) between all node pairs based on the current instantaneous travel time of each link. Parameters ---------- W : World The World object. return_matrix : bool, optional Whether to return the distance matrix as a numpy array. Default is False. Returns ------- dict or numpy array Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False. Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True. """ distances = W.ROUTECHOICE.dist if return_matrix == True: return distances else: distances_dict = dict() for node1 in W.NODES: for node2 in W.NODES: distances_dict[node1, node2] = distances[node1.id, node2.id] distances_dict[node1.name, node2.name] = distances[node1.id, node2.id] return distances_dict
[docs] def get_shortest_path_instantaneous_travel_time_between_all_nodes_on_t(W, t, return_time=False, return_matrix=False): """ Get the shortest instantaneous travel time (in seconds) between all node pairs based on the instantaneous travel time of each link near time t (based on the route choice update interval). Parameters ---------- W : World The World object. t : float The time point to compute shortest travel time. return_time : bool, optional Whether to return the actual time of computing shortest path cost. Default is False. return_matrix : bool, optional Whether to return the distance matrix as a numpy array. Default is False. Returns ------- dict or numpy array or tuple Returns a dictionary of distances between nodes whose key is node pair if `return_matrix` is False and `return_time` is False. Returns a numpy array of distances between nodes whose index is node.id pair if `return_matrix` is True and `return_time` is False. Returns a tuple of the abode distances and the actual time of computing shortest path cost if `return_time` is True. """ if t >= W.TMAX: t = W.TMAX-W.DELTAT duo_t = t//W.DUO_UPDATE_TIME duo_timestep = int(duo_t*W.DUO_UPDATE_TIME/W.DELTAT) distances = W.ROUTECHOICE.dist_record[duo_timestep] if return_matrix == True: ret = distances else: distances_dict = dict() for node1 in W.NODES: for node2 in W.NODES: distances_dict[node1, node2] = distances[node1.id, node2.id] distances_dict[node1.name, node2.name] = distances[node1.id, node2.id] ret = distances_dict if return_time == True: return ret, duo_t else: return ret