1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use lapin::{options::QueueDeclareOptions, types::FieldTable};
use minerva_broker as broker;
pub async fn create_virtual_host(
tenant: &str,
host: &str,
) -> Result<(), Box<dyn std::error::Error>> {
println!("{}: Creating RabbitMQ virtual host...", tenant);
match broker::make_virtual_host(host, tenant).await? {
None => panic!(
"Unable to create RabbitMQ virtual host for tenant {}",
tenant
),
Some(result) => {
if result {
println!("{}: RabbitMQ virtual host successfully created.", tenant);
} else {
println!("{}: RabbitMQ virtual host already exists.", tenant);
}
}
}
Ok(())
}
pub async fn broker_spinlock(host: &str) {
while !broker::check_virtual_host(host)
.await
.expect("Could not query broker")
{
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
}
}
pub async fn create_default_queues(
tenant: &str,
host: &str,
) -> Result<(), Box<dyn std::error::Error>> {
println!("{}: Creating default RabbitMQ queues...", tenant);
let connection = broker::make_connection(host, Some(tenant)).await?;
let channel = connection.create_channel().await?;
for queue in broker::QUEUES {
let options = QueueDeclareOptions {
durable: true,
..QueueDeclareOptions::default()
};
let _ = channel
.queue_declare(queue, options, FieldTable::default())
.await?;
println!("{}: Durable queue \"{}\" created.", tenant, queue);
}
Ok(())
}