9using System.Threading.Tasks;
29 : base(applicationContext)
38 public override int Timeout {
get;
set; } = 30;
60 private ProxyListener QueueListener {
get;
set; }
72 Console.WriteLine($
"Initializing {DataPortalUrl}");
73 if (url.Scheme !=
"rabbitmq")
74 throw new UriFormatException(
"Scheme != rabbitmq://");
75 if (
string.IsNullOrWhiteSpace(url.Host))
76 throw new UriFormatException(
"Host");
79 throw new UriFormatException(
"DataPortalQueueName");
80 Console.WriteLine($
"Will send to queue {DataPortalQueueName}");
81 var factory =
new ConnectionFactory() { HostName = url.Host };
83 factory.Port = url.Port;
84 var userInfo = url.UserInfo.Split(
':');
85 if (userInfo.Length > 0 && !
string.IsNullOrWhiteSpace(userInfo[0]))
86 factory.UserName = userInfo[0];
87 if (userInfo.Length > 1)
88 factory.Password = userInfo[1];
91 if (QueueListener ==
null)
93 QueueListener = ProxyListener.GetListener(url);
94 QueueListener.StartListening();
110 throw new NotSupportedException(
"isSync == true");
113 return await base.Create(objectType, criteria, context, isSync);
126 public override async Task<DataPortalResult>
Fetch(Type objectType,
object criteria,
DataPortalContext context,
bool isSync)
129 throw new NotSupportedException(
"isSync == true");
132 return await base.Fetch(objectType, criteria, context, isSync);
147 throw new NotSupportedException(
"isSync == true");
150 return await base.Update(obj, context, isSync);
166 throw new NotSupportedException(
"isSync == true");
169 return await base.Delete(objectType, criteria, context, isSync);
181 protected override async Task<byte[]>
CallDataPortalServer(
byte[] serialized,
string operation,
string routingToken,
bool isSync)
183 var correlationId = Guid.NewGuid().ToString();
185 var wip = Wip.WorkInProgress.GetOrAdd(correlationId,
new WipItem { ResetEvent = resetEvent });
187 SendMessage(QueueListener.ReplyQueue.QueueName, correlationId, operation, serialized);
189 var timeout = Task.Delay(
Timeout * 1000);
190 if (await Task.WhenAny(wip.ResetEvent.WaitAsync(), timeout) == timeout)
191 throw new TimeoutException();
196 private void SendMessage(
string sender,
string correlationId,
string operation,
byte[] request)
198 var props =
Channel.CreateBasicProperties();
199 if (!
string.IsNullOrWhiteSpace(sender))
200 props.ReplyTo = sender;
201 props.CorrelationId = correlationId;
202 props.Type = operation;
206 basicProperties: props,
215 QueueListener?.Dispose();
Provides consistent context information between the client and server DataPortal objects.
Implements a data portal proxy to relay data portal calls to a remote application server by using RabbitMQ.
string DataPortalQueueName
Gets or sets the name of the data portal service queue.
override async Task< DataPortalResult > Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to create a new business object.
override async Task< DataPortalResult > Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to load an existing business object.
IConnection Connection
Gets or sets the connection to the RabbitMQ service.
virtual void InitializeRabbitMQ()
Method responsible for creating the Connection, Channel, ReplyQueue, and DataPortalQueueName values used for bi-directional communication.
void Dispose()
Dispose this object and its resources.
override async Task< DataPortalResult > Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to delete a business object.
override int Timeout
Gets or sets the timeout for network operations in seconds (default is 30 seconds).
override async Task< byte[]> CallDataPortalServer(byte[] serialized, string operation, string routingToken, bool isSync)
Creates a message and sends it to the RabbitMQ server.
IModel Channel
Gets or sets the channel (model) for RabbitMQ.
override async Task< DataPortalResult > Update(object obj, DataPortalContext context, bool isSync)
Called by DataPortal to update a business object.
RabbitMqProxy(ApplicationContext applicationContext, RabbitMqProxyOptions options)
Creates an instance of the object, initializing it to use the supplied URL.
Options for RabbitMqProxy
string DataPortalUrl
Data portal server endpoint URL
Implements a data portal proxy to relay data portal calls to a remote application server.
string DataPortalUrl
Gets the URL address for the data portal server used by this proxy instance.
Provides consistent context information between the client and server DataPortal objects.
Async/await implementation of a ManualResetEvent