1use core::cell::UnsafeCell;
2use core::future::Future;
3use core::mem;
4use core::ptr::{self, NonNull};
5
6#[cfg(all(not(feature = "std"), feature = "alloc"))]
7use alloc::collections::vec_deque::VecDeque;
8#[cfg(feature = "std")]
9use std::collections::vec_deque::VecDeque;
10
11pub use async_task::Task;
12use async_task::{Runnable, ScheduleInfo, WithInfo};
13use nginx_sys::{
14 ngx_del_timer, ngx_delete_posted_event, ngx_event_t, ngx_post_event, ngx_posted_next_events,
15};
16
17use crate::log::ngx_cycle_log;
18use crate::{ngx_container_of, ngx_log_debug};
19
/// Module-wide scheduler instance. The free function `schedule` hands runnables that were woken
/// while running to this value.
static SCHEDULER: Scheduler = Scheduler::new();

/// Thin wrapper that exposes a mutable [`SchedulerInner`] through a shared reference by keeping
/// it inside an [`UnsafeCell`].
struct Scheduler(UnsafeCell<SchedulerInner>);

// `UnsafeCell` is never `Sync`, so without these impls `Scheduler` could not be stored in a
// `static`. The compiler does not check them.
// NOTE(review): presumably sound only because the scheduler is touched from a single thread —
// nothing in this file enforces that; confirm against the embedding's threading model.
unsafe impl Send for Scheduler {}
unsafe impl Sync for Scheduler {}
27
28impl Scheduler {
29 const fn new() -> Self {
30 Self(SchedulerInner::new())
31 }
32
33 pub fn schedule(&self, runnable: Runnable) {
34 let inner = unsafe { &mut *UnsafeCell::raw_get(&self.0) };
37 inner.send(runnable)
38 }
39}
40
/// Mutable state behind [`Scheduler`]: one NGINX event plus the queue of runnables that the
/// event's handler will run.
///
/// `#[repr(C)]` keeps the fields in declaration order, so `_ident` sits in front of `event` in
/// memory.
#[repr(C)]
struct SchedulerInner {
    /// Four tag words placed ahead of `event`. `SchedulerInner::new` fills them with
    /// `[0, 0, 0, 0x4153594e]`; the last word is the ASCII bytes "ASYN".
    /// NOTE(review): presumably mirrors a layout some NGINX-side code inspects — confirm.
    _ident: [usize; 4],
    /// Event that `send` posts to NGINX; its handler is `scheduler_event_handler`.
    event: ngx_event_t,
    /// Runnables pushed by `send` and taken, in insertion order, by the event handler.
    queue: VecDeque<Runnable>,
}
47
48impl SchedulerInner {
49 const fn new() -> UnsafeCell<Self> {
50 let mut event: ngx_event_t = unsafe { mem::zeroed() };
51 event.handler = Some(Self::scheduler_event_handler);
52
53 UnsafeCell::new(Self {
54 _ident: [
55 0, 0, 0, 0x4153594e, ],
57 event,
58 queue: VecDeque::new(),
59 })
60 }
61
62 pub fn send(&mut self, runnable: Runnable) {
63 self.event.log = ngx_cycle_log().as_ptr();
66
67 if self.event.data.is_null() {
70 self.event.data = ptr::from_mut(self).cast();
71 }
72
73 self.queue.push_back(runnable);
76 unsafe { ngx_post_event(&mut self.event, ptr::addr_of_mut!(ngx_posted_next_events)) }
77 }
78
79 extern "C" fn scheduler_event_handler(ev: *mut ngx_event_t) {
82 let mut runnables = {
83 let cell: NonNull<UnsafeCell<Self>> =
92 unsafe { ngx_container_of!(NonNull::new_unchecked(ev), Self, event).cast() };
93 let this = unsafe { &mut *UnsafeCell::raw_get(cell.as_ptr()) };
94
95 ngx_log_debug!(
96 this.event.log,
97 "async: processing {} deferred wakeups",
98 this.queue.len()
99 );
100
101 mem::take(&mut this.queue)
106 };
107
108 for runnable in runnables.drain(..) {
109 runnable.run();
110 }
111 }
112}
113
114impl Drop for SchedulerInner {
115 fn drop(&mut self) {
116 if self.event.posted() != 0 {
117 unsafe { ngx_delete_posted_event(&mut self.event) };
118 }
119
120 if self.event.timer_set() != 0 {
121 unsafe { ngx_del_timer(&mut self.event) };
122 }
123 }
124}
125
126fn schedule(runnable: Runnable, info: ScheduleInfo) {
127 if info.woken_while_running {
128 SCHEDULER.schedule(runnable);
129 ngx_log_debug!(
130 ngx_cycle_log().as_ptr(),
131 "async: task scheduled while running"
132 );
133 } else {
134 runnable.run();
135 }
136}
137
138pub fn spawn<F, T>(future: F) -> Task<T>
140where
141 F: Future<Output = T> + 'static,
142 T: 'static,
143{
144 ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: spawning new task");
145 let scheduler = WithInfo(schedule);
146 let (runnable, task) = unsafe { async_task::spawn_unchecked(future, scheduler) };
149 runnable.schedule();
150 task
151}