CSLA.NET 6.0.0
CSLA .NET is a software development framework that helps you build a reusable, maintainable object-oriented business layer for your app.
RabbitMqProxy.cs
Go to the documentation of this file.
1//-----------------------------------------------------------------------
2// <copyright file="RabbitMqProxy.cs" company="Marimer LLC">
3// Copyright (c) Marimer LLC. All rights reserved.
4// Website: https://cslanet.com
5// </copyright>
6// <summary>Implements a data portal proxy to relay data portal</summary>
7//-----------------------------------------------------------------------
8using System;
9using System.Threading.Tasks;
11using Csla.Server;
12using RabbitMQ.Client;
13
15{
20 public class RabbitMqProxy : DataPortalProxy, IDisposable
21 {
28 public RabbitMqProxy(ApplicationContext applicationContext, RabbitMqProxyOptions options)
29 : base(applicationContext)
30 {
32 }
33
38 public override int Timeout { get; set; } = 30;
39
43 protected IConnection Connection { get; set; }
44
48 protected IModel Channel { get; set; }
49
54 protected string DataPortalQueueName { get; set; }
55
60 private ProxyListener QueueListener { get; set; }
61
67 protected virtual void InitializeRabbitMQ()
68 {
69 if (Connection == null)
70 {
71 var url = new Uri(DataPortalUrl);
72 Console.WriteLine($"Initializing {DataPortalUrl}");
73 if (url.Scheme != "rabbitmq")
74 throw new UriFormatException("Scheme != rabbitmq://");
75 if (string.IsNullOrWhiteSpace(url.Host))
76 throw new UriFormatException("Host");
77 DataPortalQueueName = url.AbsolutePath.Substring(1);
78 if (string.IsNullOrWhiteSpace(DataPortalQueueName))
79 throw new UriFormatException("DataPortalQueueName");
80 Console.WriteLine($"Will send to queue {DataPortalQueueName}");
81 var factory = new ConnectionFactory() { HostName = url.Host };
82 if (url.Port < 0)
83 factory.Port = url.Port;
84 var userInfo = url.UserInfo.Split(':');
85 if (userInfo.Length > 0 && !string.IsNullOrWhiteSpace(userInfo[0]))
86 factory.UserName = userInfo[0];
87 if (userInfo.Length > 1)
88 factory.Password = userInfo[1];
89 Connection = factory.CreateConnection();
90 Channel = Connection.CreateModel();
91 if (QueueListener == null)
92 {
93 QueueListener = ProxyListener.GetListener(url);
94 QueueListener.StartListening();
95 }
96 }
97 }
98
107 public override async Task<DataPortalResult> Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
108 {
109 if (isSync)
110 throw new NotSupportedException("isSync == true");
111
113 return await base.Create(objectType, criteria, context, isSync);
114 }
115
126 public override async Task<DataPortalResult> Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
127 {
128 if (isSync)
129 throw new NotSupportedException("isSync == true");
130
132 return await base.Fetch(objectType, criteria, context, isSync);
133 }
134
144 public override async Task<DataPortalResult> Update(object obj, DataPortalContext context, bool isSync)
145 {
146 if (isSync)
147 throw new NotSupportedException("isSync == true");
148
150 return await base.Update(obj, context, isSync);
151 }
152
163 public override async Task<DataPortalResult> Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
164 {
165 if (isSync)
166 throw new NotSupportedException("isSync == true");
167
169 return await base.Delete(objectType, criteria, context, isSync);
170 }
171
181 protected override async Task<byte[]> CallDataPortalServer(byte[] serialized, string operation, string routingToken, bool isSync)
182 {
183 var correlationId = Guid.NewGuid().ToString();
184 var resetEvent = new Csla.Threading.AsyncManualResetEvent();
185 var wip = Wip.WorkInProgress.GetOrAdd(correlationId, new WipItem { ResetEvent = resetEvent });
186
187 SendMessage(QueueListener.ReplyQueue.QueueName, correlationId, operation, serialized);
188
189 var timeout = Task.Delay(Timeout * 1000);
190 if (await Task.WhenAny(wip.ResetEvent.WaitAsync(), timeout) == timeout)
191 throw new TimeoutException();
192
193 return wip.Response;
194 }
195
196 private void SendMessage(string sender, string correlationId, string operation, byte[] request)
197 {
198 var props = Channel.CreateBasicProperties();
199 if (!string.IsNullOrWhiteSpace(sender))
200 props.ReplyTo = sender;
201 props.CorrelationId = correlationId;
202 props.Type = operation;
203 Channel.BasicPublish(
204 exchange: "",
205 routingKey: DataPortalQueueName,
206 basicProperties: props,
207 body: request);
208 }
209
213 public void Dispose()
214 {
215 QueueListener?.Dispose();
216 Connection?.Close();
217 Channel?.Dispose();
218 Connection?.Dispose();
219 Channel = null;
220 Connection = null;
221 }
222 }
223}
Provides consistent context information between the client and server DataPortal objects.
Implements a data portal proxy to relay data portal calls to a remote application server by using Rab...
string DataPortalQueueName
Gets or sets the name of the data portal service queue.
override async Task< DataPortalResult > Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to create a new business object.
override async Task< DataPortalResult > Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to load an existing business object.
IConnection Connection
Gets or sets the connection to the RabbitMQ service.
virtual void InitializeRabbitMQ()
Method responsible for creating the Connection, Channel, ReplyQueue, and DataPortalQueueName values u...
void Dispose()
Dispose this object and its resources.
override async Task< DataPortalResult > Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to delete a business object.
override int Timeout
Gets or sets the timeout for network operations in seconds (default is 30 seconds).
override async Task< byte[]> CallDataPortalServer(byte[] serialized, string operation, string routingToken, bool isSync)
Create message and send to Rabbit MQ server.
IModel Channel
Gets or sets the channel (model) for RabbitMQ.
override async Task< DataPortalResult > Update(object obj, DataPortalContext context, bool isSync)
Called by DataPortal to update a business object.
RabbitMqProxy(ApplicationContext applicationContext, RabbitMqProxyOptions options)
Creates an instance of the object, initializing it to use the supplied URL.
string DataPortalUrl
Data portal server endpoint URL
Implements a data portal proxy to relay data portal calls to a remote application server.
string DataPortalUrl
Gets the URL address for the data portal server used by this proxy instance.
Provides consistent context information between the client and server DataPortal objects.
Async/await implementation of a ManualResetEvent