Source code for uxsimpp.uxsimpp

import random
from collections.abc import Iterable
import numpy as np
import matplotlib.pyplot as plt
import sys

################################
# C++拡張モジュールからクラスと関数をインポート
from . import trafficppy

# Re-export the classes and free functions of the C++ extension module under
# short module-level names; the rest of this module refers to them directly.
World = trafficppy.World
Link = trafficppy.Link
Node = trafficppy.Node
Vehicle = trafficppy.Vehicle
create_world = trafficppy.create_world
add_node = trafficppy.add_node
add_link = trafficppy.add_link
add_demand = trafficppy.add_demand

################################

from .analyzer import *
from .utils import *


#####################################################
## MARK: シナリオ定義関数

def newWorld(
    name="",
    tmax=3600,
    deltan=5,
    tau=1,
    duo_update_time=600,
    duo_update_weight=0.5,
    print_mode=True,
    random_seed=None,
    vehicle_detailed_log=1,
):
    """
    Build and return a new World (simulation environment).

    Parameters
    ----------
    name : str, optional
        Name given to the world. Defaults to an empty string.
    tmax : float, optional
        Simulation duration in seconds. Defaults to 3600.
    deltan : int, optional
        Platoon size in vehicles. Defaults to 5.
    tau : float, optional
        Reaction time in seconds. Defaults to 1.
    duo_update_time : float, optional
        Interval in seconds between route choice updates. Defaults to 600.
    duo_update_weight : float, optional
        Weight used when the route choice is updated. Defaults to 0.5.
    print_mode : bool, optional
        Whether the simulation progress is printed. Defaults to True.
    random_seed : int or None, optional
        The random seed. When None, one is drawn with
        ``random.randint(0, 2**8)``.
    vehicle_detailed_log : int, optional
        Whether vehicle data is saved. Defaults to 1.

    Returns
    -------
    World
        The newly created world object.
    """
    seed = random.randint(0, 2**8) if random_seed is None else random_seed

    # Positional arguments in the order create_world expects them.
    world_args = (
        name,                  # world_name
        tmax,                  # t_max
        deltan,                # delta_n
        tau,                   # tau
        duo_update_time,       # duo_update_time
        duo_update_weight,     # duo_update_weight
        0,                     # route_choice_uncertainty (always 0 here)
        int(print_mode),       # print_mode, passed as an int
        seed,                  # random_seed
        vehicle_detailed_log,  # vehicle_log_mode
    )
    return create_world(*world_args)
def addNode(W, name, x, y, signal_intervals=[0], signal_offset=0):
    """
    Register a new node in a world and return it.

    Parameters
    ----------
    W : World
        World that receives the node.
    name : str
        Name of the node; also used to look the node up after creation.
    x : float
        x-coordinate of the node.
    y : float
        y-coordinate of the node.
    signal_intervals : list of float, optional
        List describing the signal at the node. Defaults to ``[0]``.
    signal_offset : float, optional
        Offset of the signal. Defaults to 0.

    Returns
    -------
    Node
        The node obtained from ``W.get_node(name)`` after it has been added.
    """
    add_node(W, name, x, y, signal_intervals, signal_offset)
    # add_node's return value is not used; fetch the node by name instead.
    created = W.get_node(name)
    return created
# Attach the module-level helpers to World so they can also be called as
# methods on a World instance.
World.addNode = addNode
World.addLink = addLink
def adddemand(W, origin, destination, start_time, end_time, flow, links_preferred_list=None):
    """
    Add demand (vehicle generation) to the world.

    Node and Link objects are converted to their names before the request is
    forwarded to ``add_demand``. The caller's ``links_preferred_list`` is
    never modified: a new list of names is built instead.

    Parameters
    ----------
    W : World
        The world to which the demand belongs.
    origin : str or Node
        The origin node.
    destination : str or Node
        The destination node.
    start_time : float
        The start time of demand.
    end_time : float
        The end time of demand.
    flow : float
        The flow rate of vehicles.
    links_preferred_list : iterable of str or Link, optional
        The links the vehicles prefer. None (the default) or an empty
        iterable means no preferred links.
    """
    if isinstance(origin, Node):
        origin = origin.name
    if isinstance(destination, Node):
        destination = destination.name

    if links_preferred_list is None:
        links_preferred_list = ()

    # Build a fresh list rather than overwriting entries of the argument, so
    # the caller's list keeps its Link objects and tuples are accepted too.
    preferred_names = [
        link.name if isinstance(link, Link) else link
        for link in links_preferred_list
    ]

    add_demand(W, origin, destination, start_time, end_time, flow, preferred_names)
# Attach adddemand to World as a method.
World.adddemand = adddemand
# Use link__repr__ as the repr of Link objects.
Link.__repr__ = link__repr__
def node__repr__(s):
    """Return a short, readable representation of a node based on its name."""
    return f"<Node `{s.name}`>"
# Use node__repr__ as the repr of Node objects.
Node.__repr__ = node__repr__
def veh__repr__(s):
    """Return a short, readable representation of a vehicle based on its name."""
    return f"<Vehicle `{s.name}`>"
# Use veh__repr__ as the repr of Vehicle objects.
Vehicle.__repr__ = veh__repr__

#####################################################
## MARK: conversion helpers between instances, names and ids

# These two registrations must be real statements: when they share a line
# with the banner comment they are swallowed by it and never executed.
World.Link_resolve = Link_resolve
World.eq_Link = eq_Link

# TODO: should Link.__eq__ and Link.__hash__ be overridden?

#####################################################
## MARK: simulation execution functions
def exec_simulation(W, duration_t=-1, until_t=-1):
    """
    Run the simulation of a world.

    The adjacency matrix is initialized first, then the main loop is started
    with the given limits.

    Parameters
    ----------
    W : World
        World whose simulation is executed.
    duration_t : float, optional
        How long to run the simulation. Defaults to -1 (until completion).
    until_t : float, optional
        Time up to which the simulation is run. Defaults to -1 (until
        completion).
    """
    # Initialization is done on every call, before entering the main loop.
    W.initialize_adj_matrix()
    W.main_loop(duration_t, until_t)
# Attach exec_simulation to World as a method.
World.exec_simulation = exec_simulation

#####################################################
## MARK: simple state analysis functions
def inflow(l, t1, t2):
    """
    Return the average inflow of a link between times t1 and t2.

    The value is the increase of ``l.arrival_curve`` between the entries at
    ``int(t1 / delta_t)`` and ``int(t2 / delta_t)``, divided by ``t2 - t1``,
    where ``delta_t`` is ``l.W.delta_t``.
    """
    step = l.W.delta_t
    count_at_end = l.arrival_curve[int(t2 / step)]
    count_at_start = l.arrival_curve[int(t1 / step)]
    return (count_at_end - count_at_start) / (t2 - t1)
# Attach inflow to Link as a method.
Link.inflow = inflow
def outflow(l, t1, t2):
    """
    Return the average outflow of a link between times t1 and t2.

    The value is the increase of ``l.departure_curve`` between the entries at
    ``int(t1 / delta_t)`` and ``int(t2 / delta_t)``, divided by ``t2 - t1``,
    where ``delta_t`` is ``l.W.delta_t``.
    """
    step = l.W.delta_t
    count_at_end = l.departure_curve[int(t2 / step)]
    count_at_start = l.departure_curve[int(t1 / step)]
    return (count_at_end - count_at_start) / (t2 - t1)
# Attach outflow to Link, and the link_inflow/link_outflow helpers to World,
# as methods.
Link.outflow = outflow
World.link_inflow = link_inflow
World.link_outflow = link_outflow

#####################################################
## MARK: simple visualization
def plot_cumcurves(l, col):
    """
    Plot the arrival and departure curves of a link in one color.

    Both curves are drawn with ``plt.plot`` against a time axis made of
    ``index * l.W.delta_t`` for every index of ``l.arrival_curve``.

    Parameters
    ----------
    l : Link
        Link whose ``arrival_curve`` and ``departure_curve`` are plotted.
    col : color
        Passed to ``plt.plot`` as ``color`` for both curves.
    """
    # The time axis is sized from the arrival curve and shared by both plots.
    times = [idx * l.W.delta_t for idx in range(len(l.arrival_curve))]
    for curve in (l.arrival_curve, l.departure_curve):
        plt.plot(times, curve, color=col)
##################################################### ## MARK: util
def eq_tol(val, check, rel_tol=0.1, abs_tol=0.0, print_mode=True):
    """
    Check whether ``val`` matches ``check`` within a tolerance (test helper).

    The allowed deviation is ``abs(check * rel_tol) + abs_tol``. When both
    ``check`` and ``abs_tol`` are 0, an absolute tolerance of 0.1 is used so
    that the comparison against zero is not exact.

    Parameters
    ----------
    val : float
        Value under test.
    check : float
        Expected value.
    rel_tol : float, optional
        Relative tolerance, as a fraction of ``check``. Defaults to 0.1.
    abs_tol : float, optional
        Absolute tolerance. Defaults to 0.0.
    print_mode : bool, optional
        Whether ``val`` and ``check`` are printed. Defaults to True.

    Returns
    -------
    bool
        True when ``abs(val - check)`` does not exceed the allowed deviation.
    """
    use_fallback = check == 0 and abs_tol == 0
    effective_abs_tol = 0.1 if use_fallback else abs_tol

    if print_mode:
        print(val, check)

    allowed = abs(check * rel_tol) + effective_abs_tol
    return abs(val - check) <= allowed
def show_variables():
    """
    Print the public names of this module and of the main classes.

    Names starting with an underscore are skipped. The sections are printed
    in the order module, World, Node, Link, Vehicle, one name per line.
    """
    def _public(names):
        # Keep only names that do not start with an underscore.
        return [name for name in names if not name.startswith("_")]

    # A bare dir() here would list this function's (empty) local scope, not
    # the module namespace, so the module names are read from globals().
    sections = (
        ("module:", _public(sorted(globals()))),
        ("World:", _public(dir(World))),
        ("Node:", _public(dir(Node))),
        ("Link:", _public(dir(Link))),
        ("Vehicle:", _public(dir(Vehicle))),
    )

    for header, names in sections:
        print(header)
        for name in names:
            print("\t", name)