ngx/async_/
spawn.rs

1use core::cell::UnsafeCell;
2use core::future::Future;
3use core::mem;
4use core::ptr::{self, NonNull};
5
6#[cfg(all(not(feature = "std"), feature = "alloc"))]
7use alloc::collections::vec_deque::VecDeque;
8#[cfg(feature = "std")]
9use std::collections::vec_deque::VecDeque;
10
11pub use async_task::Task;
12use async_task::{Runnable, ScheduleInfo, WithInfo};
13use nginx_sys::{
14    ngx_del_timer, ngx_delete_posted_event, ngx_event_t, ngx_post_event, ngx_posted_next_events,
15};
16
17use crate::log::ngx_cycle_log;
18use crate::{ngx_container_of, ngx_log_debug};
19
20static SCHEDULER: Scheduler = Scheduler::new();
21
22struct Scheduler(UnsafeCell<SchedulerInner>);
23
24// SAFETY: Scheduler must only be used from the main thread of a worker process.
25unsafe impl Send for Scheduler {}
26unsafe impl Sync for Scheduler {}
27
28impl Scheduler {
29    const fn new() -> Self {
30        Self(SchedulerInner::new())
31    }
32
33    pub fn schedule(&self, runnable: Runnable) {
34        // SAFETY: the cell is not empty, and we have exclusive access due to being a
35        // single-threaded application.
36        let inner = unsafe { &mut *UnsafeCell::raw_get(&self.0) };
37        inner.send(runnable)
38    }
39}
40
41#[repr(C)]
42struct SchedulerInner {
43    _ident: [usize; 4], // `ngx_event_ident` compatibility
44    event: ngx_event_t,
45    queue: VecDeque<Runnable>,
46}
47
48impl SchedulerInner {
49    const fn new() -> UnsafeCell<Self> {
50        let mut event: ngx_event_t = unsafe { mem::zeroed() };
51        event.handler = Some(Self::scheduler_event_handler);
52
53        UnsafeCell::new(Self {
54            _ident: [
55                0, 0, 0, 0x4153594e, // ASYN
56            ],
57            event,
58            queue: VecDeque::new(),
59        })
60    }
61
62    pub fn send(&mut self, runnable: Runnable) {
63        // Cached `ngx_cycle.log` can be invalidated when reloading configuration in a single
64        // process mode. Update `log` every time to avoid using stale log pointer.
65        self.event.log = ngx_cycle_log().as_ptr();
66
67        // While this event is not used as a timer at the moment, we still want to ensure that it is
68        // compatible with `ngx_event_ident`.
69        if self.event.data.is_null() {
70            self.event.data = ptr::from_mut(self).cast();
71        }
72
73        // FIXME: VecDeque::push could panic on an allocation failure, switch to a datastructure
74        // which will not and propagate the failure.
75        self.queue.push_back(runnable);
76        unsafe { ngx_post_event(&mut self.event, ptr::addr_of_mut!(ngx_posted_next_events)) }
77    }
78
79    /// This event handler is called by ngx_event_process_posted at the end of
80    /// ngx_process_events_and_timers.
81    extern "C" fn scheduler_event_handler(ev: *mut ngx_event_t) {
82        let mut runnables = {
83            // SAFETY:
84            // This handler always receives a non-null pointer to an event embedded into a
85            // UnsafeCell<SchedulerInner> instance. We modify the contents of the `UnsafeCell`,
86            // but we ensured that:
87            //  - we access the cell correctly, as documented in
88            //    https://doc.rust-lang.org/stable/std/cell/struct.UnsafeCell.html#memory-layout
89            //  - the access is unique due to being single-threaded
90            //  - the reference is dropped before we start processing queued runnables.
91            let cell: NonNull<UnsafeCell<Self>> =
92                unsafe { ngx_container_of!(NonNull::new_unchecked(ev), Self, event).cast() };
93            let this = unsafe { &mut *UnsafeCell::raw_get(cell.as_ptr()) };
94
95            ngx_log_debug!(
96                this.event.log,
97                "async: processing {} deferred wakeups",
98                this.queue.len()
99            );
100
101            // Move runnables to a new queue to avoid borrowing from the SchedulerInner and limit
102            // processing to already queued wakeups. This ensures that we correctly handle tasks
103            // that keep scheduling themselves (e.g. using yield_now() in a loop).
104            // We can't use drain() as it borrows from self and breaks aliasing rules.
105            mem::take(&mut this.queue)
106        };
107
108        for runnable in runnables.drain(..) {
109            runnable.run();
110        }
111    }
112}
113
114impl Drop for SchedulerInner {
115    fn drop(&mut self) {
116        if self.event.posted() != 0 {
117            unsafe { ngx_delete_posted_event(&mut self.event) };
118        }
119
120        if self.event.timer_set() != 0 {
121            unsafe { ngx_del_timer(&mut self.event) };
122        }
123    }
124}
125
126fn schedule(runnable: Runnable, info: ScheduleInfo) {
127    if info.woken_while_running {
128        SCHEDULER.schedule(runnable);
129        ngx_log_debug!(
130            ngx_cycle_log().as_ptr(),
131            "async: task scheduled while running"
132        );
133    } else {
134        runnable.run();
135    }
136}
137
138/// Creates a new task running on the NGINX event loop.
139pub fn spawn<F, T>(future: F) -> Task<T>
140where
141    F: Future<Output = T> + 'static,
142    T: 'static,
143{
144    ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: spawning new task");
145    let scheduler = WithInfo(schedule);
146    // Safety: single threaded embedding takes care of send/sync requirements for future and
147    // scheduler. Future and scheduler are both 'static.
148    let (runnable, task) = unsafe { async_task::spawn_unchecked(future, scheduler) };
149    runnable.schedule();
150    task
151}