9using System.Security.Principal;
10using System.Threading.Tasks;
17using RabbitMQ.Client.Events;
36 dataPortalServer = dataPortal;
// RabbitMQ connection; assigned from factory.CreateConnection() in InitializeRabbitMQ.
45 private IConnection Connection;
// Channel opened on Connection via CreateModel(); used for the queue declare, consume and publish calls.
46 private IModel Channel;
// Queue name: the path of DataPortalUri with its first character removed.
47 private string DataPortalQueueName;
// URI read by InitializeRabbitMQ to obtain the scheme, host, port, user info and queue name.
64 private Uri DataPortalUri {
get;
set; }
/// <summary>
/// Sets up the RabbitMQ connection, channel, queue and consumer described by
/// DataPortalUri. The setup is guarded by a check that Connection is null.
/// </summary>
/// <remarks>
/// NOTE(review): this listing omits the braces, so it is not visible here whether the
/// queue declaration and consumer registration are inside the null check — confirm
/// against the full source.
/// </remarks>
/// <exception cref="UriFormatException">
/// Thrown when the URI scheme is not "rabbitmq", when the host is blank, or when the
/// queue name derived from the URI path is blank.
/// </exception>
66 private void InitializeRabbitMQ()
68 if (Connection ==
null)
70 Console.WriteLine($
"Initializing connection to {DataPortalUrl}");
// Validate the URI parts before touching the broker.
72 var url = DataPortalUri;
73 if (url.Scheme !=
"rabbitmq")
74 throw new UriFormatException(
"Scheme != rabbitmq://");
75 if (
string.IsNullOrWhiteSpace(url.Host))
76 throw new UriFormatException(
"Host");
// The queue name is the URI path without its first character (the leading '/').
77 DataPortalQueueName = url.AbsolutePath.Substring(1);
78 if (
string.IsNullOrWhiteSpace(DataPortalQueueName))
79 throw new UriFormatException(
"DataPortalQueueName");
81 var factory =
new ConnectionFactory() { HostName = url.Host };
// NOTE(review): Uri.Port is -1 when the URI carries no port and the scheme has no
// known default — confirm the factory treats that value as "use the default port".
83 factory.Port = url.Port;
// Credentials come from the "user:password" user-info part of the URI.
84 var userInfo = url.UserInfo.Split(
':');
// The user name is applied only when non-blank; the password is applied whenever a
// second segment exists, even if that segment is empty.
85 if (userInfo.Length > 0 && !
string.IsNullOrWhiteSpace(userInfo[0]))
86 factory.UserName = userInfo[0];
87 if (userInfo.Length > 1)
88 factory.Password = userInfo[1];
89 Connection = factory.CreateConnection();
90 Channel = Connection.CreateModel();
// Declare the inbound queue (the remaining arguments are not visible in this listing).
100 Channel.QueueDeclare(
101 queue: DataPortalQueueName,
107 var consumer =
new EventingBasicConsumer(Channel);
// Each delivery is logged and handed to InvokePortal with a copy of the message body.
108 consumer.Received += (model, ea) =>
110 Console.WriteLine($
"Received {ea.BasicProperties.Type} for {ea.BasicProperties.CorrelationId} from {ea.BasicProperties.ReplyTo}");
111 InvokePortal(ea, ea.BasicProperties == null ? ea.Body.ToArray() : ea.Body.ToArray());
113 Console.WriteLine($
"Listening on queue {DataPortalQueueName}");
// NOTE(review): autoAck is true, so the broker treats a message as acknowledged on
// delivery; a failure while processing would presumably not trigger redelivery — confirm
// this is intended.
114 Channel.BasicConsume(queue: DataPortalQueueName, autoAck:
true, consumer: consumer);
/// <summary>
/// Handles one inbound message: deserializes the request, awaits CallPortal using the
/// message's Type property as the operation, then serializes the result and sends it to
/// the message's ReplyTo address with the same CorrelationId.
/// </summary>
/// <param name="ea">Delivery details; Type, ReplyTo and CorrelationId are read from its BasicProperties.</param>
/// <param name="requestData">Serialized request payload.</param>
/// <remarks>
/// The method is async void and is invoked from the consumer's Received handler, so
/// callers cannot await it or observe its exceptions.
/// NOTE(review): the listing omits the try/catch scaffolding; the second serialize/send
/// pair presumably belongs to an error path that returns a failure response — confirm
/// against the full source.
/// </remarks>
117 private async
void InvokePortal(BasicDeliverEventArgs ea,
byte[] requestData)
122 var request = SerializationFormatterFactory.GetFormatter(
ApplicationContext).Deserialize(requestData);
123 result = await CallPortal(ea.BasicProperties.Type, request);
132 var response = SerializationFormatterFactory.GetFormatter(
ApplicationContext).Serialize(result);
133 SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
141 var response = SerializationFormatterFactory.GetFormatter(ApplicationContext).Serialize(result);
142 SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response);
// This failure is only written to standard error.
144 catch (Exception ex1)
146 Console.Error.WriteLine($
"## ERROR {ex1.Message}");
/// <summary>
/// Publishes a message on the channel, tagged with the supplied correlation id.
/// </summary>
/// <param name="target">Destination for the message. NOTE(review): presumably used as the routing key; the relevant BasicPublish arguments are not visible in this listing.</param>
/// <param name="correlationId">Value copied to the outgoing message's CorrelationId property.</param>
/// <param name="request">Serialized payload. NOTE(review): presumably passed as the message body — confirm.</param>
151 private void SendMessage(
string target,
string correlationId,
byte[] request)
// Runs the connection setup first; it is guarded by a null check on Connection.
153 InitializeRabbitMQ();
154 var props = Channel.CreateBasicProperties();
155 props.CorrelationId = correlationId;
156 Channel.BasicPublish(
159 basicProperties: props,
/// <summary>
/// Runs a data portal operation against dataPortalServer and packages the outcome as a
/// DataPortalResponse.
/// </summary>
/// <param name="operation">Name of the operation to run; it is the argument of the InvalidOperationException thrown here.</param>
/// <param name="request">Deserialized request object for the operation.</param>
/// <returns>A task producing the response for the operation.</returns>
/// <exception cref="InvalidOperationException">
/// NOTE(review): presumably thrown when the operation name is not recognised; the
/// surrounding dispatch logic is not visible in this listing.
/// </exception>
/// <remarks>
/// The visible fragments call Create, Fetch, Update and Delete on dataPortalServer, each
/// passing true as the final (isSync) argument. After each call dpr.Error is tested
/// against null, and dpr.ReturnObject is serialized into result.ObjectData.
/// NOTE(review): the code between these fragments is omitted from the listing, including
/// what happens when dpr.Error is set.
/// </remarks>
163 private async Task<DataPortalResponse> CallPortal(
string operation,
object request)
185 throw new InvalidOperationException(operation);
// Create
216 var dpr = await dataPortalServer.
Create(objectType, criteria, context,
true);
218 if (dpr.Error !=
null)
220 result.ObjectData = SerializationFormatterFactory.GetFormatter(
ApplicationContext).Serialize(dpr.ReturnObject);
// Fetch
260 var dpr = await dataPortalServer.
Fetch(objectType, criteria, context,
true);
262 if (dpr.Error !=
null)
264 result.ObjectData = SerializationFormatterFactory.GetFormatter(
ApplicationContext).Serialize(dpr.ReturnObject);
// Update
298 var dpr = await dataPortalServer.
Update(obj, context,
true);
300 if (dpr.Error !=
null)
303 result.ObjectData = SerializationFormatterFactory.GetFormatter(
ApplicationContext).Serialize(dpr.ReturnObject);
// Delete
343 var dpr = await dataPortalServer.
Delete(objectType, criteria, context,
true);
345 if (dpr.Error !=
null)
347 result.ObjectData = SerializationFormatterFactory.GetFormatter(
ApplicationContext).Serialize(dpr.ReturnObject);
/// <summary>
/// Deserializes criteria data with the formatter obtained for the given application
/// context. The criteria value stays null when <paramref name="criteriaData"/> is null.
/// </summary>
/// <param name="applicationContext">Context passed to SerializationFormatterFactory.GetFormatter.</param>
/// <param name="criteriaData">Serialized criteria, or null when there is none.</param>
/// <returns>
/// NOTE(review): the return statement is not visible in this listing; presumably the
/// deserialized criteria object, or null.
/// </returns>
363 private static object GetCriteria(
ApplicationContext applicationContext,
byte[] criteriaData)
365 object criteria =
null;
366 if (criteriaData !=
null)
367 criteria = SerializationFormatterFactory.GetFormatter(applicationContext).Deserialize(criteriaData);
373 #region Conversion methods
405 #endregion Conversion methods
414 Connection?.Dispose();
Csla.Server.DataPortalContext DataPortalContext
Provides consistent context information between the client and server DataPortal objects.
object CreateInstanceDI(Type objectType, params object[] parameters)
Creates an object using 'Activator.CreateInstance' using service provider (if one is available) to po...
Exposes server-side DataPortal functionality through RabbitMQ.
void Dispose()
Dispose this object.
void StartListening()
Start processing inbound messages.
RabbitMqPortal(string dataPortalUrl)
Creates an instance of the object.
virtual UpdateRequest ConvertRequest(UpdateRequest request)
Override to convert the request data before it is transferred over the network.
string DataPortalUrl
Gets the URI for the data portal service.
RabbitMqPortal(ApplicationContext applicationContext, IDataPortalServer dataPortal)
Creates an instance of the type.
async Task< DataPortalResponse > Delete(CriteriaRequest request)
Delete a business object.
async Task< DataPortalResponse > Create(CriteriaRequest request)
Create and initialize a new business object.
virtual DataPortalResponse ConvertResponse(DataPortalResponse response)
Override to convert the response data after it comes back from the network.
int Timeout
Gets or sets the timeout for network operations in seconds (default is 30 seconds).
async Task< DataPortalResponse > Fetch(CriteriaRequest request)
Get an existing business object.
virtual CriteriaRequest ConvertRequest(CriteriaRequest request)
Override to convert the request data before it is transferred over the network.
async Task< DataPortalResponse > Update(UpdateRequest request)
Update a business object.
Dictionary type that is serializable with the SerializationFormatterFactory.GetFormatter().
Class used as a wrapper for criteria based requests that use primitives
Message sent to the WCF data portal.
string ClientCulture
Serialized client culture.
byte[] CriteriaData
Serialized data for the criteria object.
string TypeName
Assembly qualified name of the business object type to create.
byte[] ClientContext
Serialized data for the client context object.
byte[] Principal
Serialized data for the principal object.
string ClientUICulture
Serialized client UI culture.
Message containing details about any server-side exception.
Response message for returning the results of a data portal call.
DataPortalErrorInfo ErrorData
Server-side exception data if an exception occurred on the server.
Request message for updating a business object.
byte[] ClientContext
Serialized data for the client context object.
string ClientCulture
Serialized client culture.
byte[] ObjectData
Serialized object data.
string ClientUICulture
Serialized client UI culture.
byte[] Principal
Serialized data for the principal object.
Interface implemented by server-side data portal components.
Task< DataPortalResult > Update(object obj, DataPortalContext context, bool isSync)
Update a business object.
Task< DataPortalResult > Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
Create a new business object.
Task< DataPortalResult > Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
Get an existing business object.
Task< DataPortalResult > Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
Delete a business object.