ngx/async_/
sleep.rs

1use core::future::Future;
2use core::mem;
3use core::pin::Pin;
4use core::ptr::{self, NonNull};
5use core::task::{self, Poll};
6use core::time::Duration;
7
8use nginx_sys::{ngx_add_timer, ngx_del_timer, ngx_event_t, ngx_log_t, ngx_msec_int_t, ngx_msec_t};
9use pin_project_lite::pin_project;
10
11use crate::{ngx_container_of, ngx_log_debug};
12
13/// Maximum duration that can be achieved using [ngx_add_timer].
14const NGX_TIMER_DURATION_MAX: Duration = Duration::from_millis(ngx_msec_int_t::MAX as _);
15
16/// Puts the current task to sleep for at least the specified amount of time.
17///
18/// The function is a shorthand for [Sleep::new] using the global logger for debug output.
19#[inline]
20pub fn sleep(duration: Duration) -> Sleep {
21    Sleep::new(duration, crate::log::ngx_cycle_log())
22}
23
24pin_project! {
25/// Future returned by [sleep].
26pub struct Sleep {
27    #[pin]
28    timer: TimerEvent,
29    duration: Duration,
30}
31}
32
33impl Sleep {
34    /// Creates a new Sleep with the specified duration and logger for debug messages.
35    pub fn new(duration: Duration, log: NonNull<ngx_log_t>) -> Self {
36        let timer = TimerEvent::new(log);
37        ngx_log_debug!(timer.event.log, "async: sleep for {duration:?}");
38        Sleep { timer, duration }
39    }
40}
41
42impl Future for Sleep {
43    type Output = ();
44
45    #[cfg(not(target_pointer_width = "32"))]
46    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
47        let msec = self.duration.min(NGX_TIMER_DURATION_MAX).as_millis() as ngx_msec_t;
48        let this = self.project();
49        this.timer.poll_sleep(msec, cx)
50    }
51
52    #[cfg(target_pointer_width = "32")]
53    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
54        if self.duration.is_zero() {
55            return Poll::Ready(());
56        }
57        let step = self.duration.min(NGX_TIMER_DURATION_MAX);
58
59        let mut this = self.project();
60        // Handle ngx_msec_t overflow on 32-bit platforms.
61        match this.timer.as_mut().poll_sleep(step.as_millis() as _, cx) {
62            // Last step
63            Poll::Ready(()) if this.duration == &step => Poll::Ready(()),
64            Poll::Ready(()) => {
65                *this.duration = this.duration.saturating_sub(step);
66                this.timer.event.set_timedout(0); // rearm
67                this.timer.as_mut().poll_sleep(step.as_millis() as _, cx)
68            }
69            x => x,
70        }
71    }
72}
73
74struct TimerEvent {
75    event: ngx_event_t,
76    waker: Option<task::Waker>,
77}
78
79// SAFETY: Timer will only be used in a single-threaded environment
80unsafe impl Send for TimerEvent {}
81unsafe impl Sync for TimerEvent {}
82
83impl TimerEvent {
84    pub fn new(log: NonNull<ngx_log_t>) -> Self {
85        static IDENT: [usize; 4] = [
86            0, 0, 0, 0x4153594e, // ASYN
87        ];
88
89        let mut ev: ngx_event_t = unsafe { mem::zeroed() };
90        // The data is only used for `ngx_event_ident` and will not be mutated.
91        ev.data = ptr::addr_of!(IDENT).cast_mut().cast();
92        ev.handler = Some(Self::timer_handler);
93        ev.log = log.as_ptr();
94        ev.set_cancelable(1);
95
96        Self {
97            event: ev,
98            waker: None,
99        }
100    }
101
102    pub fn poll_sleep(
103        mut self: Pin<&mut Self>,
104        duration: ngx_msec_t,
105        context: &mut task::Context<'_>,
106    ) -> Poll<()> {
107        if self.event.timedout() != 0 {
108            Poll::Ready(())
109        } else if self.event.timer_set() != 0 {
110            if let Some(waker) = self.waker.as_mut() {
111                waker.clone_from(context.waker());
112            } else {
113                self.waker = Some(context.waker().clone());
114            }
115            Poll::Pending
116        } else {
117            unsafe { ngx_add_timer(ptr::addr_of_mut!(self.event), duration) };
118            self.waker = Some(context.waker().clone());
119            Poll::Pending
120        }
121    }
122
123    unsafe extern "C" fn timer_handler(ev: *mut ngx_event_t) {
124        let timer = ngx_container_of!(ev, Self, event);
125
126        if let Some(waker) = (*timer).waker.take() {
127            waker.wake();
128        }
129    }
130}
131
132impl Drop for TimerEvent {
133    fn drop(&mut self) {
134        if self.event.timer_set() != 0 {
135            unsafe { ngx_del_timer(ptr::addr_of_mut!(self.event)) };
136        }
137    }
138}