9using System.Security.Principal;
10using System.Threading.Tasks;
17using RabbitMQ.Client.Events;
// Broker connection; assigned from factory.CreateConnection() in InitializeRabbitMQ.
31 private IConnection Connection;
// Channel obtained from Connection.CreateModel() in InitializeRabbitMQ.
32 private IModel Channel;
// Queue name taken from the path of DataPortalUri with its first character removed.
33 private string DataPortalQueueName;
// URI of the data portal; InitializeRabbitMQ reads its scheme, host, port, user info and path.
58 private Uri DataPortalUri {
get;
set; }
// Sets up the RabbitMQ connection and channel from DataPortalUri.
// The work is guarded by a check that Connection is still null; braces are not
// visible in this listing, so the exact extent of the guarded region should be confirmed.
// Throws UriFormatException when the scheme is not "rabbitmq", when the host is blank,
// or when the queue name derived from the path is blank.
60 private void InitializeRabbitMQ()
62 if (Connection ==
null)
64 Console.WriteLine($
"Initializing connection to {DataPortalUrl}");
66 var url = DataPortalUri;
// Validate the pieces of the URI that are required to connect.
67 if (url.Scheme !=
"rabbitmq")
68 throw new UriFormatException(
"Scheme != rabbitmq://");
69 if (
string.IsNullOrWhiteSpace(url.Host))
70 throw new UriFormatException(
"Host");
// The queue name is the URI path without its first character.
71 DataPortalQueueName = url.AbsolutePath.Substring(1);
72 if (
string.IsNullOrWhiteSpace(DataPortalQueueName))
73 throw new UriFormatException(
"DataPortalQueueName");
75 var factory =
new ConnectionFactory() { HostName = url.Host };
// NOTE(review): Uri.Port is -1 when the URI gives no port and the scheme has no default;
// confirm the connection factory interprets that value as "use the default port".
77 factory.Port = url.Port;
// User info is split on ':' - element 0 is used as the user name, element 1 as the password.
// NOTE(review): any further elements (e.g. a password containing ':') are not used here.
78 var userInfo = url.UserInfo.Split(
':');
79 if (userInfo.Length > 0 && !
string.IsNullOrWhiteSpace(userInfo[0]))
80 factory.UserName = userInfo[0];
81 if (userInfo.Length > 1)
82 factory.Password = userInfo[1];
// Open the connection, then create the channel that the rest of the class uses.
83 Connection = factory.CreateConnection();
84 Channel = Connection.CreateModel();
// NOTE(review): fragment - the enclosing method header and the call that receives this
// `queue:` argument are not visible in this listing (presumably StartListening; confirm).
95 queue: DataPortalQueueName,
// Event-based consumer bound to the channel.
101 var consumer =
new EventingBasicConsumer(Channel);
// For each delivery: log the message type, correlation id and reply-to address, then
// pass the delivery and its body (converted to a byte array) to InvokePortal.
102 consumer.Received += (model, ea) =>
104 Console.WriteLine($
"Received {ea.BasicProperties.Type} for {ea.BasicProperties.CorrelationId} from {ea.BasicProperties.ReplyTo}");
105 InvokePortal(ea, ea.Body.ToArray());
107 Console.WriteLine($
"Listening on queue {DataPortalQueueName}");
// Start consuming from the data portal queue; autoAck is true and no explicit
// acknowledgement call appears in the visible handler.
108 Channel.BasicConsume(queue: DataPortalQueueName, autoAck:
true, consumer: consumer);
// Handles one inbound delivery: deserializes requestData, awaits CallPortal using the
// message's Type property as the operation, serializes the result and sends it to the
// delivery's ReplyTo address tagged with its CorrelationId.
// NOTE(review): the method is async void, so callers cannot await it or observe its
// exceptions; only part of the try/catch structure is visible in this listing.
111 private async
void InvokePortal(BasicDeliverEventArgs ea,
byte[] requestData)
116 var request = SerializationFormatterFactory.GetFormatter().Deserialize(requestData);
117 result = await CallPortal(ea.BasicProperties.Type, request);
// Two serialize-and-send sequences are visible; the branches that select between them
// are not shown here.
126 var response = SerializationFormatterFactory.GetFormatter().Serialize(result);
127 SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
134 var response = SerializationFormatterFactory.GetFormatter().Serialize(result);
135 SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
// Failure handler: the exception message is written to standard error.
137 catch (Exception ex1)
139 Console.Error.WriteLine($
"## ERROR {ex1.Message}");
// Publishes a message on the channel with the given correlation id.
// target:        passed by InvokePortal as the delivery's ReplyTo value.
// correlationId: copied onto the outgoing message properties.
// request:       payload bytes (InvokePortal passes the serialized response here).
// NOTE(review): only part of the BasicPublish argument list is visible in this listing.
144 private void SendMessage(
string target,
string correlationId,
byte[] request)
// Make sure the connection and channel exist before publishing.
146 InitializeRabbitMQ();
147 var props = Channel.CreateBasicProperties();
148 props.CorrelationId = correlationId;
149 Channel.BasicPublish(
152 basicProperties: props,
// Runs the data portal operation named by `operation` against `request` and returns
// the resulting DataPortalResponse (InvokePortal passes the message Type as `operation`).
// NOTE(review): the dispatch logic is not visible in this listing; the only visible
// statement throws InvalidOperationException carrying the operation name, presumably
// for an unrecognized operation - confirm.
156 private async Task<DataPortalResponse> CallPortal(
string operation,
object request)
178 throw new InvalidOperationException(operation);
// NOTE(review): fragment - the enclosing method header is not visible in this listing
// (presumably the Create operation; confirm).
// The principal is deserialized from request.Principal and cast to IPrincipal.
203 (IPrincipal)SerializationFormatterFactory.GetFormatter().Deserialize(request.
Principal),
// Invoke the portal's Create with `true` as the last argument.
211 var dpr = await prtl.
Create(objectType, criteria, context,
true);
// Error check on the portal result; the handling body is not visible here.
213 if (dpr.Error !=
null)
// Serialize the returned global context and business object into the response.
215 result.GlobalContext = SerializationFormatterFactory.GetFormatter().Serialize(dpr.GlobalContext);
216 result.ObjectData = SerializationFormatterFactory.GetFormatter().Serialize(dpr.ReturnObject);
// NOTE(review): fragment - the enclosing method header is not visible in this listing
// (presumably the Fetch operation; confirm).
// The principal is deserialized from request.Principal and cast to IPrincipal.
250 (IPrincipal)SerializationFormatterFactory.GetFormatter().Deserialize(request.
Principal),
// Invoke the portal's Fetch with `true` as the last argument.
258 var dpr = await prtl.
Fetch(objectType, criteria, context,
true);
// Error check on the portal result; the handling body is not visible here.
260 if (dpr.Error !=
null)
// Serialize the returned global context and business object into the response.
262 result.GlobalContext = SerializationFormatterFactory.GetFormatter().Serialize(dpr.GlobalContext);
263 result.ObjectData = SerializationFormatterFactory.GetFormatter().Serialize(dpr.ReturnObject);
// NOTE(review): fragment - the enclosing method header is not visible in this listing
// (presumably the Update operation; confirm).
// The principal is deserialized from request.Principal and cast to IPrincipal.
291 (IPrincipal)SerializationFormatterFactory.GetFormatter().Deserialize(request.
Principal),
// Invoke the portal's Update with `true` as the last argument.
299 var dpr = await prtl.
Update(obj, context,
true);
// Error check on the portal result; the handling body is not visible here.
301 if (dpr.Error !=
null)
// Serialize the returned global context and business object into the response.
304 result.GlobalContext = SerializationFormatterFactory.GetFormatter().Serialize(dpr.GlobalContext);
305 result.ObjectData = SerializationFormatterFactory.GetFormatter().Serialize(dpr.ReturnObject);
// NOTE(review): fragment - the enclosing method header is not visible in this listing
// (presumably the Delete operation; confirm).
// The principal is deserialized from request.Principal and cast to IPrincipal.
339 (IPrincipal)SerializationFormatterFactory.GetFormatter().Deserialize(request.
Principal),
// Invoke the portal's Delete with `true` as the last argument.
347 var dpr = await prtl.
Delete(objectType, criteria, context,
true);
// Error check on the portal result; the handling body is not visible here.
349 if (dpr.Error !=
null)
// Serialize the returned global context and business object into the response.
351 result.GlobalContext = SerializationFormatterFactory.GetFormatter().Serialize(dpr.GlobalContext);
352 result.ObjectData = SerializationFormatterFactory.GetFormatter().Serialize(dpr.ReturnObject);
// Turns serialized criteria bytes into a criteria object.
// criteriaData: serialized criteria; may be null, in which case nothing is deserialized
//               and the local `criteria` keeps its initial null value.
// NOTE(review): the return statement is not visible in this listing; presumably
// `criteria` is returned - confirm.
368 private static object GetCriteria(
byte[] criteriaData)
370 object criteria =
null;
// Only deserialize when data was actually supplied.
371 if (criteriaData !=
null)
372 criteria = SerializationFormatterFactory.GetFormatter().Deserialize(criteriaData);
378 #region Conversion methods
410 #endregion Conversion methods
419 Connection?.Dispose();
Exposes server-side DataPortal functionality through RabbitMQ
void Dispose()
Dispose this object.
void StartListening()
Start processing inbound messages.
RabbitMqPortal(string dataPortalUrl)
Creates an instance of the object.
virtual UpdateRequest ConvertRequest(UpdateRequest request)
Override to convert the request data before it is transferred over the network.
string DataPortalUrl
Gets the URI for the data portal service.
async Task< DataPortalResponse > Delete(CriteriaRequest request)
Delete a business object.
async Task< DataPortalResponse > Create(CriteriaRequest request)
Create and initialize a new business object.
virtual DataPortalResponse ConvertResponse(DataPortalResponse response)
Override to convert the response data after it comes back from the network.
int Timeout
Gets or sets the timeout for network operations in seconds (default is 30 seconds).
async Task< DataPortalResponse > Fetch(CriteriaRequest request)
Get an existing business object.
virtual CriteriaRequest ConvertRequest(CriteriaRequest request)
Override to convert the request data before it is transferred over the network.
async Task< DataPortalResponse > Update(UpdateRequest request)
Update a business object.
RabbitMqPortal()
Creates an instance of the object.
Dictionary type that can be serialized by the formatter returned from SerializationFormatterFactory.GetFormatter().
Class used as a wrapper for criteria based requests that use primitives
Provides consistent context information between the client and server DataPortal objects.
Implements the server-side DataPortal message router as discussed in Chapter 4.
async Task< DataPortalResult > Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
Get an existing business object.
async Task< DataPortalResult > Update(object obj, DataPortalContext context, bool isSync)
Update a business object.
async Task< DataPortalResult > Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
Create a new business object.
async Task< DataPortalResult > Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
Delete a business object.
Message sent to the WCF data portal.
string ClientCulture
Serialized client culture.
byte[] CriteriaData
Serialized data for the criteria object.
string TypeName
Assembly qualified name of the business object type to create.
byte[] GlobalContext
Serialized data for the global context object.
byte[] ClientContext
Serialized data for the client context object.
byte[] Principal
Serialized data for the principal object.
string ClientUICulture
Serialized client UI culture.
Message containing details about any server-side exception.
Response message for returning the results of a data portal call.
Request message for updating a business object.
byte[] GlobalContext
Serialized data for the global context object.
byte[] ClientContext
Serialized data for the client context object.
string ClientCulture
Serialized client culture.
byte[] ObjectData
Serialized object data.
string ClientUICulture
Serialized client UI culture.
byte[] Principal
Serialized data for the principal object.