nd_async_rusqlite/async_connection/
builder.rs

1use super::AsyncConnection;
2use super::InnerAsyncConnection;
3use super::Message;
4use crate::Error;
5use std::path::Path;
6use std::path::PathBuf;
7use std::sync::Arc;
8
/// Builder used to open an [`AsyncConnection`].
///
/// The builder currently carries no configuration; it exists so that options
/// can be added later without changing how connections are opened.
#[derive(Debug)]
pub struct AsyncConnectionBuilder {}
12
13impl AsyncConnectionBuilder {
14    /// Create an [`AsyncConnectionBuilder`] with default settings.
15    pub fn new() -> Self {
16        Self {}
17    }
18
19    /// The internal open function.
20    fn open_internal(
21        &self,
22        path: PathBuf,
23    ) -> (
24        AsyncConnection,
25        tokio::sync::oneshot::Receiver<Result<(), rusqlite::Error>>,
26    ) {
27        let (tx, rx) = std::sync::mpsc::channel::<Message>();
28        let (connection_open_tx, connection_open_rx) = tokio::sync::oneshot::channel();
29        std::thread::spawn(move || async_connection_thread_impl(rx, path, connection_open_tx));
30
31        (
32            AsyncConnection {
33                inner: Arc::new(InnerAsyncConnection { tx }),
34            },
35            connection_open_rx,
36        )
37    }
38
39    /// Open the connection.
40    pub async fn open<P>(&self, path: P) -> Result<AsyncConnection, Error>
41    where
42        P: AsRef<Path>,
43    {
44        let path = path.as_ref().to_path_buf();
45
46        let (async_connection, connection_open_rx) = self.open_internal(path);
47        connection_open_rx.await.map_err(|_| Error::Aborted)??;
48
49        Ok(async_connection)
50    }
51
52    /// Open the connection from a blocking context.
53    pub fn blocking_open<P>(&self, path: P) -> Result<AsyncConnection, Error>
54    where
55        P: AsRef<Path>,
56    {
57        let path = path.as_ref().to_path_buf();
58
59        let (async_connection, connection_open_rx) = self.open_internal(path);
60        connection_open_rx
61            .blocking_recv()
62            .map_err(|_| Error::Aborted)??;
63
64        Ok(async_connection)
65    }
66}
67
68impl Default for AsyncConnectionBuilder {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74/// The impl for the async connection background thread.
75fn async_connection_thread_impl(
76    rx: std::sync::mpsc::Receiver<Message>,
77    path: PathBuf,
78    connection_open_tx: tokio::sync::oneshot::Sender<rusqlite::Result<()>>,
79) {
80    let mut connection = match rusqlite::Connection::open(path) {
81        Ok(connection) => {
82            // Check if the user cancelled the opening of the database connection and return early if needed.
83            if connection_open_tx.send(Ok(())).is_err() {
84                return;
85            }
86
87            connection
88        }
89        Err(error) => {
90            // Don't care if we succed since we should exit in either case.
91            let _ = connection_open_tx.send(Err(error)).is_ok();
92            return;
93        }
94    };
95
96    let mut close_tx = None;
97    for message in rx.iter() {
98        match message {
99            Message::Close { tx } => {
100                close_tx = Some(tx);
101                break;
102            }
103            Message::Access { func } => {
104                func(&mut connection);
105            }
106        }
107    }
108
109    // Drop rx.
110    // This will abort all queued messages, dropping them without sending a response.
111    // This is considered aborting the request.
112    drop(rx);
113
114    let result = connection.close();
115    if let Some(tx) = close_tx {
116        let _ = tx
117            .send(result.map_err(|(_connection, error)| Error::from(error)))
118            .is_ok();
119    }
120}