10using RabbitMQ.Client.Events;
17 internal class ProxyListener : IDisposable
22 protected IConnection Connection {
get;
set; }
27 protected IModel Channel {
get;
set; }
33 public QueueDeclareOk ReplyQueue {
get;
set; }
37 private static ProxyListener _instance;
39 private ProxyListener()
43 public static ProxyListener GetListener(Uri queueUri)
45 if (_instance ==
null)
47 lock (typeof(ProxyListener))
49 if (_instance ==
null)
51 _instance =
new ProxyListener { QueueUri = queueUri };
58 private bool IsNamedReplyQueue {
get;
set; }
65 private void InitializeRabbitMQ()
67 var factory =
new ConnectionFactory() { HostName = QueueUri.Host };
68 if (QueueUri.Port < 0)
69 factory.Port = QueueUri.Port;
70 var userInfo = QueueUri.UserInfo.Split(
':');
71 if (userInfo.Length > 0 && !
string.IsNullOrWhiteSpace(userInfo[0]))
72 factory.UserName = userInfo[0];
73 if (userInfo.Length > 1)
74 factory.Password = userInfo[1];
75 Connection = factory.CreateConnection();
76 Channel = Connection.CreateModel();
78 if (
string.IsNullOrWhiteSpace(QueueUri.Query))
79 query =
new string[] { };
81 query = QueueUri.Query.Substring(1).Split(
'&');
82 if (query.Length == 0 || !query[0].StartsWith(
"reply="))
84 IsNamedReplyQueue =
false;
85 ReplyQueue = Channel.QueueDeclare();
89 IsNamedReplyQueue =
true;
90 var split = query[0].Split(
'=');
91 ReplyQueue = Channel.QueueDeclare(
100 private volatile bool IsListening;
101 private object ListeningLock =
new object();
103 public void StartListening()
105 if (IsListening)
return;
108 if (IsListening)
return;
112 InitializeRabbitMQ();
114 var consumer =
new EventingBasicConsumer(Channel);
115 consumer.Received += (model, ea) =>
117 Console.WriteLine($
"Received reply for {ea.BasicProperties.CorrelationId}");
118 if (Wip.WorkInProgress.TryRemove(ea.BasicProperties.CorrelationId, out WipItem item))
120 item.Response = ea.Body.ToArray();
121 item.ResetEvent.Set();
128 if (IsNamedReplyQueue && ea.BasicProperties.Priority < 9)
130 ea.BasicProperties.Priority++;
131 Channel.BasicPublish(
133 routingKey: ReplyQueue.QueueName,
134 basicProperties: ea.BasicProperties,
139 Console.WriteLine($
"## WARN Undeliverable reply for {ea.BasicProperties.CorrelationId} (discarded by {Environment.MachineName})");
143 Console.WriteLine($
"Listening on queue {ReplyQueue.QueueName}");
144 Channel.BasicConsume(queue: ReplyQueue.QueueName, autoAck:
true, consumer: consumer);
147 public void Dispose()
151 Connection?.Dispose();