CSLA.NET 5.4.2
CSLA .NET is a software development framework that helps you build a reusable, maintainable object-oriented business layer for your app.
RabbitMqProxy.cs
Go to the documentation of this file.
1//-----------------------------------------------------------------------
2// <copyright file="RabbitMqProxy.cs" company="Marimer LLC">
3// Copyright (c) Marimer LLC. All rights reserved.
4// Website: https://cslanet.com
5// </copyright>
6// <summary>Implements a data portal proxy to relay data portal calls to a remote application server by using RabbitMQ</summary>
7//-----------------------------------------------------------------------
8using System;
9using System.Threading;
10using System.Threading.Tasks;
11using Csla.Core;
15using Csla.Server;
16using RabbitMQ.Client;
17
19{
24 public class RabbitMqProxy : DataPortalProxy, IDisposable
25 {
31 {
32 }
33
/// <summary>
/// Creates an instance of the object, initializing it to use
/// the supplied URL.
/// </summary>
/// <param name="dataPortalUrl">
/// URL stored in DataPortalUrl; InitializeRabbitMQ later parses it
/// and requires the rabbitmq scheme, a host and a queue name path.
/// </param>
public RabbitMqProxy(string dataPortalUrl) => DataPortalUrl = dataPortalUrl;
43
/// <summary>
/// Gets or sets the timeout for network operations
/// in seconds (default is 30 seconds).
/// </summary>
public override int Timeout { get; set; } = 30;

/// <summary>
/// Gets or sets the connection to the RabbitMQ service.
/// </summary>
protected IConnection Connection { get; set; }

/// <summary>
/// Gets or sets the channel (model) for RabbitMQ.
/// </summary>
protected IModel Channel { get; set; }

/// <summary>
/// Gets or sets the name of the data portal service queue
/// (the routing key used when publishing requests).
/// </summary>
protected string DataPortalQueueName { get; set; }

/// <summary>
/// Gets or sets the listener whose ReplyQueue name is sent
/// as the reply-to address of each request.
/// </summary>
private ProxyListener QueueListener { get; set; }
71
/// <summary>
/// Creates the Connection, Channel, DataPortalQueueName and reply
/// listener from DataPortalUrl. Does nothing when a connection
/// already exists.
/// </summary>
/// <remarks>
/// Expected URL shape:
/// rabbitmq://[user[:password]@]host[:port]/queueName
/// </remarks>
/// <exception cref="UriFormatException">
/// The scheme is not "rabbitmq", the host is blank, or the path
/// does not contain a queue name.
/// </exception>
protected virtual void InitializeRabbitMQ()
{
    if (Connection != null)
    {
        return;
    }

    var url = new Uri(DataPortalUrl);
    Console.WriteLine($"Initializing {DataPortalUrl}");
    if (url.Scheme != "rabbitmq")
    {
        throw new UriFormatException("Scheme != rabbitmq://");
    }
    if (string.IsNullOrWhiteSpace(url.Host))
    {
        throw new UriFormatException("Host");
    }

    // AbsolutePath starts with '/', so skip it to get the queue name.
    DataPortalQueueName = url.AbsolutePath.Substring(1);
    if (string.IsNullOrWhiteSpace(DataPortalQueueName))
    {
        throw new UriFormatException("DataPortalQueueName");
    }
    Console.WriteLine($"Will send to queue {DataPortalQueueName}");

    var factory = new ConnectionFactory() { HostName = url.Host };

    // Uri.Port is -1 when the URL carries no port (the rabbitmq scheme has
    // no registered default). Only an explicit port should override the
    // factory default; the previous "< 0" test did the opposite and
    // silently ignored any port supplied in the URL.
    if (url.Port > 0)
    {
        factory.Port = url.Port;
    }

    // Split on the first ':' only so a password containing ':' is kept whole.
    var userInfo = url.UserInfo.Split(new[] { ':' }, 2);
    if (userInfo.Length > 0 && !string.IsNullOrWhiteSpace(userInfo[0]))
    {
        factory.UserName = userInfo[0];
    }
    if (userInfo.Length > 1)
    {
        factory.Password = userInfo[1];
    }

    Connection = factory.CreateConnection();
    Channel = Connection.CreateModel();
    if (QueueListener == null)
    {
        QueueListener = ProxyListener.GetListener(url);
        QueueListener.StartListening();
    }
}
108
/// <summary>
/// Called by DataPortal to create a new business object.
/// </summary>
/// <param name="objectType">Type of business object, passed to the base implementation.</param>
/// <param name="criteria">Criteria object, passed to the base implementation.</param>
/// <param name="context">Data portal context, passed to the base implementation.</param>
/// <param name="isSync">Must be false; synchronous calls are not supported.</param>
/// <returns>The result produced by the base implementation.</returns>
/// <exception cref="NotSupportedException">isSync is true (surfaced through the returned task).</exception>
public override async Task<DataPortalResult> Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
{
    if (!isSync)
    {
        var created = await base.Create(objectType, criteria, context, isSync);
        return created;
    }

    throw new NotSupportedException("isSync == true");
}
125
/// <summary>
/// Called by DataPortal to load an existing business object.
/// </summary>
/// <param name="objectType">Type of business object, passed to the base implementation.</param>
/// <param name="criteria">Criteria object, passed to the base implementation.</param>
/// <param name="context">Data portal context, passed to the base implementation.</param>
/// <param name="isSync">Must be false; synchronous calls are not supported.</param>
/// <returns>The result produced by the base implementation.</returns>
/// <exception cref="NotSupportedException">isSync is true (surfaced through the returned task).</exception>
public override async Task<DataPortalResult> Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
{
    if (!isSync)
    {
        var fetched = await base.Fetch(objectType, criteria, context, isSync);
        return fetched;
    }

    throw new NotSupportedException("isSync == true");
}
144
/// <summary>
/// Called by DataPortal to update a business object.
/// </summary>
/// <param name="obj">Business object, passed to the base implementation.</param>
/// <param name="context">Data portal context, passed to the base implementation.</param>
/// <param name="isSync">Must be false; synchronous calls are not supported.</param>
/// <returns>The result produced by the base implementation.</returns>
/// <exception cref="NotSupportedException">isSync is true (surfaced through the returned task).</exception>
public override async Task<DataPortalResult> Update(object obj, DataPortalContext context, bool isSync)
{
    if (!isSync)
    {
        var updated = await base.Update(obj, context, isSync);
        return updated;
    }

    throw new NotSupportedException("isSync == true");
}
162
/// <summary>
/// Called by DataPortal to delete a business object.
/// </summary>
/// <param name="objectType">Type of business object, passed to the base implementation.</param>
/// <param name="criteria">Criteria object, passed to the base implementation.</param>
/// <param name="context">Data portal context, passed to the base implementation.</param>
/// <param name="isSync">Must be false; synchronous calls are not supported.</param>
/// <returns>The result produced by the base implementation.</returns>
/// <exception cref="NotSupportedException">isSync is true (surfaced through the returned task).</exception>
public override async Task<DataPortalResult> Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
{
    if (!isSync)
    {
        var deleted = await base.Delete(objectType, criteria, context, isSync);
        return deleted;
    }

    throw new NotSupportedException("isSync == true");
}
181
/// <summary>
/// Create message and send to Rabbit MQ server, then wait for the
/// correlated reply or for Timeout seconds to elapse.
/// </summary>
/// <param name="serialized">Serialized request used as the message body.</param>
/// <param name="operation">Operation name, sent as the message type.</param>
/// <param name="routingToken">Not used by this implementation.</param>
/// <param name="isSync">Not used by this implementation.</param>
/// <returns>The Response stored on the work-in-progress item for this call.</returns>
/// <exception cref="TimeoutException">No reply was signalled within Timeout seconds.</exception>
protected override async Task<byte[]> CallDataPortalServer(byte[] serialized, string operation, string routingToken, bool isSync)
{
    // QueueListener and Channel are dereferenced below; make sure they exist.
    // InitializeRabbitMQ returns immediately once a connection is in place.
    InitializeRabbitMQ();

    var correlationId = Guid.NewGuid().ToString();
    var resetEvent = new Csla.Threading.AsyncManualResetEvent();
    var wip = Wip.WorkInProgress.GetOrAdd(correlationId, new WipItem { ResetEvent = resetEvent });

    try
    {
        SendMessage(QueueListener.ReplyQueue.QueueName, correlationId, operation, serialized);

        using (var timeoutCancellation = new CancellationTokenSource())
        {
            // TimeSpan avoids the int overflow of "Timeout * 1000" for large values.
            var timeout = Task.Delay(TimeSpan.FromSeconds(Timeout), timeoutCancellation.Token);
            if (await Task.WhenAny(wip.ResetEvent.WaitAsync(), timeout) == timeout)
            {
                throw new TimeoutException();
            }

            // Reply arrived: release the pending timer instead of letting it run out.
            timeoutCancellation.Cancel();
        }

        return wip.Response;
    }
    finally
    {
        // Drop the entry on every exit path so timed-out or failed calls do not
        // accumulate in the shared map. Harmless if it was already removed.
        // NOTE(review): assumes WorkInProgress is a ConcurrentDictionary - confirm.
        WipItem abandoned;
        Wip.WorkInProgress.TryRemove(correlationId, out abandoned);
    }
}
205
/// <summary>
/// Publishes a request to the data portal queue on the default exchange.
/// </summary>
/// <param name="sender">Reply queue name; set as ReplyTo only when not blank.</param>
/// <param name="correlationId">Identifier set as the message correlation id.</param>
/// <param name="operation">Operation name set as the message type.</param>
/// <param name="request">Message body.</param>
private void SendMessage(string sender, string correlationId, string operation, byte[] request)
{
    var messageProperties = Channel.CreateBasicProperties();
    messageProperties.CorrelationId = correlationId;
    messageProperties.Type = operation;

    var hasReplyAddress = !string.IsNullOrWhiteSpace(sender);
    if (hasReplyAddress)
    {
        messageProperties.ReplyTo = sender;
    }

    Channel.BasicPublish(
        exchange: "",
        routingKey: DataPortalQueueName,
        basicProperties: messageProperties,
        body: request);
}
219
/// <summary>
/// Dispose this object and its resources: the reply listener,
/// the channel and the connection.
/// </summary>
public void Dispose()
{
    try
    {
        QueueListener?.Dispose();
        // Release the channel before the connection that owns it.
        Channel?.Dispose();
        Connection?.Close();
        Connection?.Dispose();
    }
    finally
    {
        // Clear every reference even if a call above throws. QueueListener
        // must be cleared too: InitializeRabbitMQ only obtains a listener
        // when this property is null, so leaving a disposed instance here
        // would make a re-initialized proxy reuse it.
        QueueListener = null;
        Channel = null;
        Connection = null;
    }
}
232 }
233}
Implements a data portal proxy to relay data portal calls to a remote application server by using RabbitMQ.
string DataPortalQueueName
Gets or sets the name of the data portal service queue.
override async Task< DataPortalResult > Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to create a new business object.
override async Task< DataPortalResult > Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to load an existing business object.
RabbitMqProxy(string dataPortalUrl)
Creates an instance of the object, initializing it to use the supplied URL.
IConnection Connection
Gets or sets the connection to the RabbitMQ service.
virtual void InitializeRabbitMQ()
Method responsible for creating the Connection, Channel, ReplyQueue, and DataPortalQueueName values u...
void Dispose()
Dispose this object and its resources.
override async Task< DataPortalResult > Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to delete a business object.
override int Timeout
Gets or sets the timeout for network operations in seconds (default is 30 seconds).
override async Task< byte[]> CallDataPortalServer(byte[] serialized, string operation, string routingToken, bool isSync)
Create message and send to Rabbit MQ server.
IModel Channel
Gets or sets the channel (model) for RabbitMQ.
RabbitMqProxy()
Creates an instance of the object, initializing it to use the DefaultUrl value.
override async Task< DataPortalResult > Update(object obj, DataPortalContext context, bool isSync)
Called by DataPortal to update a business object.
Implements a data portal proxy to relay data portal calls to a remote application server.
string DataPortalUrl
Gets the URL address for the data portal server used by this proxy instance.
Provides consistent context information between the client and server DataPortal objects.
Async/await implementation of a ManualResetEvent
@ Guid
Globally unique identifier / Guid