Source code for uxsim.OSMImporter.OSMImporter

"""
OpenStreetMap importer using OSMnx.
Work in progress. Import from OSM is experimental and may not work as expected. It is functional but may produce inappropriate networks for simulation, such as too many nodes, too many deadends, fragmented networks.


Examples
--------
Import highway in Tokyo:
    >>> from uxsim.OSMImporter import OSMImporter
    >>> ... #define World object W
    >>> nodes, links = OSMImporter.import_osm_data(north=35.817, south=35.570, east=139.881, west=139.583, custom_filter='["highway"~"motorway"]')
    >>> nodes, links = OSMImporter.osm_network_postprocessing(nodes, links, node_merge_threshold=0.005, node_merge_iteration=5, enforce_bidirectional=True)  # merge threshold distance: 0.005 degree ~= 500 m. `enforce_bidirectional` makes all links bidirectional, so that the network is not fragmented (but the original network topology is not preserved rigorously).
    >>> OSMImporter.osm_network_visualize(nodes, links, show_link_name=0)
    >>> OSMImporter.osm_network_to_World(W, nodes, links, default_jam_density=0.2, coef_degree_to_meter=111000)
    >>> ... #add demand
    >>> W.exec_simulation()
"""


import matplotlib.pyplot as plt
import math
import warnings

class OSMImporter:
    """
    OpenStreetMap importer using OSMnx.

    Work in progress. Import from OSM is experimental and may not work as expected.
    It is functional but may produce inappropriate networks for simulation, such as
    too many nodes, too many deadends, fragmented networks.

    All functions are static: call them on the class, e.g. ``OSMImporter.import_osm_data(...)``.
    """

    @staticmethod
    def import_osm_data(north=None, south=None, east=None, west=None, bbox=None,
                        custom_filter='["highway"~"trunk|primary"]',
                        default_number_of_lanes_mortorway=3, default_number_of_lanes_trunk=3,
                        default_number_of_lanes_primary=2, default_number_of_lanes_secondary=2,
                        default_number_of_lanes_residential=1, default_number_of_lanes_tertiary=1,
                        default_number_of_lanes_others=1,
                        default_maxspeed_mortorway=100, default_maxspeed_trunk=60,
                        default_maxspeed_primary=50, default_maxspeed_secondary=50,
                        default_maxspeed_residential=30, default_maxspeed_tertiary=30,
                        default_maxspeed_others=30):
        """
        Import road network data from OpenStreetMap using OSMnx.

        Parameters
        ----------
        north, south, east, west: float
            The latitudes and longitudes of the area to be imported.
        bbox: list
            The bounding box of the area to be imported. The order is [north, south, east, west].
            This takes priority over the north, south, east, west arguments.
        custom_filter: str
            The filter to be used for importing the data.
            The default is '["highway"~"trunk|primary"]', which means that only trunk and primary
            roads (usually correspond to major arterial roads) are imported.
        default_number_of_lanes_*: int
            The default number of lanes for * {road_type}, used when the OSM "lanes" tag is
            missing or cannot be parsed as an integer. (`mortorway` is the historical spelling
            of the motorway parameters; it is kept for backward compatibility.)
        default_maxspeed_*: float
            The default maximum speed (km/h) for * {road_type}, used when the OSM "maxspeed" tag
            is missing or cannot be parsed as a number.

        Returns
        -------
        nodes: list
            A list of nodes, where each element is a list of [node_id, x, y].
        links: list
            A list of links, where each element is a list of [name, from, to, lanes, maxspeed].
            `maxspeed` is divided by 3.6 (km/h -> m/s).

        Raises
        ------
        ImportError
            If the optional module `osmnx` is not installed.
        """
        # experimental warning
        print("WARNING: Import from OSM is experimental and may not work as expected. It is functional but produces inappropriate networks for simulation, such as too many nodes, too many deadends, fragmented networks.")
        warnings.warn("Import from OSM is experimental and may not work as expected. It is functional but produces inappropriate networks for simulation, such as too many nodes, too many deadends, fragmented networks.")

        try:
            import osmnx as ox
        except ImportError as err:
            raise ImportError("Optional module 'osmnx' is not installed. Please install it by 'pip install osmnx' to use this function.") from err

        print("Start downloading OSM data. This may take some time.")
        # NOTE(review): the keyword names and the element order of `bbox` accepted by
        # `graph_from_bbox` appear to differ between OSMnx major versions -- confirm
        # against the installed version.
        if bbox is not None:
            try:
                G = ox.graph.graph_from_bbox(bbox=bbox, network_type="drive", custom_filter=custom_filter)
            except TypeError:
                # version issue?
                warnings.warn("OSMnx version may be too old. Update is recommended.")
                G = ox.graph.graph_from_bbox(north=bbox[0], south=bbox[1], east=bbox[2], west=bbox[3], network_type="drive", custom_filter=custom_filter)
        else:
            G = ox.graph.graph_from_bbox(north=north, south=south, east=east, west=west, network_type="drive", custom_filter=custom_filter)
        print("Download completed")

        # Road classes (Japanese equivalents):
        #   motorway:  expressway
        #   trunk:     national highway
        #   primary:   major regional road (prefectural/municipal road with a 2-digit number)
        #   secondary: ordinary regional road (prefectural/municipal road with a 3-digit number)

        # (keyword, default lanes, default maxspeed in km/h); the first matching keyword wins.
        road_defaults = [
            ("motorway", default_number_of_lanes_mortorway, default_maxspeed_mortorway),
            ("trunk", default_number_of_lanes_trunk, default_maxspeed_trunk),
            ("primary", default_number_of_lanes_primary, default_maxspeed_primary),
            ("secondary", default_number_of_lanes_secondary, default_maxspeed_secondary),
            ("residential", default_number_of_lanes_residential, default_maxspeed_residential),
            ("tertiary", default_number_of_lanes_tertiary, default_maxspeed_tertiary),
        ]

        def defaults_for(road_type):
            """Return (lanes, maxspeed_kmh) defaults for the value of a "highway" tag."""
            try:
                for keyword, lanes, speed in road_defaults:
                    # substring match for a str tag, membership test for a list of tags
                    if keyword in road_type:
                        return lanes, speed
            except TypeError:
                # the tag value supports neither substring nor membership tests
                pass
            return default_number_of_lanes_others, default_maxspeed_others

        # data extraction
        node_dict = {n: [n, nd["x"], nd["y"]] for n, nd in G.nodes(data=True)}

        links = []
        nodes = {}
        # Iterate with keys and data so that parallel edges between the same pair
        # of nodes each contribute their own attributes.
        for u, v, _key, ed in G.edges(keys=True, data=True):
            if "highway" not in ed:
                continue
            default_lanes, default_speed = defaults_for(ed["highway"])

            # Link name is "<name>-<osmid>"; unnamed or malformed edges get an empty name.
            try:
                name = ed["name"]
                if isinstance(name, list):
                    name = name[0]
                osmid = ed["osmid"]
                if isinstance(osmid, list):
                    osmid = osmid[0]
                name += "-" + str(osmid)
            except (KeyError, IndexError, TypeError):
                name = ""

            try:
                lanes = int(ed["lanes"])
            except (KeyError, TypeError, ValueError):
                lanes = default_lanes
            if lanes < 1:
                lanes = 1

            try:
                maxspeed = float(ed["maxspeed"]) / 3.6
            except (KeyError, TypeError, ValueError):
                maxspeed = default_speed / 3.6

            links.append([name, u, v, lanes, maxspeed])  # name, from, to, number_of_lanes, maxspeed
            # only nodes that are used by at least one imported link are kept
            nodes[u] = node_dict[u]
            nodes[v] = node_dict[v]

        nodes = list(nodes.values())

        print("imported network size:")
        print(" number of links:", len(links))
        print(" number of nodes:", len(nodes))

        return nodes, links

    @staticmethod
    def osm_network_postprocessing(nodes, links, node_merge_threshold, node_merge_iteration=5, enforce_bidirectional=False):
        """
        Postprocess the network to make it suitable for simulation.

        First, it aggregates the network by merging nodes that are closer than the threshold.
        Second, if `enforce_bidirectional` is True, it adds reverse links for each link to
        eliminate deadend nodes as much as possible.

        Parameters
        ----------
        nodes: list
            A list of nodes, where each element is a list of [node_id, x, y].
        links: list
            A list of links, where each element is a list of [name, from, to, lanes, maxspeed].
            Additional trailing elements are ignored.
        node_merge_threshold: float
            If two nodes are connected by a link that is shorter than this threshold, the nodes
            are merged and the link is removed. Same unit as the node coordinates.
        node_merge_iteration: int
            The number of iterations for the node merge. If it is less than 1, the inputs are
            returned unchanged.
        enforce_bidirectional: bool
            True if you want to enforce bidirectional links. It will automatically add a reverse
            link for each link. This will eliminate deadend nodes as much as possible, but the
            original network topology is not preserved rigorously.

        Returns
        -------
        nodes: list
            A list of nodes, where each element is a list of [node_id, x, y].
        links: list
            A list of links, where each element is a list of
            [name, from, to, lanes, maxspeed, length]. At most one link is kept per
            (from, to) pair; when several exist, the last one wins.
        """

        def distance(pos1, pos2):
            return math.hypot(pos1[0] - pos2[0], pos1[1] - pos2[1])

        # network aggregation: link-based
        for _ in range(node_merge_iteration):
            node_positions = {node[0]: (node[1], node[2]) for node in nodes}

            # Links not longer than the threshold are dropped and their end nodes fused.
            # merge_map: key is the node to be removed, value is the node it is merged into.
            # A node is never both a key and a value, so one lookup resolves a merge.
            merge_map = {}
            for link in links:
                start_node, end_node = link[1], link[2]
                if start_node == end_node:
                    # A self-loop has zero length; registering it would map the node onto
                    # itself and delete it while other links still refer to it.
                    continue
                if distance(node_positions[start_node], node_positions[end_node]) > node_merge_threshold:
                    continue
                if end_node not in merge_map.values() and start_node not in merge_map:
                    merge_map[end_node] = start_node
                elif start_node not in merge_map.values() and end_node not in merge_map:
                    merge_map[start_node] = end_node

            # build the new node list and link table
            new_nodes = [node for node in nodes if node[0] not in merge_map]

            new_links = {}
            for link in links:
                name, start_node, end_node, lanes, maxspeed = link[:5]
                start_node = merge_map.get(start_node, start_node)
                end_node = merge_map.get(end_node, end_node)
                if start_node != end_node:
                    length = distance(node_positions[start_node], node_positions[end_node])
                    new_links[start_node, end_node] = [name, start_node, end_node, lanes, maxspeed, length]

            if enforce_bidirectional:
                for name, start_node, end_node, lanes, maxspeed, length in list(new_links.values()):
                    if (end_node, start_node) not in new_links:
                        new_links[end_node, start_node] = [name + "-reverse", end_node, start_node, lanes, maxspeed, length]

            links = list(new_links.values())

            # remove isolated nodes
            used_nodes = set()
            for link in links:
                used_nodes.add(link[1])
                used_nodes.add(link[2])
            nodes = [node for node in new_nodes if node[0] in used_nodes]

        print("aggregated network size:")
        print(" number of links:", len(links))
        print(" number of nodes:", len(nodes))

        return nodes, links

    @staticmethod
    def osm_network_visualize(nodes, links, figsize=(12, 12), xlim=(None, None), ylim=(None, None),
                              show_link_name=False, show_mode=1, save_mode=0, save_fname="osm_network.png"):
        """
        Visualize the imported network. Mainly for test purpose.

        Parameters
        ----------
        nodes: list
            A list of nodes, where each element is a list of [node_id, x, y].
        links: list
            A list of links, where each element is a list of [name, from, to, ...].
        figsize: tuple
            The figure size passed to matplotlib.
        xlim, ylim: sequence of two values
            The axis limits passed to matplotlib. None means automatic.
        show_link_name: bool
            If True, the link name is drawn at the midpoint of each link.
        show_mode: bool
            If True, the figure is shown.
        save_mode: bool
            If True, the figure is saved to `save_fname`.
        save_fname: str
            The file name used when `save_mode` is True.
        """
        node_positions = {node[0]: (node[1], node[2]) for node in nodes}

        # draw the graph
        plt.figure(figsize=figsize)
        plt.subplot(111, aspect="equal")

        # plot nodes (a single call for all markers)
        if node_positions:
            xs, ys = zip(*node_positions.values())
            plt.plot(xs, ys, 'ro', markersize=2)

        # plot links
        for link in links:
            x1, y1 = node_positions[link[1]]
            x2, y2 = node_positions[link[2]]
            # Bend each link sideways so that the two directions between the same
            # pair of nodes do not overlap.
            vx, vy = (y1-y2)*0.025, (x2-x1)*0.025
            xmid1, ymid1 = (2*x1+x2)/3+vx, (2*y1+y2)/3+vy
            xmid2, ymid2 = (x1+2*x2)/3+vx, (y1+2*y2)/3+vy
            plt.plot([x1, xmid1, xmid2, x2], [y1, ymid1, ymid2, y2], 'b-', linewidth=1)
            if show_link_name:
                plt.text((x1+x2)/2, (y1+y2)/2, link[0], fontsize=8)

        plt.xlim(xlim)
        plt.ylim(ylim)

        # Save before showing: the figure may no longer be available once show() returns.
        if save_mode:
            plt.savefig(save_fname)
        if show_mode:
            plt.show()
        plt.close()

    @staticmethod
    def osm_network_to_World(W, nodes, links, default_jam_density=0.2, coef_degree_to_meter=111000):
        """
        Load the imported network to the World object of UXsim.

        Parameters
        ----------
        W: World
            The World object to which the nodes and links are added.
        nodes: list
            A list of nodes, where each element is a list of [node_id, x, y].
        links: list
            A list of links, where each element is a list of
            [name, from, to, lanes, maxspeed, length]. If `length` is missing, it is computed
            from the coordinates of the end nodes.
        default_jam_density: float
            The default jam density for the links.
        coef_degree_to_meter: float
            The coefficient to convert lon/lat degree to meter. Default is 111000.
        """
        node_positions = {node[0]: (node[1], node[2]) for node in nodes}

        # Node names are kept as str(node_id) because the links refer to them by that name;
        # name clashes are left to `auto_rename`.
        for node in nodes:
            W.addNode(str(node[0]), x=node[1], y=node[2], auto_rename=True)

        used_link_names = {l.name for l in W.LINKS}
        for i, link in enumerate(links):
            lname = str(link[0])
            if lname in used_link_names:
                lname += f"_osm{i}"
            used_link_names.add(lname)

            if len(link) > 5:
                length = link[5]
            else:
                start_pos = node_positions[link[1]]
                end_pos = node_positions[link[2]]
                length = math.hypot(start_pos[0] - end_pos[0], start_pos[1] - end_pos[1])

            W.addLink(lname, str(link[1]), str(link[2]), length=length*coef_degree_to_meter, free_flow_speed=link[4], jam_density_per_lane=default_jam_density, number_of_lanes=link[3], auto_rename=True)