CSLA.NET 6.0.0
CSLA .NET is a software development framework that helps you build a reusable, maintainable object-oriented business layer for your app.
RabbitMqPortal.cs
Go to the documentation of this file.
1//-----------------------------------------------------------------------
2// <copyright file="RabbitMqPortal.cs" company="Marimer LLC">
3// Copyright (c) Marimer LLC. All rights reserved.
4// Website: https://cslanet.com
5// </copyright>
6// <summary>Exposes server-side DataPortal functionality through RabbitMQ</summary>
7//-----------------------------------------------------------------------
8using System;
9using System.Security.Principal;
10using System.Threading.Tasks;
11using Csla.Core;
14using Csla.Server;
16using RabbitMQ.Client;
17using RabbitMQ.Client.Events;
18
20{
24 public class RabbitMqPortal : IDisposable
25 {
26 private IDataPortalServer dataPortalServer;
27 private ApplicationContext ApplicationContext { get; set; }
28
/// <summary>
/// Creates an instance of the type
/// </summary>
/// <param name="applicationContext">Application context kept in the private ApplicationContext property.</param>
/// <param name="dataPortal">Server-side data portal kept in the dataPortalServer field.</param>
/// <remarks>
/// NOTE(review): this overload does not assign DataPortalUrl, which
/// InitializeRabbitMQ reads - confirm how the URL is supplied on this path.
/// </remarks>
public RabbitMqPortal(ApplicationContext applicationContext, IDataPortalServer dataPortal)
{
    this.ApplicationContext = applicationContext;
    this.dataPortalServer = dataPortal;
}
39
43 public string DataPortalUrl { get; private set; }
44
45 private IConnection Connection;
46 private IModel Channel;
47 private string DataPortalQueueName;
48
53 public int Timeout { get; set; } = 30;
54
/// <summary>
/// Creates an instance of the object.
/// </summary>
/// <param name="dataPortalUrl">URI for the data portal service; stored as-is in DataPortalUrl.</param>
/// <remarks>
/// NOTE(review): this overload leaves ApplicationContext and dataPortalServer
/// unassigned - confirm they are provided some other way before messages are processed.
/// </remarks>
public RabbitMqPortal(string dataPortalUrl)
{
    this.DataPortalUrl = dataPortalUrl;
}
63
64 private Uri DataPortalUri { get; set; }
65
/// <summary>
/// Opens the RabbitMQ connection and channel described by DataPortalUrl,
/// unless a connection has already been created.
/// </summary>
/// <remarks>
/// The URL is read as rabbitmq://[user[:password]@]host[:port]/queueName.
/// </remarks>
/// <exception cref="UriFormatException">
/// The scheme is not "rabbitmq", or the host or queue name is empty.
/// </exception>
private void InitializeRabbitMQ()
{
    if (Connection != null)
        return;

    Console.WriteLine($"Initializing connection to {DataPortalUrl}");
    DataPortalUri = new Uri(DataPortalUrl);
    var url = DataPortalUri;
    if (url.Scheme != "rabbitmq")
        throw new UriFormatException("Scheme != rabbitmq://");
    if (string.IsNullOrWhiteSpace(url.Host))
        throw new UriFormatException("Host");

    // Drop the leading '/' without assuming the path is non-empty.
    var path = url.AbsolutePath;
    DataPortalQueueName = path.StartsWith("/", StringComparison.Ordinal) ? path.Substring(1) : path;
    if (string.IsNullOrWhiteSpace(DataPortalQueueName))
        throw new UriFormatException("DataPortalQueueName");

    var factory = new ConnectionFactory() { HostName = url.Host };

    // Uri.Port is -1 when the URL carries no port (the rabbitmq scheme has no
    // registered default), so only an explicitly supplied port overrides the
    // factory default. The previous "< 0" test ignored every explicit port.
    if (url.Port > 0)
        factory.Port = url.Port;

    // Split on the first ':' only so that passwords containing ':' stay intact,
    // and undo the percent-encoding that Uri.UserInfo preserves.
    var userInfo = url.UserInfo.Split(new[] { ':' }, 2);
    if (!string.IsNullOrWhiteSpace(userInfo[0]))
        factory.UserName = Uri.UnescapeDataString(userInfo[0]);
    if (userInfo.Length > 1)
        factory.Password = Uri.UnescapeDataString(userInfo[1]);

    // Publish the connection only once the channel exists; otherwise a failed
    // CreateModel would leave Connection set and Channel null, and every later
    // call would skip initialization.
    var connection = factory.CreateConnection();
    try
    {
        Channel = connection.CreateModel();
    }
    catch
    {
        connection.Dispose();
        throw;
    }
    Connection = connection;
}
93
/// <summary>
/// Start processing inbound messages.
/// </summary>
/// <remarks>
/// Declares the data portal queue, then registers a consumer that hands each
/// delivery to InvokePortal. Messages are consumed with automatic acknowledgement.
/// </remarks>
public void StartListening()
{
    InitializeRabbitMQ();

    Channel.QueueDeclare(queue: DataPortalQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

    var listener = new EventingBasicConsumer(Channel);
    listener.Received += (sender, ea) =>
    {
        Console.WriteLine($"Received {ea.BasicProperties.Type} for {ea.BasicProperties.CorrelationId} from {ea.BasicProperties.ReplyTo}");
        // Copy the body out of the delivery before handing it to the async handler.
        var payload = ea.Body.ToArray();
        InvokePortal(ea, payload);
    };

    Console.WriteLine($"Listening on queue {DataPortalQueueName}");
    Channel.BasicConsume(
        queue: DataPortalQueueName,
        autoAck: true,
        consumer: listener);
}
116
/// <summary>
/// Handles one inbound message: deserializes the request, runs the requested
/// data portal operation, and publishes the serialized response to the
/// sender's reply queue.
/// </summary>
/// <param name="ea">Delivery whose BasicProperties supply the operation type, reply queue and correlation id.</param>
/// <param name="requestData">Serialized request body.</param>
/// <remarks>
/// This is an async void handler, so no exception may escape it: every failure
/// is converted to an error response, or logged as a last resort.
/// </remarks>
private async void InvokePortal(BasicDeliverEventArgs ea, byte[] requestData)
{
    DataPortalResponse result = null;
    Exception failure = null;
    try
    {
        var request = SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(requestData);
        result = await CallPortal(ea.BasicProperties.Type, request);
    }
    catch (Exception ex)
    {
        // Remember the failure so it is reported to the caller instead of being swallowed.
        failure = ex;
    }

    try
    {
        if (failure != null)
        {
            result = ApplicationContext.CreateInstanceDI<DataPortalResponse>();
            result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(failure);
        }
        var response = SerializationFormatterFactory.GetFormatter(ApplicationContext).Serialize(result);
        SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
    }
    catch (Exception ex)
    {
        // Building or sending the response failed; try once more with a bare error response.
        try
        {
            result = ApplicationContext.CreateInstanceDI<DataPortalResponse>();
            result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(ex);
            var response = SerializationFormatterFactory.GetFormatter(ApplicationContext).Serialize(result);
            SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
        }
        catch (Exception ex1)
        {
            Console.Error.WriteLine($"## ERROR {ex1.Message}");
        }
    }
}
150
/// <summary>
/// Publishes a message body to the named queue through the default ("") exchange.
/// </summary>
/// <param name="target">Routing key to publish to (callers pass the requester's reply-to queue).</param>
/// <param name="correlationId">Correlation id copied onto the outgoing message properties.</param>
/// <param name="request">Message body bytes.</param>
private void SendMessage(string target, string correlationId, byte[] request)
{
    // Make sure the connection and channel exist before publishing.
    InitializeRabbitMQ();

    var properties = Channel.CreateBasicProperties();
    properties.CorrelationId = correlationId;

    Channel.BasicPublish(exchange: "", routingKey: target, basicProperties: properties, body: request);
}
162
/// <summary>
/// Dispatches a deserialized request to the data portal method named by
/// <paramref name="operation"/>.
/// </summary>
/// <param name="operation">One of "create", "fetch", "update" or "delete".</param>
/// <param name="request">Request object; cast to CriteriaRequest, or to UpdateRequest for "update".</param>
/// <returns>The response produced by the selected operation.</returns>
/// <exception cref="InvalidOperationException">The operation name is not recognized.</exception>
private async Task<DataPortalResponse> CallPortal(string operation, object request)
{
    switch (operation)
    {
        case "create":
            return await Create((CriteriaRequest)request).ConfigureAwait(false);

        case "fetch":
            return await Fetch((CriteriaRequest)request).ConfigureAwait(false);

        case "update":
            return await Update((UpdateRequest)request).ConfigureAwait(false);

        case "delete":
            return await Delete((CriteriaRequest)request).ConfigureAwait(false);

        default:
            throw new InvalidOperationException(operation);
    }
}
189
/// <summary>
/// Create and initialize a new business object.
/// </summary>
/// <param name="request">Criteria request carrying the type name, serialized criteria, principal, cultures and client context.</param>
/// <returns>Response holding the serialized object and, if the data portal reported one, error data.</returns>
public async Task<DataPortalResponse> Create(CriteriaRequest request)
{
    var result = ApplicationContext.CreateInstanceDI<DataPortalResponse>();
    try
    {
        request = ConvertRequest(request);

        // unpack criteria data into object
        object criteria = GetCriteria(ApplicationContext, request.CriteriaData);
        // Primitive criteria travel inside a wrapper; unwrap only when the wrapper is actually present.
        if (criteria is Csla.DataPortalClient.PrimitiveCriteria)
        {
            criteria = ((Csla.DataPortalClient.PrimitiveCriteria)criteria).Value;
        }

        var objectType = Csla.Reflection.MethodCaller.GetType(AssemblyNameTranslator.GetAssemblyQualifiedName(request.TypeName), true);
        var context = new DataPortalContext(
            ApplicationContext,
            (IPrincipal)SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(request.Principal),
            true,
            request.ClientCulture,
            request.ClientUICulture,
            (ContextDictionary)SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(request.ClientContext));

        var dpr = await dataPortalServer.Create(objectType, criteria, context, true);

        if (dpr.Error != null)
            result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(dpr.Error);
        result.ObjectData = SerializationFormatterFactory.GetFormatter(ApplicationContext).Serialize(dpr.ReturnObject);
    }
    catch (Exception ex)
    {
        // Record the failure so ConvertResponse (run in finally) can see it, then let it propagate.
        result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(ex);
        throw;
    }
    finally
    {
        result = ConvertResponse(result);
    }
    return result;
}
233
/// <summary>
/// Get an existing business object.
/// </summary>
/// <param name="request">Criteria request carrying the type name, serialized criteria, principal, cultures and client context.</param>
/// <returns>Response holding the serialized object and, if the data portal reported one, error data.</returns>
public async Task<DataPortalResponse> Fetch(CriteriaRequest request)
{
    var result = ApplicationContext.CreateInstanceDI<DataPortalResponse>();
    try
    {
        request = ConvertRequest(request);

        // unpack criteria data into object
        object criteria = GetCriteria(ApplicationContext, request.CriteriaData);
        // Primitive criteria travel inside a wrapper; unwrap only when the wrapper is actually present.
        if (criteria is Csla.DataPortalClient.PrimitiveCriteria)
        {
            criteria = ((Csla.DataPortalClient.PrimitiveCriteria)criteria).Value;
        }

        var objectType = Csla.Reflection.MethodCaller.GetType(AssemblyNameTranslator.GetAssemblyQualifiedName(request.TypeName), true);
        var context = new DataPortalContext(
            ApplicationContext,
            (IPrincipal)SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(request.Principal),
            true,
            request.ClientCulture,
            request.ClientUICulture,
            (ContextDictionary)SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(request.ClientContext));

        var dpr = await dataPortalServer.Fetch(objectType, criteria, context, true);

        if (dpr.Error != null)
            result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(dpr.Error);
        result.ObjectData = SerializationFormatterFactory.GetFormatter(ApplicationContext).Serialize(dpr.ReturnObject);
    }
    catch (Exception ex)
    {
        // Record the failure so ConvertResponse (run in finally) can see it, then let it propagate.
        result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(ex);
        throw;
    }
    finally
    {
        result = ConvertResponse(result);
    }
    return result;
}
277
/// <summary>
/// Update a business object.
/// </summary>
/// <param name="request">Update request carrying the serialized object, principal, cultures and client context.</param>
/// <returns>Response holding the serialized updated object and, if the data portal reported one, error data.</returns>
public async Task<DataPortalResponse> Update(UpdateRequest request)
{
    var result = ApplicationContext.CreateInstanceDI<DataPortalResponse>();
    try
    {
        request = ConvertRequest(request);

        // unpack object (GetCriteria is a plain null-safe deserialize, so it serves here too)
        object obj = GetCriteria(ApplicationContext, request.ObjectData);

        var context = new DataPortalContext(
            ApplicationContext,
            (IPrincipal)SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(request.Principal),
            true,
            request.ClientCulture,
            request.ClientUICulture,
            (ContextDictionary)SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(request.ClientContext));

        var dpr = await dataPortalServer.Update(obj, context, true);

        if (dpr.Error != null)
            result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(dpr.Error);

        result.ObjectData = SerializationFormatterFactory.GetFormatter(ApplicationContext).Serialize(dpr.ReturnObject);
    }
    catch (Exception ex)
    {
        // Record the failure so ConvertResponse (run in finally) can see it, then let it propagate.
        result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(ex);
        throw;
    }
    finally
    {
        result = ConvertResponse(result);
    }
    return result;
}
316
/// <summary>
/// Delete a business object.
/// </summary>
/// <param name="request">Criteria request carrying the type name, serialized criteria, principal, cultures and client context.</param>
/// <returns>Response holding the serialized return object and, if the data portal reported one, error data.</returns>
public async Task<DataPortalResponse> Delete(CriteriaRequest request)
{
    var result = ApplicationContext.CreateInstanceDI<DataPortalResponse>();
    try
    {
        request = ConvertRequest(request);

        // unpack criteria data into object
        object criteria = GetCriteria(ApplicationContext, request.CriteriaData);
        // Primitive criteria travel inside a wrapper; unwrap only when the wrapper is actually present.
        if (criteria is Csla.DataPortalClient.PrimitiveCriteria)
        {
            criteria = ((Csla.DataPortalClient.PrimitiveCriteria)criteria).Value;
        }

        var objectType = Csla.Reflection.MethodCaller.GetType(AssemblyNameTranslator.GetAssemblyQualifiedName(request.TypeName), true);
        var context = new DataPortalContext(
            ApplicationContext,
            (IPrincipal)SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(request.Principal),
            true,
            request.ClientCulture,
            request.ClientUICulture,
            (ContextDictionary)SerializationFormatterFactory.GetFormatter(ApplicationContext).Deserialize(request.ClientContext));

        var dpr = await dataPortalServer.Delete(objectType, criteria, context, true);

        if (dpr.Error != null)
            result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(dpr.Error);
        result.ObjectData = SerializationFormatterFactory.GetFormatter(ApplicationContext).Serialize(dpr.ReturnObject);
    }
    catch (Exception ex)
    {
        // Record the failure so ConvertResponse (run in finally) can see it, then let it propagate.
        result.ErrorData = ApplicationContext.CreateInstanceDI<DataPortalErrorInfo>(ex);
        throw;
    }
    finally
    {
        result = ConvertResponse(result);
    }
    return result;
}
360
361 #region Criteria
362
/// <summary>
/// Deserializes a byte array with the formatter obtained for the given
/// application context.
/// </summary>
/// <param name="applicationContext">Application context passed to SerializationFormatterFactory.GetFormatter.</param>
/// <param name="criteriaData">Serialized data, or null.</param>
/// <returns>The deserialized object, or null when <paramref name="criteriaData"/> is null.</returns>
private static object GetCriteria(ApplicationContext applicationContext, byte[] criteriaData)
{
    // Nothing was sent: there is nothing to deserialize.
    if (criteriaData == null)
        return null;

    return SerializationFormatterFactory.GetFormatter(applicationContext).Deserialize(criteriaData);
}
370
371 #endregion Criteria
372
373 #region Conversion methods
374
/// <summary>
/// Override to convert the request data before it is processed.
/// The default implementation hands the request back unchanged.
/// </summary>
/// <param name="request">Update request object.</param>
/// <returns>The request to use for the rest of the operation.</returns>
protected virtual UpdateRequest ConvertRequest(UpdateRequest request) => request;
384
/// <summary>
/// Override to convert the request data before it is processed.
/// The default implementation hands the request back unchanged.
/// </summary>
/// <param name="request">Criteria request object.</param>
/// <returns>The request to use for the rest of the operation.</returns>
protected virtual CriteriaRequest ConvertRequest(CriteriaRequest request)
{
    return request;
}
394
/// <summary>
/// Override to convert the response data before it is returned to the caller.
/// The default implementation hands the response back unchanged.
/// </summary>
/// <param name="response">Response object.</param>
/// <returns>The response to return from the operation.</returns>
protected virtual DataPortalResponse ConvertResponse(DataPortalResponse response)
{
    return response;
}
404
405 #endregion Conversion methods
406
// Set once Dispose has run so that repeated calls are harmless.
private bool disposed;

/// <summary>
/// Dispose this object: closes the connection, then disposes the channel
/// and the connection.
/// </summary>
public void Dispose()
{
    if (disposed)
        return;
    disposed = true;

    try
    {
        Connection?.Close();
    }
    finally
    {
        // Release both resources even when Close() throws.
        try
        {
            Channel?.Dispose();
        }
        finally
        {
            Connection?.Dispose();
        }
    }
}
416 }
417}
Csla.Server.DataPortalContext DataPortalContext
Provides consistent context information between the client and server DataPortal objects.
object CreateInstanceDI(Type objectType, params object[] parameters)
Creates an object using 'Activator.CreateInstance' using service provider (if one is available) to po...
Exposes server-side DataPortal functionality through RabbitMQ
void Dispose()
Dispose this object.
void StartListening()
Start processing inbound messages.
RabbitMqPortal(string dataPortalUrl)
Creates an instance of the object.
virtual UpdateRequest ConvertRequest(UpdateRequest request)
Override to convert the request data before it is transferred over the network.
string DataPortalUrl
Gets the URI for the data portal service.
RabbitMqPortal(ApplicationContext applicationContext, IDataPortalServer dataPortal)
Creates an instance of the type
async Task< DataPortalResponse > Delete(CriteriaRequest request)
Delete a business object.
async Task< DataPortalResponse > Create(CriteriaRequest request)
Create and initialize a new business object.
virtual DataPortalResponse ConvertResponse(DataPortalResponse response)
Override to convert the response data after it comes back from the network.
int Timeout
Gets or sets the timeout for network operations in seconds (default is 30 seconds).
async Task< DataPortalResponse > Fetch(CriteriaRequest request)
Get an existing business object.
virtual CriteriaRequest ConvertRequest(CriteriaRequest request)
Override to convert the request data before it is transferred over the network.
async Task< DataPortalResponse > Update(UpdateRequest request)
Update a business object.
Dictionary type that is serializable with the SerializationFormatterFactory.GetFormatter().
Class used as a wrapper for criteria based requests that use primitives
Message sent to the WCF data portal.
string ClientCulture
Serialized client culture.
byte[] CriteriaData
Serialized data for the criteria object.
string TypeName
Assembly qualified name of the business object type to create.
byte[] ClientContext
Serialized data for the client context object.
byte[] Principal
Serialized data for the principal object.
string ClientUICulture
Serialized client UI culture.
Message containing details about any server-side exception.
Response message for returning the results of a data portal call.
DataPortalErrorInfo ErrorData
Server-side exception data if an exception occurred on the server.
Request message for updating a business object.
byte[] ClientContext
Serialized data for the client context object.
string ClientCulture
Serialized client culture.
string ClientUICulture
Serialized client UI culture.
byte[] Principal
Serialized data for the principal object.
Interface implemented by server-side data portal components.
Task< DataPortalResult > Update(object obj, DataPortalContext context, bool isSync)
Update a business object.
Task< DataPortalResult > Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
Create a new business object.
Task< DataPortalResult > Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
Get an existing business object.
Task< DataPortalResult > Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
Delete a business object.