nd_async_rusqlite/
async_connection.rs

1mod builder;
2
3pub use self::builder::AsyncConnectionBuilder;
4use crate::Error;
5use crate::SyncWrapper;
6use std::sync::Arc;
7
8enum Message {
9    Access {
10        func: Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>,
11    },
12    Close {
13        tx: tokio::sync::oneshot::Sender<Result<(), Error>>,
14    },
15}
16
17/// An async rusqlite connection.
18#[derive(Debug, Clone)]
19pub struct AsyncConnection {
20    inner: Arc<InnerAsyncConnection>,
21}
22
23impl AsyncConnection {
24    /// Get a builder for an [`AsyncConnection`].
25    pub fn builder() -> AsyncConnectionBuilder {
26        AsyncConnectionBuilder::new()
27    }
28
29    /// Close the database.
30    ///
31    /// This will queue a close request.
32    /// When the database processes the close request,
33    /// all current queued requests will be aborted.
34    ///
35    /// When this function returns,
36    /// the database will be closed no matter the value of the return.
37    /// The return value will return errors that occured while closing.
38    pub async fn close(&self) -> Result<(), Error> {
39        let (tx, rx) = tokio::sync::oneshot::channel();
40        self.inner
41            .tx
42            .send(Message::Close { tx })
43            .map_err(|_| Error::Aborted)?;
44        rx.await.map_err(|_| Error::Aborted)??;
45        Ok(())
46    }
47
48    /// Access the database.
49    ///
50    /// Note that dropping the returned future will not cancel the database access.
51    pub async fn access<F, T>(&self, func: F) -> Result<T, Error>
52    where
53        F: FnOnce(&mut rusqlite::Connection) -> T + Send + 'static,
54        T: Send + 'static,
55    {
56        // TODO: We should make this a function and have it return a named Future.
57        // This will allow users to avoid spawning a seperate task for each database call.
58
59        let (tx, rx) = tokio::sync::oneshot::channel();
60        self.inner
61            .tx
62            .send(Message::Access {
63                func: Box::new(move |connection| {
64                    // TODO: Consider aborting if rx hung up.
65
66                    let func = std::panic::AssertUnwindSafe(|| func(connection));
67                    let result = std::panic::catch_unwind(func);
68                    let result = result
69                        .map_err(|panic_data| Error::AccessPanic(SyncWrapper::new(panic_data)));
70                    let _ = tx.send(result).is_ok();
71                }),
72            })
73            .map_err(|_| Error::Aborted)?;
74        let result = rx.await.map_err(|_| Error::Aborted)??;
75        Ok(result)
76    }
77}
78
79#[derive(Debug)]
80struct InnerAsyncConnection {
81    tx: std::sync::mpsc::Sender<Message>,
82}
83
#[cfg(test)]
mod test {
    use super::*;
    use crate::test::{_assert_send, _assert_sync};

    // These constants exist only for their type checks;
    // each one instantiates an assertion helper for one type.

    // `Send` assertions.
    const _MESSAGE_IS_SEND: () = _assert_send::<Message>();
    const _INNER_ASYNC_CONNECTION_IS_SEND: () = _assert_send::<InnerAsyncConnection>();
    const _ASYNC_CONNECTION_IS_SEND: () = _assert_send::<AsyncConnection>();

    // `Sync` assertions.
    const _INNER_ASYNC_CONNECTION_IS_SYNC: () = _assert_sync::<InnerAsyncConnection>();
    const _ASYNC_CONNECTION_IS_SYNC: () = _assert_sync::<AsyncConnection>();
}