1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#![warn(clippy::all)]
#![warn(missing_docs)]
use bb8_lapin::{bb8::Pool, LapinConnectionManager};
use lapin::{Connection, ConnectionProperties, Error};
const AUTH_USER: &str = "rabbitmq";
const AUTH_PASS: Option<&str> = Some("minerva");
pub const QUEUES: &[&str] = &["session_management"];
pub mod model;
pub type LapinPool = Pool<LapinConnectionManager>;
fn make_vhost_url(host: &str, vhost: &str) -> String {
format!("http://{}:15672/api/vhosts/{}", host, vhost)
}
pub fn build_broker_uri(host: &str, vhost: &str) -> String {
format!(
"amqp://{}:{}@{}:5672/{}",
AUTH_USER,
AUTH_PASS.unwrap(),
host,
if vhost.is_empty() { "%2f" } else { vhost }
)
}
pub async fn make_connection(host: &str, vhost: Option<&str>) -> Result<Connection, Error> {
let uri = build_broker_uri(host, vhost.unwrap_or(""));
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
Connection::connect(&uri, options).await
}
pub async fn make_connection_pool(
host: &str,
vhost: Option<&str>,
max_connections: u32,
) -> LapinPool {
let uri = build_broker_uri(host, vhost.unwrap_or(""));
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let manager = LapinConnectionManager::new(&uri, options);
Pool::builder()
.max_size(max_connections)
.build(manager)
.await
.map_err(|e| panic!("Error creating RabbitMQ connection pool: {}", e))
.unwrap()
}
pub async fn check_virtual_host(host: &str) -> Result<bool, reqwest::Error> {
let url = make_vhost_url(host, "");
let client = reqwest::Client::new();
Ok(client
.get(url)
.basic_auth(AUTH_USER, AUTH_PASS)
.send()
.await?
.status()
.as_u16()
== 200)
}
pub async fn make_virtual_host(host: &str, vhost: &str) -> Result<Option<bool>, reqwest::Error> {
let url = make_vhost_url(host, vhost);
let client = reqwest::Client::new();
match client
.put(url)
.basic_auth(AUTH_USER, AUTH_PASS)
.send()
.await
{
Ok(response) => Ok(if !response.status().is_success() {
None
} else {
Some(response.status().as_u16() == 201)
}),
Err(e) => Err(e),
}
}