A. Main class

"""

 Created on 17-Oct-21
 @author: Kiril Zelenkovski

 Multiprocessing example for simulating FATD using Langevin Eq.

"""

from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
import math
from matplotlib.ticker import (AutoMinorLocator)
from multiprocessing import Pool

# Define parameters for BM process
global n, V, dt, x0, simulation_time, Dcx, X, d, sqrt_2D_dt, sqrt_dt
n = 100_000           # Size of Ensemble
V = 0                 # Drift for the diffusion process, v - external force (drift)
dt = 0.001            # dt = interval_size
x0 = 2                # y0 - starting point
simulation_time = 15  # tt - total time for each simulation
Dcx = 1               # Diffusion coefficient
X = 0                 # Arrival destination


# Arrival confidence interval (+-d)
d = (2 * Dcx * dt) ** 0.5  

# Calculate for faster usage
sqrt_2D_dt = (2 * Dcx * dt) ** 0.5
sqrt_dt = dt ** 0.5


def brownian_search(i=0):
    s = [x0]
    t = 0
    time_current = 0

    # Loop over time
    while True:
        if X - d <= s[t] <= X + d and len(s) > 1:
            # print(np.round(s[t], 5))
            return time_current

        if time_current > 100:
            return time_current

        s.insert(t + 1, s[t] + sqrt_2D_dt * np.random.normal())
        t += 1
        time_current += dt

B. Simulate FATD

Simulate the FATD by using Langevin equation approach:

  • \(N=10^5\) - Ensemble size

  • \(dt=0.01\) - Interval size

  • \(D_x=1\) - Diffusion coefficient

  • \(x_0=2\) - Starting point for each process

  • \(X=0\) - Arrival/stopping point that we are intrested in

# Create pool for multiprocessing
print("------------------------Started calculating FATD, X------------------------")
pool = Pool(5)
# fatd_s = np.array(pool.map(brownian_search, range(n)))
fatd_s = list(tqdm(pool.imap(brownian_search, range(n)), total=n))
pool.terminate()
print("\n")

time = np.arange(0.01, simulation_time + 1, 0.01)

print("\nd: ", d)
print("Length: ", len(fatd_s))
print("AVG: ", np.average(fatd_s))
print("MAX: ", np.max(fatd_s))
print("MIN: ", np.min(fatd_s))
print("time: ", len(time))
------------------------Started calculating FATD, X------------------------
100%|██████████| 100000/100000 [2:14:45<00:00, 12.37it/s]
d:  0.044721359549995794
Length:  100000
AVG:  20.668476360012917
MAX:  100.00000000011343
MIN:  0.08800000000000006
time:  1599

Results

# FATD figure
fig, (ax) = plt.subplots(1, 1, figsize=(6, 4), dpi=180)
time = np.arange(0.01, simulation_time + 1, 0.01)

# First Arrival Time Distribution - analytical
fatd_a = [(np.abs(X - x0) / (4 * np.pi * Dcx) ** 0.5) * np.exp(-np.square(X - x0) / (4 * Dcx * x)) * (
        1 / math.sqrt(math.pow(x, 3))) for x in time]
ax.plot(time, fatd_a, linewidth=0.7, alpha=0.8, label=r" Analytical (PDF)", color="#2C2891")

# Calculate bins
step = 0.1
start = np.floor(min(fatd_s) / step) * step
stop = max(fatd_s) + step
bin_edges = np.arange(start, stop, step=step)

# Calculate bin centers
hist, edges = np.histogram(fatd_s, bins=bin_edges, density=True)
centers = 0.5 * (edges[1:] + edges[:-1])
# print(centers)

# Plot histogram, markers
markers_on = [0, 1, 2, 4, 6, 8, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140]
plt.plot(centers, hist, color='#FB9300', label=r"Simulation (PDF)", marker="D", markerfacecolor='None', linestyle='None', markersize=5, markevery=markers_on)

# Plot actual data bins
plt.hist(fatd_s, bins=bin_edges, histtype='stepfilled', density=True, facecolor='sandybrown', alpha=0.35,
         label=r"Monte carlo sim.")

# Figure tweaks
# plt.axvline(x=0.72, linewidth=.5, alpha=1., color="black", linestyle='--')

plt.legend(loc='upper right',
            fancybox=True,
            shadow=True,
            fontsize='x-small')

ax.yaxis.set_minor_locator(AutoMinorLocator())
ax.yaxis.set_ticks_position('both')
plt.ylabel(r"$FATD$")
plt.ylim(0.0, 0.30)

ax.xaxis.set_minor_locator(AutoMinorLocator())
ax.xaxis.set_ticks_position('both')
plt.xlim(-0.1, simulation_time)
plt.xlabel(r"time, $t$")

plt.title(
    r"FATD Monte Carlo; $x_0=2$, $X$={x}, ".format(x=X) + r"$\mathcal{D}_x=$" + r"{D} $, \Delta t=${delt}".format(D=Dcx,
                                                                                                            delt=dt),
    fontdict={'size': 10}, pad=10)
plt.tight_layout(pad=2)
plt.show()
../_images/1_d_search_8_0.png