Source code for uxsim.analyzer

"""
Analyzer for a UXsim simulation result.
This module is automatically loaded when you import the `uxsim` module.
"""

import numpy as np
import matplotlib.pyplot as plt
import glob, os, csv, time
import pandas as pd
from PIL import Image, ImageDraw, ImageFont
from PIL.Image import Resampling
from tqdm.auto import tqdm
from collections import defaultdict as ddict
from importlib.resources import files
from pathlib import Path
import io
from scipy.sparse.csgraph import floyd_warshall

from .utils import *

def load_font_data(font_path=None):
    if font_path is None:
        font_resource = files('uxsim.files').joinpath('HackGen-Regular.ttf')
        with font_resource.open('rb') as f:
            font_data = f.read()
    else:
        font_file = Path(font_path)
        with font_file.open('rb') as f:
            font_data = f.read()
    
    return font_data

# def load_font_data():
#     font_data = read_binary("uxsim.files", "HackGen-Regular.ttf")
#     return font_data

class Analyzer:
    """
    Class for analyzing and visualizing a simulation result.
    """
    def __init__(s, W, font_pillow=None, font_matplotlib=None):
        """
        Create result analysis object.

        Parameters
        ----------
        W : object
            The world to which this belongs.
        font_pillow : str, optional
            The path to the font file for Pillow. If not provided, the default font for English and Japanese is used.
        font_matplotlib : str, optional
            The font name for Matplotlib. If not provided, the default font for English and Japanese is used.
        """
        s.W = W

        os.makedirs(f"out{s.W.name}", exist_ok=True)

        # basic statistics
        s.average_speed = 0
        s.average_speed_count = 0
        s.trip_completed = 0
        s.trip_all = 0
        s.total_travel_time = 0
        s.average_travel_time = 0

        # flags
        s.flag_edie_state_computed = 0
        s.flag_trajectory_computed = 0
        s.flag_pandas_convert = 0
        s.flag_od_analysis = 0

        # visualization data
        s.font_data = load_font_data(font_pillow)
        s.font_file_like = io.BytesIO(s.font_data)
        plt.rcParams["font.family"] = get_font_for_matplotlib(font_matplotlib)
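    # Usage sketch (illustrative comment, not part of the class): an Analyzer is created by
    # the World itself and is normally reached as `W.analyzer` once the simulation has run,
    # e.g., assuming a configured World `W`:
    #
    #     W.exec_simulation()
    #     W.analyzer.print_simple_stats()
    #     df_od = W.analyzer.od_to_pandas()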
    def basic_analysis(s):
        """
        Analyze basic stats.
        """
        df = s.W.analyzer.od_to_pandas()

        s.trip_completed = np.sum(df["completed_trips"])
        s.trip_all = np.sum(df["total_trips"])
        s.total_distance_traveled = np.sum(df["average_distance_traveled_per_veh"]*df["total_trips"])

        if s.trip_completed:
            s.total_travel_time = np.sum(df["completed_trips"]*df["average_travel_time"])
            s.average_travel_time = s.total_travel_time/s.trip_completed
            s.total_delay = np.sum(df["completed_trips"]*(df["average_travel_time"]-df["free_travel_time"]))
            s.average_delay = s.total_delay/s.trip_completed
        else:
            s.total_travel_time = -1
            s.average_travel_time = -1
            s.total_delay = -1
            s.average_delay = -1
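    # Note (illustrative): after basic_analysis(), aggregate figures are available as
    # attributes, e.g.:
    #
    #     W.analyzer.basic_analysis()
    #     print(W.analyzer.average_travel_time, W.analyzer.average_delay)
    #
    # The travel-time and delay fields are set to -1 when no trip has been completed.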
[docs] def od_analysis(s): """ Analyze OD-specific stats: number of trips, number of completed trips, free-flow travel time, average travel time, its std, total distance traveled """ if s.flag_od_analysis: return 0 else: s.flag_od_analysis = 1 s.od_trips = ddict(lambda: 0) s.od_trips_comp = ddict(lambda: 0) s.od_tt_free = ddict(lambda: 0) s.od_tt = ddict(lambda: []) s.od_tt_ave = ddict(lambda: 0) s.od_tt_std = ddict(lambda: 0) s.od_dist = ddict(lambda: []) s.od_dist_total = ddict(lambda: 0) s.od_dist_ave = ddict(lambda: 0) s.od_dist_std = ddict(lambda: 0) s.od_dist_min = ddict(lambda: 0) dn = s.W.DELTAN #自由旅行時間と最短距離 adj_mat_time = np.zeros([len(s.W.NODES), len(s.W.NODES)]) adj_mat_dist = np.zeros([len(s.W.NODES), len(s.W.NODES)]) for link in s.W.LINKS: i = link.start_node.id j = link.end_node.id if s.W.ADJ_MAT[i,j]: adj_mat_time[i,j] = link.length/link.u adj_mat_dist[i,j] = link.length if link.capacity_in == 0: #流入禁止の場合は通行不可 adj_mat_time[i,j] = np.inf adj_mat_dist[i,j] = np.inf else: adj_mat_time[i,j] = np.inf adj_mat_dist[i,j] = np.inf dist_time = floyd_warshall(adj_mat_time) dist_space = floyd_warshall(adj_mat_dist) for veh in s.W.VEHICLES.values(): o = veh.orig d = veh.dest if d != None: s.od_trips[o,d] += dn veh_links = [rec[1] for rec in veh.log_t_link if hasattr(rec[1], "length")] veh_dist_traveled = sum([l.length for l in veh_links]) if veh.state == "run": veh_dist_traveled += veh.x veh.distance_traveled = veh_dist_traveled s.od_dist[o,d].append(veh.distance_traveled) if veh.travel_time != -1: s.od_trips_comp[o,d] += dn s.od_tt[o,d].append(veh.travel_time) for o,d in s.od_tt.keys(): s.od_tt_ave[o,d] = np.average(s.od_tt[o,d]) s.od_tt_std[o,d] = np.std(s.od_tt[o,d]) s.od_tt_free[o,d] = dist_time[o.id, d.id] s.od_dist_total[o,d] = np.sum(s.od_dist[o,d]) s.od_dist_min[o,d] = dist_space[o.id, d.id] s.od_dist_ave[o,d] = np.average(s.od_dist[o,d]) s.od_dist_std[o,d] = np.std(s.od_dist[o,d])
    def compute_accurate_traj(s):
        """
        Generate more complete vehicle trajectories for each link by extrapolating recorded trajectories. It is assumed that vehicles are in free-flow travel at the end of the link.
        """
        if s.W.vehicle_logging_timestep_interval != 1:
            warnings.warn("vehicle_logging_timestep_interval is not 1. The trajectories are not exactly accurate.", LoggingWarning)

        if s.flag_trajectory_computed:
            return 0
        else:
            s.flag_trajectory_computed = 1

        for veh in s.W.VEHICLES.values():
            l_old = None
            for i in lange(veh.log_t):
                if veh.log_link[i] != -1:
                    l = s.W.get_link(veh.log_link[i])
                    if l_old != l:
                        l.tss.append([])
                        l.xss.append([])
                        l.ls.append(veh.log_lane[i])
                        l.cs.append(veh.color)
                        l.names.append(veh.name)

                    l_old = l
                    l.tss[-1].append(veh.log_t[i])
                    l.xss[-1].append(veh.log_x[i])

        for l in s.W.LINKS:
            # extrapolate both ends of each trajectory (assuming free-flow speed)
            for i in lange(l.xss):
                if len(l.xss[i]):
                    if l.xss[i][0] != 0:
                        x_remain = l.xss[i][0]
                        if x_remain/l.u > s.W.DELTAT*0.01:
                            l.xss[i].insert(0, 0)
                            l.tss[i].insert(0, l.tss[i][0]-x_remain/l.u)
                    if l.length-l.u*s.W.DELTAT <= l.xss[i][-1] < l.length:
                        x_remain = l.length-l.xss[i][-1]
                        if x_remain/l.u > s.W.DELTAT*0.01:
                            l.xss[i].append(l.length)
                            l.tss[i].append(l.tss[i][-1]+x_remain/l.u)
[docs] def compute_edie_state(s): """ Compute Edie's traffic state for each link. """ if s.flag_edie_state_computed: return 0 else: s.flag_edie_state_computed = 1 s.compute_accurate_traj() for l in s.W.LINKS: DELTAX = l.edie_dx DELTATE = l.edie_dt MAXX = l.length MAXT = s.W.TMAX dt = DELTATE dx = DELTAX tn = [[ddict(lambda: 0) for i in range(int(MAXX/DELTAX))] for j in range(int(MAXT/DELTATE))] dn = [[ddict(lambda: 0) for i in range(int(MAXX/DELTAX))] for j in range(int(MAXT/DELTATE))] l.k_mat = np.zeros([int(MAXT/DELTATE), int(MAXX/DELTAX)]) l.q_mat = np.zeros([int(MAXT/DELTATE), int(MAXX/DELTAX)]) l.v_mat = np.zeros([int(MAXT/DELTATE), int(MAXX/DELTAX)]) for v in lange(l.xss): for i in lange(l.xss[v][:-1]): i0 = l.names[v] x0 = l.xss[v][i] x1 = l.xss[v][i+1] t0 = l.tss[v][i] t1 = l.tss[v][i+1] if t1-t0 != 0: v0 = (x1-x0)/(t1-t0) else: #compute_accurate_traj()の外挿で極稀にt1=t0になったのでエラー回避(もう起きないはずだが念のため) v0 = 0 tt = int(t0//dt) xx = int(x0//dx) if v0 > 0: #残り xl0 = dx-x0%dx xl1 = x1%dx tl0 = xl0/v0 tl1 = xl1/v0 if tt < int(MAXT/DELTATE) and xx < int(MAXX/DELTAX): if xx == x1//dx: #(x,t) dn[tt][xx][i0] += x1-x0 tn[tt][xx][i0] += t1-t0 else: #(x+n, t) jj = int(x1//dx-xx+1) for j in range(jj): if xx+j < int(MAXX/DELTAX): if j == 0: dn[tt][xx+j][i0] += xl0 tn[tt][xx+j][i0] += tl0 elif j == jj-1: dn[tt][xx+j][i0] += xl1 tn[tt][xx+j][i0] += tl1 else: dn[tt][xx+j][i0] += dx tn[tt][xx+j][i0] += dx/v0 else: if tt < int(MAXT/DELTATE) and xx < int(MAXX/DELTAX): dn[tt][xx][i0] += 0 tn[tt][xx][i0] += t1-t0 for i in lange(tn): for j in lange(tn[i]): t = list(tn[i][j].values())*s.W.DELTAN d = list(dn[i][j].values())*s.W.DELTAN l.tn_mat[i,j] = sum(t) l.dn_mat[i,j] = sum(d) l.k_mat[i,j] = l.tn_mat[i,j]/DELTATE/DELTAX l.q_mat[i,j] = l.dn_mat[i,j]/DELTATE/DELTAX with np.errstate(invalid="ignore"): l.v_mat = l.q_mat/l.k_mat l.v_mat = np.nan_to_num(l.v_mat, nan=l.u)
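    # Note (illustrative): the matrices computed above follow Edie's generalized definitions
    # for each space-time cell of size edie_dx x edie_dt:
    #     k = (total time spent in the cell)        / (edie_dx * edie_dt)
    #     q = (total distance traveled in the cell) / (edie_dx * edie_dt)
    #     v = q / k   (set to the free-flow speed u where k = 0)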
[docs] @catch_exceptions_and_warn() def print_simple_stats(s, force_print=False): """ Prints basic statistics of simulation result. Parameters ---------- force_print : bool, optional print the stats regardless of the value of `print_mode` """ s.W.print("results:") s.W.print(f" average speed:\t {s.average_speed:.1f} m/s") s.W.print(" number of completed trips:\t", s.trip_completed, "/", s.trip_all) #s.W.print(" number of completed trips:\t", s.trip_completed, "/", len(s.W.VEHICLES)*s.W.DELTAN) if s.trip_completed > 0: s.W.print(f" average travel time of trips:\t {s.average_travel_time:.1f} s") s.W.print(f" average delay of trips:\t {s.average_delay:.1f} s") s.W.print(f" delay ratio:\t\t\t {s.average_delay/s.average_travel_time:.3f}") s.W.print(f" total distance traveled:\t {s.total_distance_traveled:.1f} m") if force_print == 1 and s.W.print_mode == 0: print("results:") print(f" average speed:\t {s.average_speed:.1f} m/s") print(" number of completed trips:\t", s.trip_completed, "/", s.trip_all) #print(" number of completed trips:\t", s.trip_completed, "/", len(s.W.VEHICLES)*s.W.DELTAN) if s.trip_completed > 0: print(f" average travel time of trips:\t {s.average_travel_time:.1f} s") print(f" average delay of trips:\t {s.average_delay:.1f} s") print(f" delay ratio:\t\t\t {s.average_delay/s.average_travel_time:.3f}") print(f" total distance traveled:\t {s.total_distance_traveled:.1f} m")
def comp_route_travel_time(s, t, route): pass
    @catch_exceptions_and_warn()
    def time_space_diagram_traj(s, links=None, figsize=(12,4), plot_signal=True, xlim=None, ylim=None):
        """
        Draws the time-space diagram of vehicle trajectories for vehicles on specified links.

        Parameters
        ----------
        links : list of link or link, optional
            The names of the links for which the time-space diagram is to be plotted. If None, the diagram is plotted for all links in the network. Default is None.
        figsize : tuple of int, optional
            The size of the figure to be plotted, default is (12,4).
        plot_signal : bool, optional
            Plot the downstream signal red light.
        """
        if s.W.vehicle_logging_timestep_interval != 1:
            warnings.warn("vehicle_logging_timestep_interval is not 1. The plot is not exactly accurate.", LoggingWarning)

        s.compute_accurate_traj()

        # a list of links is accepted as-is; a single link is converted to a list; if unspecified, all links are used
        if links == None:
            links = s.W.LINKS
        try:
            iter(links)
            if type(links) == str:
                links = [links]
        except TypeError:
            links = [links]

        for lll in links:
            s.time_space_diagram_traj_links(linkslist=[lll], figsize=figsize, plot_signal=plot_signal, xlim=xlim, ylim=ylim)
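    # Usage sketch (illustrative; "link1" is a hypothetical link name):
    #
    #     W.analyzer.time_space_diagram_traj()              # all links
    #     W.analyzer.time_space_diagram_traj("link1")       # a single link
    #     W.analyzer.time_space_diagram_traj(["link1"], figsize=(10, 3), xlim=(0, 1200))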
[docs] @catch_exceptions_and_warn() def time_space_diagram_density(s, links=None, figsize=(12,4), plot_signal=True, xlim=None, ylim=None): """ Draws the time-space diagram of traffic density on specified links. Parameters ---------- links : list of link or link, optional The names of the links for which the time-space diagram is to be plotted. If None, the diagram is plotted for all links in the network. Default is None. figsize : tuple of int, optional The size of the figure to be plotted, default is (12,4). plot_signal : bool, optional Plot the downstream signal red light. """ if s.W.vehicle_logging_timestep_interval != 1: warnings.warn("vehicle_logging_timestep_interval is not 1. The plot is not exactly accurate.", LoggingWarning) #リンク密度の時空間図 s.W.print(" drawing traffic states...") s.compute_edie_state() #対象がlistであればOKで,単一な場合にはlistに変換する.未指定であれば全部にする. if links == None: links = s.W.LINKS try: iter(links) if type(links) == str: links = [links] except TypeError: links = [links] s.compute_edie_state() for lll in tqdm(links, disable=(s.W.print_mode==0)): l = s.W.get_link(lll) plt.figure(figsize=figsize) plt.title(l) plt.imshow(l.k_mat.T, origin="lower", aspect="auto", extent=(0, int(s.W.TMAX/l.edie_dt)*l.edie_dt, 0, int(l.length/l.edie_dx)*l.edie_dx), interpolation="none", vmin=0, vmax=1/l.delta, cmap="inferno") if plot_signal: signal_log = [i*s.W.DELTAT for i in lange(l.end_node.signal_log) if (l.end_node.signal_log[i] not in l.signal_group and len(l.end_node.signal)>1)] plt.plot(signal_log, [l.length for i in lange(signal_log)], "r.") plt.colorbar().set_label("density (veh/m)") plt.xlabel("time (s)") plt.ylabel("space (m)") if xlim == None: plt.xlim([0, s.W.TMAX]) else: plt.xlim(xlim) if ylim == None: plt.ylim([0, l.length]) else: plt.ylim(ylim) plt.tight_layout() if s.W.save_mode: plt.savefig(f"out{s.W.name}/tsd_k_{l.name}.png") if s.W.show_mode: plt.show() else: plt.close("all")
    @catch_exceptions_and_warn()
    def cumulative_curves(s, links=None, figsize=(6,4)):
        """
        Plots the cumulative curves and travel times for the provided links.

        Parameters
        ----------
        links : list or object, optional
            A list of links or a single link for which the cumulative curves and travel times are to be plotted. If not provided, the cumulative curves and travel times for all the links in the network will be plotted.
        figsize : tuple of int, optional
            The size of the figure to be plotted. Default is (6, 4).

        Notes
        -----
        This method plots the cumulative curves for vehicle arrivals and departures, as well as the instantaneous and actual travel times. The plots are saved to the directory `out<W.name>` with the filename format `cumulative_curves_<link.name>.png`.
        """
        # a list of links is accepted as-is; a single link is converted to a list; if unspecified, all links are used
        if links == None:
            links = s.W.LINKS
        try:
            iter(links)
            if type(links) == str:
                links = [links]
        except TypeError:
            links = [links]

        for link in links:
            link = s.W.get_link(link)

            fig, ax1 = plt.subplots(figsize=figsize)
            plt.title(link)
            ax2 = ax1.twinx()

            ax1.plot(range(0, s.W.TMAX, s.W.DELTAT), link.cum_arrival, "r-", label="arrival")
            ax1.plot(range(0, s.W.TMAX, s.W.DELTAT), link.cum_departure, "b-", label="departure")

            ax2.plot(range(0, s.W.TMAX, s.W.DELTAT), link.traveltime_instant, ".", c="gray", ms=0.5, label="instantaneous")
            ax2.plot(range(0, s.W.TMAX, s.W.DELTAT), link.traveltime_actual, "k.", ms=1, label="actual")

            ax1.set_ylim(ymin=0)
            ax2.set_ylim(ymin=0)

            ax1.set_xlabel("time (s)")
            ax1.set_ylabel("cumulative count (veh)")
            ax2.set_ylabel("travel time (s)")

            ax1.legend(loc="upper left")
            ax2.legend(loc="upper right")

            ax1.grid()
            plt.tight_layout()
            if s.W.save_mode:
                plt.savefig(f"out{s.W.name}/cumulative_curves_{link.name}.png")
            if s.W.show_mode:
                plt.show()
            else:
                plt.close("all")
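    # Note (illustrative): in the cumulative plot, the vertical gap between the arrival and
    # departure curves at time t is the number of vehicles on the link, and the horizontal
    # gap is (approximately, under FIFO) the travel time of a vehicle entering around t.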
    @catch_exceptions_and_warn()
    def network(s, t=None, detailed=1, minwidth=0.5, maxwidth=12, left_handed=1, tmp_anim=0, figsize=(6,6), network_font_size=4, node_size=2):
        """
        Visualizes the entire transportation network and its current traffic conditions.

        Parameters
        ----------
        t : float, optional
            The current time for which the traffic conditions are visualized.
        detailed : int, optional
            Determines the level of detail in the visualization. If set to 1, the link internals (cell) are displayed in detail. If set to 0, the visualization is simplified to link-level. Default is 1.
        minwidth : float, optional
            The minimum width of the link visualization. Default is 0.5.
        maxwidth : float, optional
            The maximum width of the link per lane visualization. Default is 12.
        left_handed : int, optional
            If set to 1, the left-handed traffic system (e.g., Japan, UK) is used. If set to 0, the right-handed one is used. Default is 1.
        tmp_anim : int, optional
            If set to 1, the visualization will be saved as a temporary animation frame. Default is 0.
        figsize : tuple of int, optional
            The size of the figure to be plotted. Default is (6, 6).
        network_font_size : int, optional
            The font size for the network labels. Default is 4.
        node_size : int, optional
            The size of the nodes in the visualization. Default is 2.

        Notes
        -----
        This method visualizes the entire transportation network and its current traffic conditions. The visualization provides information on vehicle density, velocity, link names, node locations, and more. The plots are saved to the directory `out<W.name>` with filenames depending on the `detailed` and `t` parameters.
        """
        s.compute_edie_state()

        plt.figure(figsize=figsize)
        plt.subplot(111, aspect="equal")
        plt.title(f"t = {t :>8} (s)")

        for n in s.W.NODES:
            plt.plot(n.x, n.y, "ko", ms=node_size, zorder=10)
            if network_font_size > 0:
                plt.text(n.x, n.y, n.name, c="g", horizontalalignment="center", verticalalignment="top", zorder=20, fontsize=network_font_size)

        for l in s.W.LINKS:
            x1, y1 = l.start_node.x, l.start_node.y
            x2, y2 = l.end_node.x, l.end_node.y
            vx, vy = (y1-y2)*0.05, (x2-x1)*0.05
            if not left_handed:
                vx, vy = -vx, -vy

            if detailed:
                # detailed mode
                xsize = l.k_mat.shape[1]-1
                c = ["k" for i in range(xsize)]
                lw = [1 for i in range(xsize)]
                for i in range(xsize):
                    try:
                        k = l.k_mat[int(t/l.edie_dt), i+1]
                        v = l.v_mat[int(t/l.edie_dt), i+1]
                    except:
                        warnings.warn(f"invalid time {t} is specified for network visualization", UserWarning)
                        return -1
                    lw[i] = k*l.delta*(maxwidth*l.number_of_lanes-minwidth)+minwidth
                    c[i] = plt.colormaps["viridis"](v/l.u)
                xmid = [((xsize-i)*x1+(i+1)*x2)/(xsize+1)+vx for i in range(xsize)]
                ymid = [((xsize-i)*y1+(i+1)*y2)/(xsize+1)+vy for i in range(xsize)]
                plt.plot([x1]+xmid+[x2], [y1]+ymid+[y2], "k--", lw=0.25, zorder=5)
                for i in range(xsize-1):
                    plt.plot([xmid[i], xmid[i+1]], [ymid[i], ymid[i+1]], c=c[i], lw=lw[i], zorder=6)
                if network_font_size > 0:
                    plt.text(xmid[int(len(xmid)/2)], ymid[int(len(xmid)/2)], l.name, c="b", zorder=20, fontsize=network_font_size)
            else:
                # simplified mode
                k = (l.cum_arrival[int(t/s.W.DELTAT)]-l.cum_departure[int(t/s.W.DELTAT)])/l.length
                v = l.length/l.traveltime_instant[int(t/s.W.DELTAT)]
                width = k*l.delta*(maxwidth*l.number_of_lanes-minwidth)+minwidth
                c = plt.colormaps["viridis"](v/l.u)
                xmid1, ymid1 = (2*x1+x2)/3+vx, (2*y1+y2)/3+vy
                xmid2, ymid2 = (x1+2*x2)/3+vx, (y1+2*y2)/3+vy
                plt.plot([x1, xmid1, xmid2, x2], [y1, ymid1, ymid2, y2], "k--", lw=0.25, zorder=5)
                plt.plot([x1, xmid1, xmid2, x2], [y1, ymid1, ymid2, y2], c=c, lw=width, zorder=6, solid_capstyle="butt")
                if network_font_size > 0:
                    plt.text(xmid1, ymid1, l.name, c="b", zorder=20, fontsize=network_font_size)

        maxx = max([n.x for n in s.W.NODES])
        minx = min([n.x for n in s.W.NODES])
        maxy = max([n.y for n in s.W.NODES])
        miny = min([n.y for n in s.W.NODES])

        buffx, buffy = (maxx-minx)/10, (maxy-miny)/10
        if buffx == 0:
            buffx = buffy
        if buffy == 0:
            buffy = buffx

        plt.xlim([minx-buffx, maxx+buffx])
        plt.ylim([miny-buffy, maxy+buffy])
        plt.tight_layout()
        if tmp_anim:
            plt.savefig(f"out{s.W.name}/tmp_anim_{t}.png")
            plt.close("all")
        else:
            if s.W.save_mode:
                plt.savefig(f"out{s.W.name}/network{detailed}_{t}.png")
            if s.W.show_mode:
                plt.show()
            else:
                plt.close("all")
    @catch_exceptions_and_warn()
    def network_pillow(s, t=None, detailed=1, minwidth=0.5, maxwidth=12, left_handed=1, tmp_anim=0, figsize=6, network_font_size=20, node_size=2, image_return=0):
        """
        Visualizes the entire transportation network and its current traffic conditions. Faster implementation using Pillow.

        Parameters
        ----------
        t : float, optional
            The current time for which the traffic conditions are visualized.
        detailed : int, optional
            Determines the level of detail in the visualization. If set to 1, the link internals (cell) are displayed in detail. If set to 0, the visualization is simplified to link-level. Default is 1.
        minwidth : float, optional
            The minimum width of the link visualization. Default is 0.5.
        maxwidth : float, optional
            The maximum width of the link visualization. Default is 12.
        left_handed : int, optional
            If set to 1, the left-handed traffic system (e.g., Japan, UK) is used. If set to 0, the right-handed one is used. Default is 1.
        tmp_anim : int, optional
            If set to 1, the visualization will be saved as a temporary animation frame. Default is 0.
        figsize : int or tuple of int, optional
            The size of the figure to be plotted. Default is 6.
        network_font_size : int, optional
            The font size for the network labels. Default is 20.
        node_size : int, optional
            The size of the nodes in the visualization. Default is 2.

        Notes
        -----
        This method visualizes the entire transportation network and its current traffic conditions. The visualization provides information on vehicle density, velocity, link names, node locations, and more. The plots are saved to the directory `out<W.name>` with filenames depending on the `detailed` and `t` parameters.
        """
        maxx = max([n.x for n in s.W.NODES])
        minx = min([n.x for n in s.W.NODES])
        maxy = max([n.y for n in s.W.NODES])
        miny = min([n.y for n in s.W.NODES])

        scale = 2
        try:
            coef = figsize*100*scale/(maxx-minx)
        except:
            coef = figsize[0]*100*scale/(maxx-minx)
        maxx *= coef
        minx *= coef
        maxy *= coef
        miny *= coef
        minwidth *= scale
        maxwidth *= scale

        buffer = (maxx-minx)/10
        maxx += buffer
        minx -= buffer
        maxy += buffer
        miny -= buffer

        img = Image.new("RGBA", (int(maxx-minx), int(maxy-miny)), (255, 255, 255, 255))
        draw = ImageDraw.Draw(img)
        font_data = load_font_data()
        font_file_like = io.BytesIO(font_data)
        if network_font_size > 0:
            font = ImageFont.truetype(font_file_like, int(network_font_size))

        def flip(y):
            return img.size[1]-y

        for l in s.W.LINKS:
            x1, y1 = l.start_node.x*coef-minx, l.start_node.y*coef-miny
            x2, y2 = l.end_node.x*coef-minx, l.end_node.y*coef-miny
            vx, vy = (y1-y2)*0.05, (x2-x1)*0.05
            if not left_handed:
                vx, vy = -vx, -vy
            k = (l.cum_arrival[int(t/s.W.DELTAT)]-l.cum_departure[int(t/s.W.DELTAT)])/l.length
            v = l.length/l.traveltime_instant[int(t/s.W.DELTAT)]
            width = k*l.delta*(maxwidth-minwidth)+minwidth
            c = plt.colormaps["viridis"](v/l.u)
            xmid1, ymid1 = (2*x1+x2)/3+vx, (2*y1+y2)/3+vy
            xmid2, ymid2 = (x1+2*x2)/3+vx, (y1+2*y2)/3+vy
            draw.line([(x1, flip(y1)), (xmid1, flip(ymid1)), (xmid2, flip(ymid2)), (x2, flip(y2))], fill=(int(c[0]*255), int(c[1]*255), int(c[2]*255)), width=int(width), joint="curve")

            if network_font_size > 0:
                draw.text((xmid1, flip(ymid1)), l.name, font=font, fill="blue", anchor="mm")

        for n in s.W.NODES:
            if network_font_size > 0:
                draw.text(((n.x)*coef-minx, flip((n.y)*coef-miny)), n.name, font=font, fill="green", anchor="mm")
                draw.text(((n.x)*coef-minx, flip((n.y)*coef-miny)), n.name, font=font, fill="green", anchor="mm")

        font_data = load_font_data()
        font_file_like = io.BytesIO(font_data)
        font = ImageFont.truetype(font_file_like, int(30))
        draw.text((img.size[0]/2,20), f"t = {t :>8} (s)", font=font, fill="black", anchor="mm")

        img = img.resize((int((maxx-minx)/scale), int((maxy-miny)/scale)), resample=Resampling.LANCZOS)

        if image_return:
            return img
        elif tmp_anim:
            img.save(f"out{s.W.name}/tmp_anim_{t}.png")
        else:
            if s.W.save_mode:
                img.save(f"out{s.W.name}/network{detailed}_{t}.png")
[docs] @catch_exceptions_and_warn() def show_simulation_progress(s): """ Print simulation progress. """ if s.W.print_mode: vehs = [l.density*l.length for l in s.W.LINKS] sum_vehs = sum(vehs) vs = [l.density*l.length*l.speed for l in s.W.LINKS] if sum_vehs > 0: avev = sum(vs)/sum_vehs else: avev = 0 print(f"{s.W.TIME:>8.0f} s| {sum_vehs:>8.0f} vehs| {avev:>4.1f} m/s| {time.time()-s.W.sim_start_time:8.2f} s", flush=True)
    @catch_exceptions_and_warn()
    def network_anim(s, animation_speed_inverse=10, detailed=0, minwidth=0.5, maxwidth=12, left_handed=1, figsize=(6,6), node_size=2, network_font_size=20, timestep_skip=24, file_name=None):
        """
        Generates an animation of the entire transportation network and its traffic states over time.

        Parameters
        ----------
        animation_speed_inverse : int, optional
            The inverse of the animation speed. A higher value will result in a slower animation. Default is 10.
        detailed : int, optional
            Determines the level of detail in the animation. If set to 1, the link internals (cell) are displayed in detail. Under some conditions, the detailed mode will produce inappropriate visualization. If set to 0, the visualization is simplified to link-level. Default is 0.
        minwidth : float, optional
            The minimum width of the link visualization in the animation. Default is 0.5.
        maxwidth : float, optional
            The maximum width of the link visualization in the animation. Default is 12.
        left_handed : int, optional
            If set to 1, the left-handed traffic system (e.g., Japan, UK) is used. If set to 0, the right-handed one is used. Default is 1.
        figsize : tuple of int, optional
            The size of the figures in the animation. Default is (6, 6).
        node_size : int, optional
            The size of the nodes in the animation. Default is 2.
        network_font_size : int, optional
            The font size for the network labels in the animation. Default is 20.
        timestep_skip : int, optional
            How many timesteps are skipped per frame. A large value means a coarse and lightweight animation. Default is 24.
        file_name : str, optional
            The name of the file to which the animation is saved. It overrides the default name. Default is None.

        Notes
        -----
        This method generates an animation visualizing the entire transportation network and its traffic conditions over time. The animation provides information on vehicle density, velocity, link names, node locations, and more. The generated animation is saved to the directory `out<W.name>` with a filename based on the `detailed` parameter. Temporary images used to create the animation are removed after the animation is generated.
        """
        s.W.print(" generating animation...")
        pics = []
        for t in tqdm(range(0, s.W.TMAX, s.W.DELTAT*timestep_skip), disable=(s.W.print_mode==0)):
            if int(t/s.W.LINKS[0].edie_dt) < s.W.LINKS[0].k_mat.shape[0]:
                if detailed:
                    # todo_later: make this mode use Pillow as well
                    s.network(int(t), detailed=detailed, minwidth=minwidth, maxwidth=maxwidth, left_handed=left_handed, tmp_anim=1, figsize=figsize, node_size=node_size, network_font_size=network_font_size)
                else:
                    s.network_pillow(int(t), detailed=detailed, minwidth=minwidth, maxwidth=maxwidth, left_handed=left_handed, tmp_anim=1, figsize=figsize, node_size=node_size, network_font_size=network_font_size)
                pics.append(Image.open(f"out{s.W.name}/tmp_anim_{t}.png"))

        fname = f"out{s.W.name}/anim_network{detailed}.gif"
        if file_name != None:
            fname = file_name
        pics[0].save(fname, save_all=True, append_images=pics[1:], optimize=False, duration=animation_speed_inverse*timestep_skip, loop=0)

        for f in glob.glob(f"out{s.W.name}/tmp_anim_*.png"):
            os.remove(f)
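    # Usage sketch (illustrative): a lighter-weight animation can be obtained by skipping
    # more timesteps per frame, e.g.:
    #
    #     W.analyzer.network_anim(detailed=0, timestep_skip=48, file_name=f"out{W.name}/anim.gif")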
    @catch_exceptions_and_warn()
    def network_fancy(s, animation_speed_inverse=10, figsize=6, sample_ratio=0.3, interval=5, network_font_size=0, trace_length=3, speed_coef=2, file_name=None, antialiasing=True):
        """
        Generates a visually appealing animation of vehicles' trajectories across the entire transportation network over time.

        Parameters
        ----------
        animation_speed_inverse : int, optional
            The inverse of the animation speed. A higher value will result in a slower animation. Default is 10.
        figsize : int or tuple of int, optional
            The size of the figures in the animation. Default is 6.
        sample_ratio : float, optional
            The fraction of vehicles to be visualized. Default is 0.3.
        interval : int, optional
            The interval at which vehicle positions are sampled. Default is 5.
        network_font_size : int, optional
            The font size for the network labels in the animation. Default is 0.
        trace_length : int, optional
            The length of the vehicles' trajectory trails in the animation. Default is 3.
        speed_coef : int, optional
            A coefficient that adjusts the animation speed. Default is 2.
        file_name : str, optional
            The name of the file to which the animation is saved. It overrides the default name. Default is None.
        antialiasing : bool, optional
            If set to True, antialiasing is applied to the animation. Default is True.

        Notes
        -----
        This method generates a visually appealing animation that visualizes vehicles' trajectories across the transportation network over time. The animation provides information on vehicle positions, speeds, link names, node locations, and more, with Bezier curves used for smooth transitions. The generated animation is saved to the directory `out<W.name>` with the filename `anim_network_fancy.gif`.
        """
        if s.W.vehicle_logging_timestep_interval != 1:
            warnings.warn("vehicle_logging_timestep_interval is not 1. The animation is not exactly accurate.", LoggingWarning)

        s.W.print(" generating animation...")

        # Bezier interpolation
        from scipy.interpolate import make_interp_spline

        #{t: ["xs":[], "ys":[], "v": v, "c":c]}
        draw_dict = ddict(lambda: [])

        maxx = max([n.x for n in s.W.NODES])
        minx = min([n.x for n in s.W.NODES])
        dcoef = (maxx-minx)/20

        for veh in s.W.VEHICLES.values():
            if s.W.rng.random() > sample_ratio:
                continue
            ts = []
            xs = []
            ys = []
            vs = []
            dx = (s.W.rng.random()-0.5)*dcoef
            dy = (s.W.rng.random()-0.5)*dcoef
            for i in range(0, len(veh.log_t), interval):
                if veh.log_state[i] == "run":
                    link = veh.log_link[i]
                    x0 = link.start_node.x+dx
                    y0 = link.start_node.y+dy
                    x1 = link.end_node.x+dx
                    y1 = link.end_node.y+dy
                    alpha = veh.log_x[i]/link.length
                    ts.append(veh.log_t[i])
                    xs.append(x0*(1-alpha)+x1*alpha)
                    ys.append(y0*(1-alpha)+y1*alpha)
                    c = veh.color
            for i in range(0, len(veh.log_t)):
                if veh.log_state[i] == "run":
                    vs.append(veh.log_v[i]/veh.log_link[i].u)
            if len(ts) <= interval:
                continue

            # point sequence
            points = np.array([xs, ys]).T

            # x, y coordinates
            x = points[:, 0]
            y = points[:, 1]

            # interpolation by Bezier curve
            t = np.linspace(0, 1, len(points))
            interp_size = len(ts)*interval
            t_smooth = np.linspace(0, 1, interp_size)
            bezier_spline = make_interp_spline(t, points, k=3)
            smooth_points = bezier_spline(t_smooth)
            for i in lange(t_smooth):
                ii = max(0, i-trace_length)
                if i < len(vs):
                    v = vs[i]
                else:
                    v = vs[-1]
                draw_dict[int(ts[0]+i*s.W.DELTAT)].append({
                    "xs": smooth_points[ii:i+1, 0],
                    "ys": smooth_points[ii:i+1, 1],
                    "c": veh.color,
                    "v": v
                })

        # visualization
        maxx = max([n.x for n in s.W.NODES])
        minx = min([n.x for n in s.W.NODES])
        maxy = max([n.y for n in s.W.NODES])
        miny = min([n.y for n in s.W.NODES])

        if antialiasing:
            scale = 2
        else:
            scale = 1
        try:
            coef = figsize*100*scale/(maxx-minx)
        except:
            coef = figsize[0]*100*scale/(maxx-minx)
        maxx *= coef
        minx *= coef
        maxy *= coef
        miny *= coef

        buffer = (maxx-minx)/10
        maxx += buffer
        minx -= buffer
        maxy += buffer
        miny -= buffer

        pics = []
        for t in tqdm(range(int(s.W.TMAX*0), int(s.W.TMAX*1), s.W.DELTAT*speed_coef)):
            img = Image.new("RGBA", (int(maxx-minx), int(maxy-miny)), (255, 255, 255, 255))
            draw = ImageDraw.Draw(img)
            if network_font_size > 0:
                font = ImageFont.truetype(s.font_file_like, int(network_font_size))

            def flip(y):
                return img.size[1]-y

            for l in s.W.LINKS:
                x1, y1 = l.start_node.x*coef-minx, l.start_node.y*coef-miny
                x2, y2 = l.end_node.x*coef-minx, l.end_node.y*coef-miny
                n_lane = l.number_of_lanes
                draw.line([(x1, flip(y1)), (x2, flip(y2))], fill=(200,200,200), width=int(n_lane*scale), joint="curve")
                if network_font_size > 0:
                    draw.text(((x1+x2)/2, flip((y1+y2)/2)), l.name, font=font, fill="blue", anchor="mm")

            traces = draw_dict[t]
            for trace in traces:
                xs = trace["xs"]*coef-minx
                ys = trace["ys"]*coef-miny
                size = 1.5*(1-trace["v"])*scale
                coords = [(l[0], flip(l[1])) for l in list(np.vstack([xs, ys]).T)]
                try:
                    draw.line(coords, fill=(int(trace["c"][0]*255), int(trace["c"][1]*255), int(trace["c"][2]*255)), width=scale, joint="curve")
                    draw.ellipse((xs[-1]-size, flip(ys[-1])-size, xs[-1]+size, flip(ys[-1])+size), fill=(int(trace["c"][0]*255), int(trace["c"][1]*255), int(trace["c"][2]*255)))
                except:
                    pass
                #draw.line([(x1, flip(y1)), (xmid1, flip(ymid1)), (xmid2, flip(ymid2)), (x2, flip(y2))]

            font_file_like = io.BytesIO(s.font_data)    #for unknown reason, s.font_file_like cannot be reused
            font = ImageFont.truetype(font_file_like, int(30))
            draw.text((img.size[0]/2,20), f"t = {t :>8} (s)", font=font, fill="black", anchor="mm")

            if antialiasing:
                img = img.resize((int((maxx-minx)/scale), int((maxy-miny)/scale)), resample=Resampling.LANCZOS)

            byte_stream = io.BytesIO()
            img.save(byte_stream, format='PNG')
            byte_stream.seek(0)
            pics.append(Image.open(byte_stream))

            #img.save(f"out{s.W.name}/tmp_anim_{t}.png")
            #pics.append(Image.open(f"out{s.W.name}/tmp_anim_{t}.png"))

        fname = f"out{s.W.name}/anim_network_fancy.gif"
        if file_name != None:
            fname = file_name
        pics[0].save(fname, save_all=True, append_images=pics[1:], optimize=False, duration=animation_speed_inverse*speed_coef, loop=0)
        # for f in glob.glob(f"out{s.W.name}/tmp_anim_*.png"):
        #     os.remove(f)
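    # Usage sketch (illustrative): network_fancy trades accuracy for readability; a smaller
    # sample_ratio keeps the GIF small for large scenarios, e.g.:
    #
    #     W.analyzer.network_fancy(sample_ratio=0.1, trace_length=5)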
[docs] def compute_mfd(s, links=None): """ Compute network average flow and density for MFD. """ s.compute_edie_state() if links == None: links = s.W.LINKS links = [s.W.get_link(link) for link in links] links = frozenset(links) for i in range(len(s.W.Q_AREA[links])): tn = sum([l.tn_mat[i,:].sum() for l in s.W.LINKS if l in links]) dn = sum([l.dn_mat[i,:].sum() for l in s.W.LINKS if l in links]) an = sum([l.length*s.W.EULAR_DT for l in s.W.LINKS if l in links]) s.W.K_AREA[links][i] = tn/an s.W.Q_AREA[links][i] = dn/an
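    # Note (illustrative): per aggregation period i, the network-level quantities above are
    #     K_AREA[i] = (total time spent on the selected links)     / (total link length * EULAR_DT)
    #     Q_AREA[i] = (total distance traveled on the selected links) / (total link length * EULAR_DT)
    # i.e., generalized Edie definitions applied to the selected sub-network.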
[docs] @catch_exceptions_and_warn() def macroscopic_fundamental_diagram(s, kappa=0.2, qmax=1, figtitle="", links=None, fname="", figsize=(4,4)): """ Plots the Macroscopic Fundamental Diagram (MFD) for the provided links. Parameters ---------- kappa : float, optional The maximum network average density for the x-axis of the MFD plot. Default is 0.2. qmax : float, optional The maximum network average flow for the y-axis of the MFD plot. Default is 1. links : list or object, optional A list of links or a single link for which the MFD is to be plotted. If not provided, the MFD for all the links in the network will be plotted. fname : str File name for saving (postfix). Default is "". figsize : tuple of int, optional The size of the figure to be plotted. Default is (4, 4). Notes ----- This method plots the Macroscopic Fundamental Diagram (MFD) for the provided links. The MFD provides a relationship between the network average density and the network average flow. The plot is saved to the directory `out<W.name>` with the filename `mfd<fname>.png`. """ if links == None: links = s.W.LINKS links = [s.W.get_link(link) for link in links] links = frozenset(links) s.compute_mfd(links) plt.figure(figsize=figsize) plt.title(f"{figtitle} (# of links: {len(links)})") plt.plot(s.W.K_AREA[links], s.W.Q_AREA[links], "ko-") plt.xlabel("network average density (veh/m)") plt.ylabel("network average flow (veh/s)") plt.xlim([0, kappa]) plt.ylim([0, qmax]) plt.grid() plt.tight_layout() if s.W.save_mode: plt.savefig(f"out{s.W.name}/mfd{fname}.png") if s.W.show_mode: plt.show() else: plt.close("all")
[docs] @catch_exceptions_and_warn() def plot_vehicle_log(s, vehname): """ Plots the driving link and speed for a single vehicle. Parameters ---------- vehname : str The name of the vehicle for which the driving link and speed are to be plotted. Notes ----- This method visualizes the speed profile and the links traversed by a specific vehicle over time. The speed is plotted on the primary y-axis, and the links are plotted on the secondary y-axis. The plot is saved to the directory `out<W.name>` with the filename `vehicle_<vehname>.png`. """ if s.W.vehicle_logging_timestep_interval != 1: warnings.warn("vehicle_logging_timestep_interval is not 1. The plot is not exactly accurate.", LoggingWarning) veh = s.W.VEHICLES[vehname] fig, ax1 = plt.subplots() plt.title(f"vehicle: {veh.name}") ax1.fill_between(veh.log_t, 0, veh.log_v, color="c", zorder=10) ax1.set_ylabel('speed (m/s)', color='c') plt.ylim([0, None]) plt.xlabel("time (s)") ax2 = ax1.twinx() vehlinks = [str(l.name) if l != -1 else "not in network" for l in veh.log_link] ax2.plot([veh.log_t[i] for i in lange(veh.log_t) if veh.log_state[i] != "home"], [vehlinks[i] for i in lange(vehlinks) if veh.log_state[i] != "home"], 'k-', zorder=20) ax2.grid() ax2.set_ylabel('link', color='k') plt.ylim([0, None]) plt.tight_layout() if s.W.save_mode: plt.savefig(f"out{s.W.name}/vehicle_{vehname}.png") if s.W.show_mode: plt.show() else: plt.close("all")
    @catch_exceptions_and_warn()
    def plot_vehicles_log(s, vehnamelist):
        """
        Plots the traversed links for multiple vehicles.

        Parameters
        ----------
        vehnamelist : list of str
            The names of the vehicles for which the traversed links are to be plotted.

        Notes
        -----
        This method visualizes the links traversed by the specified vehicles over time. The plot is saved to the directory `out<W.name>` with the filename `vehicles_<vehnamelist>.png`.
        """
        if s.W.vehicle_logging_timestep_interval != 1:
            warnings.warn("vehicle_logging_timestep_interval is not 1. The plot is not exactly accurate.", LoggingWarning)

        vehs = [s.W.VEHICLES[vehname] for vehname in vehnamelist]

        plt.figure()
        for veh in vehs:
            vehlinks = [str(l.name) if l != -1 else "not in network" for l in veh.log_link]
            plt.plot([veh.log_t[i] for i in lange(veh.log_t) if veh.log_state[i] != "home"], [vehlinks[i] for i in lange(vehlinks) if veh.log_state[i] != "home"], c=veh.color, label=veh.name)
        plt.grid()
        plt.ylabel('link')
        plt.legend()
        plt.ylim([0, None])

        plt.tight_layout()
        if s.W.save_mode:
            plt.savefig(f"out{s.W.name}/vehicles_{vehnamelist}.png")
        if s.W.show_mode:
            plt.show()
        else:
            plt.close("all")
[docs] def vehicles_to_pandas(s): """ Compute the detailed vehicle travel logs and return as a pandas DataFrame. Returns ------- pd.DataFrame A DataFrame containing the travel logs of vehicles, with the columns: - 'name': the name of the vehicle (platoon). - 'dn': the platoon size. - 'orig': the origin node of the vehicle's trip. - 'dest': the destination node of the vehicle's trip. - 't': the timestep. - 'link': the link the vehicle is on (or relevant status). - 'x': the position of the vehicle on the link. - 's': the spacing of the vehicle. - 'v': the speed of the vehicle. """ if s.W.vehicle_logging_timestep_interval != 1: warnings.warn("vehicle_logging_timestep_interval is not 1. The output data is not exactly accurate.", LoggingWarning) if s.flag_pandas_convert == 0: out = [["name", "dn", "orig", "dest", "t", "link", "x", "s", "v"]] for veh in s.W.VEHICLES.values(): for i in range(len(veh.log_t)): if veh.log_state[i] in ("wait", "run", "end", "abort"): if veh.log_link[i] != -1: linkname = veh.log_link[i].name else: if veh.log_state[i] == "wait": linkname = "waiting_at_origin_node" elif veh.log_state[i] == "abort": linkname = "trip_aborted" else: linkname = "trip_end" veh_dest_name = None if veh.dest != None: veh_dest_name = veh.dest.name out.append([veh.name, s.W.DELTAN, veh.orig.name, veh_dest_name, veh.log_t[i], linkname, veh.log_x[i], veh.log_s[i], veh.log_v[i]]) s.df_vehicles = pd.DataFrame(out[1:], columns=out[0]) s.flag_pandas_convert = 1 return s.df_vehicles
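    # Usage sketch (illustrative; "link1" is a hypothetical link name): the per-timestep log
    # can be filtered or exported directly, e.g.:
    #
    #     df = W.analyzer.vehicles_to_pandas()
    #     df[df["link"] == "link1"].to_csv("veh_on_link1.csv", index=False)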
    def log_vehicles_to_pandas(s):
        """
        Same as `vehicles_to_pandas`; kept for backward compatibility.
        """
        return s.vehicles_to_pandas()
[docs] def vehicle_trip_to_pandas(s): """ Compute the vehicle trip summary and return as a pandas DataFrame. Returns ------- pd.DataFrame A DataFrame containing the trip summary of the vehicle trip logs, with the columns: - 'name': the name of the vehicle (platoon). - 'orig': the origin node of the vehicle's trip. - 'dest': the destination node of the vehicle's trip. - 'departure_time': the departure time of the vehicle. - 'final_state': the final state of the vehicle. - 'travel_time': the travel time of the vehicle. - 'average_speed': the average speed of the vehicle. - 'distance_traveled': the distance traveled by the vehicle. """ out = [["name", "orig", "dest", "departure_time", "final_state", "travel_time", "average_speed", "distance_traveled"]] for veh in s.W.VEHICLES.values(): veh_dest_name = veh.dest.name if veh.dest != None else None veh_state = veh.log_state[-1] veh_ave_speed = np.average([v for v in veh.log_v if v != -1]) veh_dist_traveled = veh.distance_traveled out.append([veh.name, veh.orig.name, veh_dest_name, veh.departure_time*s.W.DELTAT, veh_state, veh.travel_time, veh_ave_speed, veh_dist_traveled]) s.df_vehicle_trip = pd.DataFrame(out[1:], columns=out[0]) return s.df_vehicle_trip
[docs] def gps_like_log_to_pandas(s): """ Generate GPS-like log (x and y in the coordinate system used for Node) of vehicles and return as a pandas DataFrame. Returns ------- pd.DataFrame """ out = [["name", "t", "x", "y", "v"]] for veh in s.W.VEHICLES.values(): for i,t in enumerate(veh.log_t): x, y = veh.get_xy_coords(t) if (x, y) == (-1, -1): continue v = veh.log_v[i] out.append([veh.name, t, x, y, v]) s.df_gps_like_log = pd.DataFrame(out[1:], columns=out[0]) return s.df_gps_like_log
    def basic_to_pandas(s):
        """
        Compute the basic stats and return as a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
        """
        out = [["total_trips", "completed_trips", "total_travel_time", "average_travel_time", "total_delay", "average_delay"], [s.trip_all, s.trip_completed, s.total_travel_time, s.average_travel_time, s.total_delay, s.average_delay]]

        s.df_basic = pd.DataFrame(out[1:], columns=out[0])
        return s.df_basic
[docs] def od_to_pandas(s): """ Compute the OD-specific stats and return as a pandas DataFrame. Returns ------- pd.DataFrame """ s.od_analysis() out = [["orig", "dest", "total_trips", "completed_trips", "free_travel_time", "average_travel_time", "stddiv_travel_time", "shortest_distance", "average_distance_traveled_per_veh", "stddiv_distance_traveled_per_veh"]] for o,d in s.od_trips.keys(): out.append([o.name, d.name, s.od_trips[o,d], s.od_trips_comp[o,d], s.od_tt_free[o,d], s.od_tt_ave[o,d], s.od_tt_std[o,d], s.od_dist_min[o,d], s.od_dist_ave[o,d], s.od_dist_std[o,d]]) s.df_od = pd.DataFrame(out[1:], columns=out[0]) return s.df_od
    def areas2areas_to_pandas(s, areas, area_names=None):
        """
        Compute the area-wise OD-specific stats and return a pandas DataFrame. It analyzes travel stats between areas (set of nodes).

        Parameters
        ----------
        areas : list
            The list of areas. Each area is defined as a list of nodes. The items of area can be Node objects or names of Nodes.
        area_names : list, optional
            The list of names of areas.

        Returns
        -------
        pd.DataFrame
        """
        df = s.od_to_pandas()

        o_name_rec = []
        d_name_rec = []
        total_trips_rec = []
        completed_trips_rec = []
        average_travel_time_rec = []
        stddiv_travel_time_rec = []
        average_distance_traveled_rec = []
        stddiv_distance_traveled_rec = []
        average_free_travel_time_rec = []
        average_shortest_distance_rec = []

        areas = [[s.W.get_node(n).name for n in area] for area in areas]
        if area_names == None:
            area_names = [f"area {i} including {areas[i][0]}" for i in range(len(areas))]

        for i, origs in enumerate(areas):
            for j, dests in enumerate(areas):
                o_name = area_names[i]
                d_name = area_names[j]

                # group by area: average travel time from origs to dests
                rows = df["orig"].isin(origs) & df["dest"].isin(dests)

                total_tripss = np.array(df["total_trips"][rows])
                average_travel_times = np.array(df["average_travel_time"][rows])
                completed_tripss = np.array(df["completed_trips"][rows])
                var_travel_times = np.array(df["stddiv_travel_time"][rows])**2
                distance_traveleds = np.array(df["average_distance_traveled_per_veh"][rows])
                var_distance_traveleds = np.array(df["stddiv_distance_traveled_per_veh"][rows])**2
                free_travel_time_times = np.array(df["free_travel_time"][rows])
                shortest_distances = np.array(df["shortest_distance"][rows])

                total_trips = total_tripss.sum()
                completed_trips = completed_tripss.sum()

                if total_trips:
                    average_travel_time = (completed_tripss*average_travel_times).sum()/completed_trips
                    var_travel_time = (completed_tripss*var_travel_times).sum()/completed_trips    #wrong! there is a correct formula. TODO: implement
                    stddiv_travel_time = np.sqrt(var_travel_time)
                    average_shortest_distance = (total_tripss*shortest_distances).sum()/total_trips
                else:
                    continue

                if completed_trips:
                    average_distance_traveled = (total_tripss*distance_traveleds).sum()/total_trips
                    var_distance_traveled = (total_tripss*distance_traveleds).sum()/total_trips    #wrong!
                    stddiv_distance_traveled = np.sqrt(var_distance_traveled)
                    average_free_travel_time = (completed_tripss*free_travel_time_times).sum()/completed_trips
                else:
                    average_distance_traveled = np.nan
                    var_distance_traveled = np.nan
                    stddiv_distance_traveled = np.nan
                    average_free_travel_time = np.nan

                o_name_rec.append(o_name)
                d_name_rec.append(d_name)
                total_trips_rec.append(total_trips)
                completed_trips_rec.append(completed_trips)
                average_travel_time_rec.append(average_travel_time)
                stddiv_travel_time_rec.append(stddiv_travel_time)
                average_distance_traveled_rec.append(average_distance_traveled)
                stddiv_distance_traveled_rec.append(stddiv_distance_traveled)
                average_free_travel_time_rec.append(average_free_travel_time)
                average_shortest_distance_rec.append(average_shortest_distance)

        out = [["origin_area", "destination_area", "total_trips", "completed_trips", "average_travel_time", "average_free_travel_time", "average_distance_traveled", "average_shortest_distance"]]
        out += [[o_name_rec[i], d_name_rec[i], total_trips_rec[i], completed_trips_rec[i], average_travel_time_rec[i], average_free_travel_time_rec[i], average_distance_traveled_rec[i], average_shortest_distance_rec[i]] for i in range(len(o_name_rec))]

        s.df_areas2areas = pd.DataFrame(out[1:], columns=out[0])
        return s.df_areas2areas
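    # Usage sketch (illustrative; node names are hypothetical): each area is a list of nodes
    # (objects or names), e.g.:
    #
    #     areas = [["n1", "n2"], ["n3", "n4"]]
    #     df = W.analyzer.areas2areas_to_pandas(areas, area_names=["east", "west"])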
[docs] def area_to_pandas(s, areas, area_names=None, border_include=True): """ Compute traffic stats in area and return as pandas.DataFrame. Parameters ---------- areas : list The list of areas. Each area is defined as a list of nodes. The items of area can be Node objects or names of Nodes. area_names : list, optional The list of names of areas. border_include : bool, optional If set to True, the links on the border of the area are included in the analysis. Default is True. Returns ------- pd.DataFrame """ # Precompute DataFrames df_links = s.W.analyzer.link_to_pandas() df_veh_link = s.W.analyzer.vehicles_to_pandas().drop_duplicates(subset=['name', 'link']) # Prepare areas as sets for fast lookup areas_set = [{s.W.get_node(n).name for n in area} for area in areas] # Initialize result lists n_links_rec = [] traffic_volume_rec = [] vehicles_remain_rec = [] total_travel_time_rec = [] average_delay_rec = [] average_speed_rec = [] vehicle_density_rec = [] # Vectorized approach to process all areas at once for area_set in areas_set: if border_include: rows = df_links["start_node"].isin(area_set) | df_links["end_node"].isin(area_set) else: rows = df_links["start_node"].isin(area_set) & df_links["end_node"].isin(area_set) links = df_links.loc[rows, "link"].unique() n_links = links.size traffic_volume = df_veh_link[df_veh_link["link"].isin(links)]["name"].nunique() * s.W.DELTAN vehicles_remain = df_links.loc[rows, "vehicles_remain"].sum() if traffic_volume > 0: traffic_volume_rows = ( df_links.loc[rows, "traffic_volume"] - df_links.loc[rows, "vehicles_remain"]).values total_travel_time = np.sum(df_links.loc[rows, "average_travel_time"].values * traffic_volume_rows) total_free_time = np.sum(df_links.loc[rows, "free_travel_time"].values * traffic_volume_rows) average_delay = max(total_travel_time / total_free_time - 1, 0) # Average speed calculation: total distance / total time total_distance = np.sum(df_links.loc[rows, "length"].values * traffic_volume_rows) average_speed = total_distance / total_travel_time if total_travel_time > 0 else np.nan # Vehicle density calculation: total number of vehicles / total link length total_link_length = df_links.loc[rows, "length"].sum() vehicle_density = traffic_volume / total_link_length if total_link_length > 0 else np.nan else: total_travel_time = 0 total_free_time = 0 average_delay = np.nan average_speed = np.nan vehicle_density = np.nan # Append the results to lists n_links_rec.append(n_links) traffic_volume_rec.append(traffic_volume) vehicles_remain_rec.append(vehicles_remain) total_travel_time_rec.append(total_travel_time) average_delay_rec.append(average_delay) average_speed_rec.append(average_speed) vehicle_density_rec.append(vehicle_density) # Create DataFrame from the results df_result = pd.DataFrame({ "area": area_names, "n_links": n_links_rec, "traffic_volume": traffic_volume_rec, "vehicles_remain": vehicles_remain_rec, "total_travel_time": total_travel_time_rec, "average_delay": average_delay_rec, "average_speed": average_speed_rec, "vehicle_density": vehicle_density_rec, }) s.df_area = df_result return s.df_area
[docs] def vehicle_groups_to_pandas(s, groups, group_names=None): """ Computes the stats of vehicle group and return as a pandas DataFrame. Parameters ---------- groups : list The list of vehicle groups. Each group is defined as a list of vehicle object. group_names : list, optional The list of names of vehicle groups. Returns ------- pd.DataFrame """ df_od = s.W.analyzer.od_to_pandas() if group_names == None: group_names = [f"group {i} including {groups[0].name}" for i in range(len(groups))] total_trip_rec = [] completed_trip_rec = [] average_travel_time_rec = [] average_delay_rec = [] std_delay_rec = [] average_traveled_distance_rec = [] average_detour_rec = [] std_detour_rec = [] averae_speed_rec = [] std_speed_rec = [] for i, group in enumerate(groups): total_trips = 0 completed_trips = 0 travel_times = [] delays = [] traveled_distances = [] detours = [] speeds = [] for veh in group: total_trips += 1 if veh.state == "end": completed_trips += 1 travel_times.append(veh.travel_time) traveled_distances.append(veh.distance_traveled) free_travel_time = df_od["free_travel_time"][(df_od["orig"]==veh.orig.name) & (df_od["dest"]==veh.dest.name)].values[0] shortest_distance = df_od["shortest_distance"][(df_od["orig"]==veh.orig.name) & (df_od["dest"]==veh.dest.name)].values[0] delays.append(veh.travel_time/free_travel_time) detours.append(veh.distance_traveled/shortest_distance) speeds.append(veh.distance_traveled/veh.travel_time) #print(f"{group_names[i]=}, {np.average(travel_times)=}, {np.average(traveled_distances)=}, {np.average(delays)=}, {np.average(detours)=}, {np.std(delays)=}, {np.std(detours)=}, {np.average(speeds)}, {np.std(speeds)}") total_trip_rec.append(total_trips) completed_trip_rec.append(completed_trips) if completed_trips > 0: average_travel_time_rec.append(np.average(travel_times)) average_delay_rec.append(np.average(delays)) std_delay_rec.append(np.std(delays)) average_traveled_distance_rec.append(np.average(traveled_distances)) average_detour_rec.append(np.average(detours)) std_detour_rec.append(np.std(detours)) averae_speed_rec.append(np.average(speeds)) std_speed_rec.append(np.std(speeds)) else: average_travel_time_rec.append(np.nan) average_delay_rec.append(np.nan) std_delay_rec.append(np.nan) average_traveled_distance_rec.append(np.nan) average_detour_rec.append(np.nan) std_detour_rec.append(np.nan) averae_speed_rec.append(np.nan) std_speed_rec.append(np.nan) df = pd.DataFrame({ "group": group_names, "total_trips": total_trip_rec, "completed_trips": completed_trip_rec, "average_travel_time": average_travel_time_rec, "average_delay_ratio": average_delay_rec, "std_delay_ratio": std_delay_rec, "average_traveled_distance": average_traveled_distance_rec, "average_detour_ratio": average_detour_rec, "std_detour_ratio": std_detour_rec, "average_speed": averae_speed_rec, "std_speed": std_speed_rec, }) s.df_vehicle_groups = df return s.df_vehicle_groups
[docs] def mfd_to_pandas(s, links=None): """ Compute the MFD-like stats and return as a pandas DataFrame. Returns ------- pd.DataFrame """ if links == None: links = s.W.LINKS s.compute_mfd(links) links = [s.W.get_link(link) for link in links] links = frozenset(links) out = [["t", "network_k", "network_q"]] for i in lange(s.W.K_AREA): out.append([i*s.W.EULAR_DT, s.W.K_AREA[links][i], s.W.Q_AREA[links][i]]) s.df_mfd = pd.DataFrame(out[1:], columns=out[0]) return s.df_mfd
    @catch_exceptions_and_warn()
    def output_data(s, fname=None):
        """
        Save all results to CSV files. This is obsolete; not all functions are implemented.
        """
        if fname == None:
            fname = f"out{s.W.name}/data"
        s.basic_to_pandas().to_csv(fname+"_basic.csv", index=False)
        s.od_to_pandas().to_csv(fname+"_od.csv", index=False)
        s.mfd_to_pandas().to_csv(fname+"_mfd.csv", index=False)
        s.link_to_pandas().to_csv(fname+"_link.csv", index=False)
        s.link_traffic_state_to_pandas().to_csv(fname+"_link_traffic_state.csv", index=False)
        s.vehicles_to_pandas().to_csv(fname+"_vehicles.csv", index=False)
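    # Usage sketch (illustrative): dump the standard CSV set into the default output folder:
    #
    #     W.analyzer.output_data()   # writes out<W.name>/data_basic.csv, data_od.csv, ...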