nd_async_rusqlite/async_connection/
builder.rs1use super::AsyncConnection;
2use super::InnerAsyncConnection;
3use super::Message;
4use crate::Error;
5use std::path::Path;
6use std::path::PathBuf;
7use std::sync::Arc;
8
/// Builder used to open an [`AsyncConnection`].
///
/// The builder currently has no fields; it exists as the entry point for
/// [`AsyncConnectionBuilder::open`] and [`AsyncConnectionBuilder::blocking_open`].
/// It is `Clone` so a builder value can be duplicated and reused freely.
#[derive(Debug, Clone)]
pub struct AsyncConnectionBuilder {}
12
13impl AsyncConnectionBuilder {
14 pub fn new() -> Self {
16 Self {}
17 }
18
19 fn open_internal(
21 &self,
22 path: PathBuf,
23 ) -> (
24 AsyncConnection,
25 tokio::sync::oneshot::Receiver<Result<(), rusqlite::Error>>,
26 ) {
27 let (tx, rx) = std::sync::mpsc::channel::<Message>();
28 let (connection_open_tx, connection_open_rx) = tokio::sync::oneshot::channel();
29 std::thread::spawn(move || async_connection_thread_impl(rx, path, connection_open_tx));
30
31 (
32 AsyncConnection {
33 inner: Arc::new(InnerAsyncConnection { tx }),
34 },
35 connection_open_rx,
36 )
37 }
38
39 pub async fn open<P>(&self, path: P) -> Result<AsyncConnection, Error>
41 where
42 P: AsRef<Path>,
43 {
44 let path = path.as_ref().to_path_buf();
45
46 let (async_connection, connection_open_rx) = self.open_internal(path);
47 connection_open_rx.await.map_err(|_| Error::Aborted)??;
48
49 Ok(async_connection)
50 }
51
52 pub fn blocking_open<P>(&self, path: P) -> Result<AsyncConnection, Error>
54 where
55 P: AsRef<Path>,
56 {
57 let path = path.as_ref().to_path_buf();
58
59 let (async_connection, connection_open_rx) = self.open_internal(path);
60 connection_open_rx
61 .blocking_recv()
62 .map_err(|_| Error::Aborted)??;
63
64 Ok(async_connection)
65 }
66}
67
68impl Default for AsyncConnectionBuilder {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74fn async_connection_thread_impl(
76 rx: std::sync::mpsc::Receiver<Message>,
77 path: PathBuf,
78 connection_open_tx: tokio::sync::oneshot::Sender<rusqlite::Result<()>>,
79) {
80 let mut connection = match rusqlite::Connection::open(path) {
81 Ok(connection) => {
82 if connection_open_tx.send(Ok(())).is_err() {
84 return;
85 }
86
87 connection
88 }
89 Err(error) => {
90 let _ = connection_open_tx.send(Err(error)).is_ok();
92 return;
93 }
94 };
95
96 let mut close_tx = None;
97 for message in rx.iter() {
98 match message {
99 Message::Close { tx } => {
100 close_tx = Some(tx);
101 break;
102 }
103 Message::Access { func } => {
104 func(&mut connection);
105 }
106 }
107 }
108
109 drop(rx);
113
114 let result = connection.close();
115 if let Some(tx) = close_tx {
116 let _ = tx
117 .send(result.map_err(|(_connection, error)| Error::from(error)))
118 .is_ok();
119 }
120}