CSLA.NET 6.0.0
CSLA .NET is a software development framework that helps you build a reusable, maintainable object-oriented business layer for your app.
ProxyListener.cs
Go to the documentation of this file.
1//-----------------------------------------------------------------------
2// <copyright file="ProxyListener.cs" company="Marimer LLC">
3// Copyright (c) Marimer LLC. All rights reserved.
4// Website: https://cslanet.com
5// </copyright>
6// <summary>Handles replies from data portal server</summary>
7//-----------------------------------------------------------------------
8using System;
9using RabbitMQ.Client;
10using RabbitMQ.Client.Events;
11
13{
/// <summary>
/// Process-wide listener that receives data portal replies from a
/// RabbitMQ reply queue and hands each reply to the matching
/// work-in-progress item.
/// </summary>
internal class ProxyListener : IDisposable
{
  /// <summary>
  /// Prefix of the query string entry that names the reply queue.
  /// </summary>
  private const string ReplyQueryPrefix = "reply=";

  /// <summary>
  /// Gets or sets the connection to the RabbitMQ service.
  /// </summary>
  protected IConnection Connection { get; set; }

  /// <summary>
  /// Gets or sets the channel (model) created on <see cref="Connection"/>.
  /// </summary>
  protected IModel Channel { get; set; }

  /// <summary>
  /// Gets or sets the queue on which replies are received. It is
  /// populated by <see cref="StartListening"/>.
  /// </summary>
  public QueueDeclareOk ReplyQueue { get; set; }

  private Uri QueueUri;

  private static ProxyListener _instance;

  // Dedicated gate for creating the singleton; a Type object is publicly
  // reachable, so locking on it can collide with unrelated code.
  private static readonly object InstanceLock = new object();

  private ProxyListener()
  {
  }

  /// <summary>
  /// Gets the single listener instance, creating it on first use.
  /// </summary>
  /// <param name="queueUri">
  /// URI of the queue service. Only the value supplied on the call that
  /// creates the instance is used; later calls return the existing
  /// instance unchanged.
  /// </param>
  /// <returns>The shared listener instance.</returns>
  public static ProxyListener GetListener(Uri queueUri)
  {
    if (_instance == null)
    {
      lock (InstanceLock)
      {
        if (_instance == null)
        {
          _instance = new ProxyListener { QueueUri = queueUri };
        }
      }
    }
    return _instance;
  }

  /// <summary>
  /// Gets or sets whether the reply queue was explicitly named through
  /// the "reply=" query string entry of the queue URI.
  /// </summary>
  private bool IsNamedReplyQueue { get; set; }

  /// <summary>
  /// Opens the connection and channel described by the queue URI and
  /// declares the reply queue: the queue named by a leading
  /// "reply=name" query entry, or a server-generated queue otherwise.
  /// </summary>
  private void InitializeRabbitMQ()
  {
    var factory = new ConnectionFactory() { HostName = QueueUri.Host };
    // Uri.Port is -1 when the URI carries no port; only an explicit port
    // should replace the factory default.
    if (QueueUri.Port > 0)
      factory.Port = QueueUri.Port;
    var userInfo = QueueUri.UserInfo.Split(':');
    if (userInfo.Length > 0 && !string.IsNullOrWhiteSpace(userInfo[0]))
      factory.UserName = userInfo[0];
    if (userInfo.Length > 1)
      factory.Password = userInfo[1];
    Connection = factory.CreateConnection();
    Channel = Connection.CreateModel();
    string[] query;
    if (string.IsNullOrWhiteSpace(QueueUri.Query))
      query = new string[] { };
    else
      query = QueueUri.Query.Substring(1).Split('&');
    if (query.Length == 0 || !query[0].StartsWith(ReplyQueryPrefix, StringComparison.Ordinal))
    {
      IsNamedReplyQueue = false;
      ReplyQueue = Channel.QueueDeclare();
    }
    else
    {
      IsNamedReplyQueue = true;
      // Take everything after the prefix so a queue name that itself
      // contains '=' is not truncated.
      var queueName = query[0].Substring(ReplyQueryPrefix.Length);
      ReplyQueue = Channel.QueueDeclare(
        queue: queueName,
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: null);
    }
  }

  private volatile bool IsListening;
  private readonly object ListeningLock = new object();

  /// <summary>
  /// Connects to the queue service and starts consuming replies. Calls
  /// made while the listener is already listening return immediately.
  /// </summary>
  /// <remarks>
  /// Setup runs inside the lock and the listening flag is set only after
  /// the consumer is registered, so a concurrent caller does not return
  /// before <see cref="ReplyQueue"/> is populated, and a failed attempt
  /// can be retried.
  /// </remarks>
  public void StartListening()
  {
    if (IsListening) return;
    lock (ListeningLock)
    {
      if (IsListening) return;
      try
      {
        InitializeRabbitMQ();

        var consumer = new EventingBasicConsumer(Channel);
        consumer.Received += OnReplyReceived;
        Console.WriteLine($"Listening on queue {ReplyQueue.QueueName}");
        Channel.BasicConsume(queue: ReplyQueue.QueueName, autoAck: true, consumer: consumer);
        IsListening = true;
      }
      catch
      {
        // Release whatever was opened before the failure so that a later
        // retry does not leak the connection or channel.
        Dispose();
        throw;
      }
    }
  }

  /// <summary>
  /// Handles a reply message: completes the work-in-progress item with
  /// the same correlation id, or requeues or discards the reply when no
  /// such item exists.
  /// </summary>
  /// <param name="sender">Consumer that raised the event.</param>
  /// <param name="ea">Delivery details, including properties and body.</param>
  private void OnReplyReceived(object sender, BasicDeliverEventArgs ea)
  {
    Console.WriteLine($"Received reply for {ea.BasicProperties.CorrelationId}");
    if (Wip.WorkInProgress.TryRemove(ea.BasicProperties.CorrelationId, out WipItem item))
    {
      item.Response = ea.Body.ToArray();
      item.ResetEvent.Set();
      return;
    }

    // reply doesn't match any WIP item on this machine, but
    // if we're using a named reply queue there could be other
    // listeners; if so requeue the message up to 9 times
    // (the Priority property is used as the requeue counter)
    if (IsNamedReplyQueue && ea.BasicProperties.Priority < 9)
    {
      ea.BasicProperties.Priority++;
      Channel.BasicPublish(
        exchange: "",
        routingKey: ReplyQueue.QueueName,
        basicProperties: ea.BasicProperties,
        body: ea.Body);
    }
    else
    {
      Console.WriteLine($"## WARN Undeliverable reply for {ea.BasicProperties.CorrelationId} (discarded by {Environment.MachineName})");
    }
  }

  /// <summary>
  /// Closes the connection, disposes the channel and connection, and
  /// marks the listener as no longer listening.
  /// </summary>
  public void Dispose()
  {
    Connection?.Close();
    Channel?.Dispose();
    Connection?.Dispose();
    IsListening = false;
  }
}
155}