# lib/sim.py class World(object): """World is a structure that holds the state of N bodies and additional variables. threads : (int) The number of threads to use for multithreaded implementations. STATE OF THE WORLD: N : (int) The number of bodies in the simulation. m : (1D ndarray) The mass of each body. r : (2D ndarray) The position of each body. v : (2D ndarray) The velocity of each body. F : (2D ndarray) The force on each body. TEMPORARY VARIABLES: Ft : (3D ndarray) A 2D force array for each thread's local storage. s : (2D ndarray) The vectors from one body to all others. s3 : (1D ndarray) The norm of each s vector. NOTE: Ft is used by parallel algorithms for thread-local storage. s and s3 are only used by the Python implementation. """ def __init__(self, N, threads=1, m_min=1, m_max=30.0, r_max=50.0, v_max=4.0, dt=1e-3): self.threads = threads self.N = N self.m = np.random.uniform(m_min, m_max, N) self.r = np.random.uniform(-r_max, r_max, (N, 2)) self.v = np.random.uniform(-v_max, v_max, (N, 2)) self.F = np.zeros_like(self.r) self.Ft = np.zeros((threads, N, 2)) self.s = np.zeros_like(self.r) self.s3 = np.zeros_like(self.m) self.dt = dt |
在開始模擬時,N體被隨機分配質量m,位置r和速度v。對于每個時間步長,接下來的計算有:
合力F,每個體上的合力根據所有其他體的計算。
速度v,由于力的作用每個體的速度被改變。
位置R,由于速度每個體的位置被改變。
第一步是計算合力F,這將是我們的瓶頸。由于世界上存在的其他物體,單一物體上的力是所有作用力的總和。這導致復雜度為O(N^2)。速度v和位置r更新的復雜度都是O(N)。
如果你有興趣,這篇維基百科的文章介紹了一些可以加快力的計算的近似方法。
純Python
在純Python中,使用NumPy數組是時間演變函數的一種實現方式,它為優化提供了一個起點,并涉及測試其他實現方式。
# lib/sim.py def compute_F(w): """Compute the force on each body in the world, w.""" for i in xrange(w.N): w.s[:] = w.r - w.r[i] w.s3[:] = (w.s[:,0]**2 + w.s[:,1]**2)**1.5 w.s3[i] = 1.0 # This makes the self-force zero. w.F[i] = (w.m[i] * w.m[:,None] * w.s / w.s3[:,None]).sum(0) def evolve(w, steps): """Evolve the world, w, through the given number of steps.""" for _ in xrange(steps): compute_F(w) w.v += w.F * w.dt / w.m[:,None] w.r += w.v * w.dt |
合力計算的復雜度為O(N^2)的現象被NumPy的數組符號所掩蓋。每個數組操作遍歷數組元素。
可視化
這里是7個物體從隨機初始狀態開始演化的路徑圖:
性能
為了實現這個基準,我們在項目目錄下創建了一個腳本,包含如下內容:
import lib
w = lib.World(101)
lib.evolve(w, 4096)
我們使用cProfile模塊來測試衡量這個腳本。
python -m cProfile -scum bench.py
前幾行告訴我們,compute_F確實是我們的瓶頸,它占了超過99%的運行時間。
428710 function calls (428521 primitive calls) in 16.836 seconds Ordered by: cumulative time ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 16.837 16.837 bench.py:2(<module>) 1 0.062 0.062 16.756 16.756 sim.py:60(evolve) 4096 15.551 0.004 16.693 0.004 sim.py:51(compute_F) 413696 1.142 0.000 1.142 0.000 {method 'sum' ... 3 0.002 0.001 0.115 0.038 __init__.py:1(<module>) ... |
在Intel i5臺式機上有101體,這種實現能夠通過每秒257個時間步長演化世界。
簡單的C擴展 1
在本節中,我們將看到一個C擴展模塊實現演化的功能。當看完這一節時,這可能幫助我們獲得一個C文件的副本。文件src/simple1.c,可以在GitHub上獲得。
關于NumPy的C API的其他文檔,請參閱NumPy的參考。Python的C API的詳細文檔在這里。
樣板
文件中的第一件事情是先聲明演化函數。這將直接用于下面的方法列表。
static PyObject *evolve(PyObject *self, PyObject *args);
接下來是方法列表。
static PyMethodDef methods[] = {
{ "evolve", evolve, METH_VARARGS, "Doc string."},
{ NULL, NULL, 0, NULL } /* Sentinel */
};
這是為擴展模塊的一個導出方法列表。這只有一個名為evolve方法。
樣板的最后一部分是模塊的初始化。
PyMODINIT_FUNC initsimple1(void) {
(void) Py_InitModule("simple1", methods);
import_array();
}
另外,正如這里顯示,initsimple1中的名稱必須與Py_InitModule中的第一個參數匹配。對每個使用NumPy API的擴展而言,調用import_array是有必要的。
數組訪問宏
數組訪問的宏可以在數組中被用來正確地索引,無論數組被如何重塑或分片。這些宏也使用如下的代碼使它們有更高的可讀性。
#define m(x0) (*(npy_float64*)((PyArray_DATA(py_m) + \
(x0) * PyArray_STRIDES(py_m)[0])))
#define m_shape(i) (py_m->dimensions[(i)])
#define r(x0, x1) (*(npy_float64*)((PyArray_DATA(py_r) + \
(x0) * PyArray_STRIDES(py_r)[0] + \
(x1) * PyArray_STRIDES(py_r)[1])))
#define r_shape(i) (py_r->dimensions[(i)])
在這里,我們看到訪問宏的一維和二維數組。具有更高維度的數組可以以類似的方式被訪問。
在這些宏的幫助下,我們可以使用下面的代碼循環r:
for(i = 0; i < r_shape(0); ++i) {
for(j = 0; j < r_shape(1); ++j) {
r(i, j) = 0; // Zero all elements.
}
}
命名標記
上面定義的宏,只在匹配NumPy的數組對象定義了正確的名稱時才有效。在上面的代碼中,數組被命名為py_m和py_r。為了在不同的方法中使用相同的宏,NumPy數組的名稱需要保持一致。
計算力
特別是與上面五行的Python代碼相比,計算力數組的方法顯得頗為繁瑣。
static inline void compute_F(npy_int64 N, PyArrayObject *py_m, PyArrayObject *py_r, PyArrayObject *py_F) { npy_int64 i, j; npy_float64 sx, sy, Fx, Fy, s3, tmp; // Set all forces to zero. for(i = 0; i < N; ++i) { F(i, 0) = F(i, 1) = 0; } // Compute forces between pairs of bodies. for(i = 0; i < N; ++i) { for(j = i + 1; j < N; ++j) { sx = r(j, 0) - r(i, 0); sy = r(j, 1) - r(i, 1); s3 = sqrt(sx*sx + sy*sy); s3 *= s3 * s3; tmp = m(i) * m(j) / s3; Fx = tmp * sx; Fy = tmp * sy; F(i, 0) += Fx; F(i, 1) += Fy; F(j, 0) -= Fx; F(j, 1) -= Fy; } } } |
請注意,我們使用牛頓第三定律(成對出現的力大小相等且方向相反)來降低內環范圍。不幸的是,它的復雜度仍然為O(N^2)。
演化函數
該文件中的最后一個函數是導出的演化方法。
static PyObject *evolve(PyObject *self, PyObject *args) { // Declare variables. npy_int64 N, threads, steps, step, i; npy_float64 dt; PyArrayObject *py_m, *py_r, *py_v, *py_F; // Parse arguments. if (!PyArg_ParseTuple(args, "ldllO!O!O!O!", &threads, &dt, &steps, &N, &PyArray_Type, &py_m, &PyArray_Type, &py_r, &PyArray_Type, &py_v, &PyArray_Type, &py_F)) { return NULL; } // Evolve the world. for(step = 0; step< steps; ++step) { compute_F(N, py_m, py_r, py_F); for(i = 0; i < N; ++i) { v(i, 0) += F(i, 0) * dt / m(i); v(i, 1) += F(i, 1) * dt / m(i); r(i, 0) += v(i, 0) * dt; r(i, 1) += v(i, 1) * dt; } } Py_RETURN_NONE; } |
在這里,我們看到了Python參數如何被解析。在該函數底部的時間步長循環中,我們看到的速度和位置向量的x和y分量的顯式計算。
性能
C版本的演化方法比Python版本更快,這應該不足為奇。在上面提到的相同的i5臺式機中,C實現的演化方法能夠實現每秒17972個時間步長。相比Python實現,這方面有70倍的提升。
觀察
注意,C代碼一直保持盡可能的簡單。輸入參數和輸出矩陣可以進行類型檢查,并分配一個Python裝飾器函數。刪除分配,不僅能加快處理,而且消除了由Python對象不正確的引用計數造成的內存泄露(或更糟)。
下一部分
在本系列文章的下一部分,我們將通過發揮C-相鄰NumPy矩陣的優勢來提升這種實現的性能。之后,我們來看看使用英特爾的SIMD指令和OpenMP來進一步推進。