CSLA.NET 5.4.2
CSLA .NET is a software development framework that helps you build a reusable, maintainable object-oriented business layer for your app.
RabbitMqPortal.cs
Go to the documentation of this file.
1//-----------------------------------------------------------------------
2// <copyright file="RabbitMqPortal.cs" company="Marimer LLC">
3// Copyright (c) Marimer LLC. All rights reserved.
4// Website: https://cslanet.com
5// </copyright>
6// <summary>Exposes server-side DataPortal functionality through RabbitMQ</summary>
7//-----------------------------------------------------------------------
8using System;
9using System.Security.Principal;
10using System.Threading.Tasks;
11using Csla.Core;
14using Csla.Server;
16using RabbitMQ.Client;
17using RabbitMQ.Client.Events;
18
20{
24 public class RabbitMqPortal : IDisposable
25 {
/// <summary>
/// Gets the URI for the data portal service. Assigned once by the
/// constructors and parsed later by InitializeRabbitMQ.
/// </summary>
29 public string DataPortalUrl { get; private set; }
30
/// <summary>Broker connection; created on first use by InitializeRabbitMQ.</summary>
31 private IConnection Connection;
/// <summary>Channel created from <c>Connection</c>; used to declare the queue, consume and publish.</summary>
32 private IModel Channel;
/// <summary>Queue name taken from the path portion of the data portal URL (leading '/' removed).</summary>
33 private string DataPortalQueueName;
34
/// <summary>
/// Gets or sets the timeout for network operations in seconds (default is 30 seconds).
/// </summary>
// NOTE(review): no code visible in this file reads Timeout - confirm whether it is still used.
39 public int Timeout { get; set; } = 30;
40
/// <summary>
/// Creates an instance of the object, taking the data portal URL
/// from <c>ApplicationContext.DataPortalUrlString</c>.
/// </summary>
public RabbitMqPortal()
{
    DataPortalUrl = ApplicationContext.DataPortalUrlString;
}
48
/// <summary>
/// Creates an instance of the object that listens on the supplied
/// data portal URL.
/// </summary>
/// <param name="dataPortalUrl">
/// URI for the data portal service; it is stored as-is and only
/// validated when the RabbitMQ connection is first initialized.
/// </param>
public RabbitMqPortal(string dataPortalUrl)
{
    this.DataPortalUrl = dataPortalUrl;
}
57
/// <summary>Parsed form of <c>DataPortalUrl</c>; assigned by InitializeRabbitMQ.</summary>
private Uri DataPortalUri { get; set; }

/// <summary>
/// Opens the RabbitMQ connection and channel described by
/// <c>DataPortalUrl</c> the first time it is called; later calls are
/// no-ops while a connection exists.
/// </summary>
/// <remarks>
/// The URL is read as
/// <c>rabbitmq://[user[:password]@]host[:port]/queueName</c>.
/// </remarks>
/// <exception cref="UriFormatException">
/// The scheme is not <c>rabbitmq</c>, the host is empty, or the path
/// does not contain a queue name.
/// </exception>
private void InitializeRabbitMQ()
{
    if (Connection != null)
        return;

    // NOTE(review): DataPortalUrl may embed credentials, which this line writes to the console.
    Console.WriteLine($"Initializing connection to {DataPortalUrl}");
    DataPortalUri = new Uri(DataPortalUrl);
    var url = DataPortalUri;
    if (url.Scheme != "rabbitmq")
        throw new UriFormatException("Scheme != rabbitmq://");
    if (string.IsNullOrWhiteSpace(url.Host))
        throw new UriFormatException("Host");
    // The queue name is the URL path without its leading '/'.
    DataPortalQueueName = url.AbsolutePath.Substring(1);
    if (string.IsNullOrWhiteSpace(DataPortalQueueName))
        throw new UriFormatException("DataPortalQueueName");

    var factory = new ConnectionFactory() { HostName = url.Host };
    // Uri.Port is -1 when the URL carries no port (the rabbitmq scheme has no
    // registered default). Only an explicit port overrides the factory default;
    // the previous "< 0" test ignored every explicitly configured port.
    if (url.Port > 0)
        factory.Port = url.Port;
    // Split on the first ':' only so a password containing ':' is kept whole.
    var userInfo = url.UserInfo.Split(new[] { ':' }, 2);
    if (userInfo.Length > 0 && !string.IsNullOrWhiteSpace(userInfo[0]))
        factory.UserName = userInfo[0];
    if (userInfo.Length > 1)
        factory.Password = userInfo[1];

    // Publish the connection only after the channel exists, so a failed
    // CreateModel neither leaks the connection nor leaves this object
    // half-initialized (Connection set, Channel null).
    var connection = factory.CreateConnection();
    try
    {
        Channel = connection.CreateModel();
    }
    catch
    {
        connection.Dispose();
        throw;
    }
    Connection = connection;
}
87
/// <summary>
/// Start processing inbound messages: ensures the connection exists,
/// declares the data portal queue, and attaches a consumer that hands
/// each delivery to InvokePortal.
/// </summary>
public void StartListening()
{
    InitializeRabbitMQ();

    Channel.QueueDeclare(
        queue: DataPortalQueueName,
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    // Handler for each delivery: log it, then process a private copy of the body.
    EventHandler<BasicDeliverEventArgs> onReceived = (sender, delivery) =>
    {
        Console.WriteLine($"Received {delivery.BasicProperties.Type} for {delivery.BasicProperties.CorrelationId} from {delivery.BasicProperties.ReplyTo}");
        InvokePortal(delivery, delivery.Body.ToArray());
    };

    var listener = new EventingBasicConsumer(Channel);
    listener.Received += onReceived;

    Console.WriteLine($"Listening on queue {DataPortalQueueName}");
    // autoAck: messages are acknowledged on delivery, before processing completes.
    Channel.BasicConsume(queue: DataPortalQueueName, autoAck: true, consumer: listener);
}
110
/// <summary>
/// Handles one inbound request: deserializes it, routes it through
/// CallPortal, and publishes the serialized response to the sender's
/// reply queue using the request's correlation id.
/// </summary>
/// <param name="ea">Delivery whose properties supply the operation type, reply queue and correlation id.</param>
/// <param name="requestData">Serialized request message.</param>
/// <remarks>
/// Declared <c>async void</c> because it is started from the consumer's
/// event handler without being awaited; every stage is wrapped in a
/// catch so failures are reported to the caller or logged.
/// </remarks>
private async void InvokePortal(BasicDeliverEventArgs ea, byte[] requestData)
{
    var reply = new DataPortalResponse();

    // Stage 1: execute the request. A failure becomes error data on the reply.
    try
    {
        var message = SerializationFormatterFactory.GetFormatter().Deserialize(requestData);
        reply = await CallPortal(ea.BasicProperties.Type, message);
    }
    catch (Exception portalFailure)
    {
        reply.ErrorData = new DataPortalErrorInfo(portalFailure);
    }

    // Stage 2: send the reply.
    try
    {
        var payload = SerializationFormatterFactory.GetFormatter().Serialize(reply);
        SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, payload);
    }
    catch (Exception sendFailure)
    {
        // Serializing or sending the reply failed: try once more with a
        // reply that carries only the failure itself.
        try
        {
            reply = new DataPortalResponse { ErrorData = new DataPortalErrorInfo(sendFailure) };
            var payload = SerializationFormatterFactory.GetFormatter().Serialize(reply);
            SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, payload);
        }
        catch (Exception fallbackFailure)
        {
            // Nothing more can be sent to the caller; record the error locally.
            Console.Error.WriteLine($"## ERROR {fallbackFailure.Message}");
        }
    }
}
143
/// <summary>
/// Publishes a message to the named queue through the default
/// exchange, tagging it with the given correlation id.
/// </summary>
/// <param name="target">Routing key (queue name) to publish to.</param>
/// <param name="correlationId">Correlation id copied onto the message properties.</param>
/// <param name="request">Message body bytes.</param>
private void SendMessage(string target, string correlationId, byte[] request)
{
    InitializeRabbitMQ();

    var properties = Channel.CreateBasicProperties();
    properties.CorrelationId = correlationId;

    Channel.BasicPublish(exchange: "", routingKey: target, basicProperties: properties, body: request);
}
155
/// <summary>
/// Routes a deserialized request to the data portal operation named
/// by <paramref name="operation"/>.
/// </summary>
/// <param name="operation">One of "create", "fetch", "update" or "delete".</param>
/// <param name="request">
/// Deserialized request; cast to <c>UpdateRequest</c> for "update" and
/// to <c>CriteriaRequest</c> for the other operations.
/// </param>
/// <returns>The response produced by the selected operation.</returns>
/// <exception cref="InvalidOperationException">The operation name is not recognized.</exception>
private async Task<DataPortalResponse> CallPortal(string operation, object request)
{
    switch (operation)
    {
        case "create":
            return await Create((CriteriaRequest)request).ConfigureAwait(false);

        case "fetch":
            return await Fetch((CriteriaRequest)request).ConfigureAwait(false);

        case "update":
            return await Update((UpdateRequest)request).ConfigureAwait(false);

        case "delete":
            return await Delete((CriteriaRequest)request).ConfigureAwait(false);

        default:
            throw new InvalidOperationException(operation);
    }
}
182
/// <summary>
/// Create and initialize a new business object.
/// </summary>
/// <param name="request">Criteria request naming the business type and carrying the serialized criteria, principal and context data.</param>
/// <returns>Response holding the serialized object and global context, plus error data when the portal reported an error.</returns>
public async Task<DataPortalResponse> Create(CriteriaRequest request)
{
    var result = new DataPortalResponse();
    try
    {
        request = ConvertRequest(request);

        // unpack criteria data into object
        object criteria = GetCriteria(request.CriteriaData);
        // Primitive criteria arrive wrapped in PrimitiveCriteria. Unwrap only
        // when the wrapper is present: an unconditional cast throws for every
        // other criteria type and for null criteria.
        if (criteria is Csla.DataPortalClient.PrimitiveCriteria)
        {
            criteria = ((Csla.DataPortalClient.PrimitiveCriteria)criteria).Value;
        }

        var objectType = Csla.Reflection.MethodCaller.GetType(AssemblyNameTranslator.GetAssemblyQualifiedName(request.TypeName), true);
        var context = new DataPortalContext(
            (IPrincipal)SerializationFormatterFactory.GetFormatter().Deserialize(request.Principal),
            true,
            request.ClientCulture,
            request.ClientUICulture,
            (ContextDictionary)SerializationFormatterFactory.GetFormatter().Deserialize(request.ClientContext),
            (ContextDictionary)SerializationFormatterFactory.GetFormatter().Deserialize(request.GlobalContext));

        var prtl = new Csla.Server.DataPortal();
        var dpr = await prtl.Create(objectType, criteria, context, true);

        if (dpr.Error != null)
            result.ErrorData = new DataPortalErrorInfo(dpr.Error);
        result.GlobalContext = SerializationFormatterFactory.GetFormatter().Serialize(dpr.GlobalContext);
        result.ObjectData = SerializationFormatterFactory.GetFormatter().Serialize(dpr.ReturnObject);
    }
    catch (Exception ex)
    {
        // The exception is rethrown, so this result never reaches the caller;
        // it is still populated so ConvertResponse in the finally block sees it.
        result.ErrorData = new DataPortalErrorInfo(ex);
        throw;
    }
    finally
    {
        result = ConvertResponse(result);
    }
    return result;
}
229
/// <summary>
/// Get an existing business object.
/// </summary>
/// <param name="request">Criteria request naming the business type and carrying the serialized criteria, principal and context data.</param>
/// <returns>Response holding the serialized object and global context, plus error data when the portal reported an error.</returns>
public async Task<DataPortalResponse> Fetch(CriteriaRequest request)
{
    var result = new DataPortalResponse();
    try
    {
        request = ConvertRequest(request);

        // unpack criteria data into object
        object criteria = GetCriteria(request.CriteriaData);
        // Primitive criteria arrive wrapped in PrimitiveCriteria. Unwrap only
        // when the wrapper is present: an unconditional cast throws for every
        // other criteria type and for null criteria.
        if (criteria is Csla.DataPortalClient.PrimitiveCriteria)
        {
            criteria = ((Csla.DataPortalClient.PrimitiveCriteria)criteria).Value;
        }

        var objectType = Csla.Reflection.MethodCaller.GetType(AssemblyNameTranslator.GetAssemblyQualifiedName(request.TypeName), true);
        var context = new DataPortalContext(
            (IPrincipal)SerializationFormatterFactory.GetFormatter().Deserialize(request.Principal),
            true,
            request.ClientCulture,
            request.ClientUICulture,
            (ContextDictionary)SerializationFormatterFactory.GetFormatter().Deserialize(request.ClientContext),
            (ContextDictionary)SerializationFormatterFactory.GetFormatter().Deserialize(request.GlobalContext));

        var prtl = new Csla.Server.DataPortal();
        var dpr = await prtl.Fetch(objectType, criteria, context, true);

        if (dpr.Error != null)
            result.ErrorData = new DataPortalErrorInfo(dpr.Error);
        result.GlobalContext = SerializationFormatterFactory.GetFormatter().Serialize(dpr.GlobalContext);
        result.ObjectData = SerializationFormatterFactory.GetFormatter().Serialize(dpr.ReturnObject);
    }
    catch (Exception ex)
    {
        // The exception is rethrown, so this result never reaches the caller;
        // it is still populated so ConvertResponse in the finally block sees it.
        result.ErrorData = new DataPortalErrorInfo(ex);
        throw;
    }
    finally
    {
        result = ConvertResponse(result);
    }
    return result;
}
276
/// <summary>
/// Update a business object.
/// </summary>
/// <param name="request">Update request carrying the serialized business object, principal and context data.</param>
/// <returns>Response holding the serialized object and global context, plus error data when the portal reported an error.</returns>
public async Task<DataPortalResponse> Update(UpdateRequest request)
{
    var response = new DataPortalResponse();
    try
    {
        request = ConvertRequest(request);

        // GetCriteria is a null-tolerant deserializer; here it unpacks the business object.
        object businessObject = GetCriteria(request.ObjectData);

        // Rebuild the caller's context in the same order the values are consumed.
        var principal = (IPrincipal)SerializationFormatterFactory.GetFormatter().Deserialize(request.Principal);
        var clientCulture = request.ClientCulture;
        var clientUICulture = request.ClientUICulture;
        var clientContext = (ContextDictionary)SerializationFormatterFactory.GetFormatter().Deserialize(request.ClientContext);
        var globalContext = (ContextDictionary)SerializationFormatterFactory.GetFormatter().Deserialize(request.GlobalContext);
        var portalContext = new DataPortalContext(principal, true, clientCulture, clientUICulture, clientContext, globalContext);

        var portal = new Csla.Server.DataPortal();
        var outcome = await portal.Update(businessObject, portalContext, true);

        if (outcome.Error != null)
        {
            response.ErrorData = new DataPortalErrorInfo(outcome.Error);
        }

        response.GlobalContext = SerializationFormatterFactory.GetFormatter().Serialize(outcome.GlobalContext);
        response.ObjectData = SerializationFormatterFactory.GetFormatter().Serialize(outcome.ReturnObject);
    }
    catch (Exception failure)
    {
        // Populated before rethrowing so ConvertResponse in the finally block sees the error.
        response.ErrorData = new DataPortalErrorInfo(failure);
        throw;
    }
    finally
    {
        response = ConvertResponse(response);
    }
    return response;
}
318
/// <summary>
/// Delete a business object.
/// </summary>
/// <param name="request">Criteria request naming the business type and carrying the serialized criteria, principal and context data.</param>
/// <returns>Response holding the serialized portal result and global context, plus error data when the portal reported an error.</returns>
public async Task<DataPortalResponse> Delete(CriteriaRequest request)
{
    var result = new DataPortalResponse();
    try
    {
        request = ConvertRequest(request);

        // unpack criteria data into object
        object criteria = GetCriteria(request.CriteriaData);
        // Primitive criteria arrive wrapped in PrimitiveCriteria. Unwrap only
        // when the wrapper is present: an unconditional cast throws for every
        // other criteria type and for null criteria.
        if (criteria is Csla.DataPortalClient.PrimitiveCriteria)
        {
            criteria = ((Csla.DataPortalClient.PrimitiveCriteria)criteria).Value;
        }

        var objectType = Csla.Reflection.MethodCaller.GetType(AssemblyNameTranslator.GetAssemblyQualifiedName(request.TypeName), true);
        var context = new DataPortalContext(
            (IPrincipal)SerializationFormatterFactory.GetFormatter().Deserialize(request.Principal),
            true,
            request.ClientCulture,
            request.ClientUICulture,
            (ContextDictionary)SerializationFormatterFactory.GetFormatter().Deserialize(request.ClientContext),
            (ContextDictionary)SerializationFormatterFactory.GetFormatter().Deserialize(request.GlobalContext));

        var prtl = new Csla.Server.DataPortal();
        var dpr = await prtl.Delete(objectType, criteria, context, true);

        if (dpr.Error != null)
            result.ErrorData = new DataPortalErrorInfo(dpr.Error);
        result.GlobalContext = SerializationFormatterFactory.GetFormatter().Serialize(dpr.GlobalContext);
        result.ObjectData = SerializationFormatterFactory.GetFormatter().Serialize(dpr.ReturnObject);
    }
    catch (Exception ex)
    {
        // The exception is rethrown, so this result never reaches the caller;
        // it is still populated so ConvertResponse in the finally block sees it.
        result.ErrorData = new DataPortalErrorInfo(ex);
        throw;
    }
    finally
    {
        result = ConvertResponse(result);
    }
    return result;
}
365
366 #region Criteria
367
/// <summary>
/// Deserializes a byte array with the configured serialization
/// formatter, passing null through unchanged.
/// </summary>
/// <param name="criteriaData">Serialized data, or null.</param>
/// <returns>The deserialized object, or null when no data was supplied.</returns>
private static object GetCriteria(byte[] criteriaData)
{
    if (criteriaData == null)
        return null;

    return SerializationFormatterFactory.GetFormatter().Deserialize(criteriaData);
}
375
376 #endregion Criteria
377
378 #region Conversion methods
379
/// <summary>
/// Override to convert the request data before it is processed.
/// The default implementation returns the request unchanged.
/// </summary>
/// <param name="request">Update request object.</param>
/// <returns>The request to process.</returns>
protected virtual UpdateRequest ConvertRequest(UpdateRequest request) => request;
389
/// <summary>
/// Override to convert the request data before it is processed.
/// The default implementation returns the request unchanged.
/// </summary>
/// <param name="request">Criteria request object.</param>
/// <returns>The request to process.</returns>
protected virtual CriteriaRequest ConvertRequest(CriteriaRequest request)
{
    return request;
}
399
/// <summary>
/// Override to convert the response data before it is returned.
/// The default implementation returns the response unchanged.
/// </summary>
/// <param name="response">Response object.</param>
/// <returns>The response to return.</returns>
protected virtual DataPortalResponse ConvertResponse(DataPortalResponse response)
{
    return response;
}
409
410 #endregion Conversion methods
411
/// <summary>
/// Dispose this object, releasing the RabbitMQ channel and connection.
/// Safe to call more than once.
/// </summary>
public void Dispose()
{
    // Detach the resources first so a repeated Dispose call finds nothing to
    // release instead of closing an already-closed connection.
    var channel = Channel;
    var connection = Connection;
    Channel = null;
    Connection = null;

    // Release the channel before the connection that owns it.
    channel?.Dispose();

    if (connection != null)
    {
        // Close only while the connection is still open, so Dispose does not
        // throw when the connection has already been closed.
        if (connection.IsOpen)
            connection.Close();
        connection.Dispose();
    }
}
421 }
422}
Exposes server-side DataPortal functionality through RabbitMQ
void Dispose()
Dispose this object.
void StartListening()
Start processing inbound messages.
RabbitMqPortal(string dataPortalUrl)
Creates an instance of the object.
virtual UpdateRequest ConvertRequest(UpdateRequest request)
Override to convert the request data before it is transferred over the network.
string DataPortalUrl
Gets the URI for the data portal service.
async Task< DataPortalResponse > Delete(CriteriaRequest request)
Delete a business object.
async Task< DataPortalResponse > Create(CriteriaRequest request)
Create and initialize a new business object.
virtual DataPortalResponse ConvertResponse(DataPortalResponse response)
Override to convert the response data after it comes back from the network.
int Timeout
Gets or sets the timeout for network operations in seconds (default is 30 seconds).
async Task< DataPortalResponse > Fetch(CriteriaRequest request)
Get an existing business object.
virtual CriteriaRequest ConvertRequest(CriteriaRequest request)
Override to convert the request data before it is transferred over the network.
async Task< DataPortalResponse > Update(UpdateRequest request)
Update a business object.
RabbitMqPortal()
Creates an instance of the object.
Dictionary type that is serializable with the SerializationFormatterFactory.GetFormatter().
Class used as a wrapper for criteria based requests that use primitives
Provides consistent context information between the client and server DataPortal objects.
Implements the server-side DataPortal message router as discussed in Chapter 4.
async Task< DataPortalResult > Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
Get an existing business object.
async Task< DataPortalResult > Update(object obj, DataPortalContext context, bool isSync)
Update a business object.
async Task< DataPortalResult > Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
Create a new business object.
async Task< DataPortalResult > Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
Delete a business object.
byte[] CriteriaData
Serialized data for the criteria object.
string TypeName
Assembly qualified name of the business object type to create.
byte[] GlobalContext
Serialized data for the global context object.
byte[] ClientContext
Serialized data for the client context object.
byte[] Principal
Serialized data for the principal object.
Message containing details about any server-side exception.
Response message for returning the results of a data portal call.
Request message for updating a business object.
byte[] GlobalContext
Serialized data for the global context object.
byte[] ClientContext
Serialized data for the client context object.
byte[] Principal
Serialized data for the principal object.