10using System.Threading.Tasks;
48 public override int Timeout {
get;
set; } = 30;
70 private ProxyListener QueueListener {
get;
set; }
82 Console.WriteLine($
"Initializing {DataPortalUrl}");
83 if (url.Scheme !=
"rabbitmq")
84 throw new UriFormatException(
"Scheme != rabbitmq://");
85 if (
string.IsNullOrWhiteSpace(url.Host))
86 throw new UriFormatException(
"Host");
89 throw new UriFormatException(
"DataPortalQueueName");
90 Console.WriteLine($
"Will send to queue {DataPortalQueueName}");
91 var factory =
new ConnectionFactory() { HostName = url.Host };
93 factory.Port = url.Port;
94 var userInfo = url.UserInfo.Split(
':');
95 if (userInfo.Length > 0 && !
string.IsNullOrWhiteSpace(userInfo[0]))
96 factory.UserName = userInfo[0];
97 if (userInfo.Length > 1)
98 factory.Password = userInfo[1];
101 if (QueueListener ==
null)
103 QueueListener = ProxyListener.GetListener(url);
104 QueueListener.StartListening();
120 throw new NotSupportedException(
"isSync == true");
123 return await base.Create(objectType, criteria, context, isSync);
136 public override async Task<DataPortalResult>
Fetch(Type objectType,
object criteria,
DataPortalContext context,
bool isSync)
139 throw new NotSupportedException(
"isSync == true");
142 return await base.Fetch(objectType, criteria, context, isSync);
157 throw new NotSupportedException(
"isSync == true");
160 return await base.Update(obj, context, isSync);
176 throw new NotSupportedException(
"isSync == true");
179 return await base.Delete(objectType, criteria, context, isSync);
191 protected override async Task<byte[]>
CallDataPortalServer(
byte[] serialized,
string operation,
string routingToken,
bool isSync)
193 var correlationId =
Guid.NewGuid().ToString();
195 var wip = Wip.WorkInProgress.GetOrAdd(correlationId,
new WipItem { ResetEvent = resetEvent });
197 SendMessage(QueueListener.ReplyQueue.QueueName, correlationId, operation, serialized);
199 var timeout = Task.Delay(
Timeout * 1000);
200 if (await Task.WhenAny(wip.ResetEvent.WaitAsync(), timeout) == timeout)
201 throw new TimeoutException();
206 private void SendMessage(
string sender,
string correlationId,
string operation,
byte[] request)
208 var props =
Channel.CreateBasicProperties();
209 if (!
string.IsNullOrWhiteSpace(sender))
210 props.ReplyTo = sender;
211 props.CorrelationId = correlationId;
212 props.Type = operation;
216 basicProperties: props,
225 QueueListener?.Dispose();
Implements a data portal proxy to relay data portal calls to a remote application server by using Rab...
string DataPortalQueueName
Gets or sets the name of the data portal service queue.
override async Task< DataPortalResult > Create(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to create a new business object.
override async Task< DataPortalResult > Fetch(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to load an existing business object.
RabbitMqProxy(string dataPortalUrl)
Creates an instance of the object, initializing it to use the supplied URL.
IConnection Connection
Gets or sets the connection to the RabbitMQ service.
virtual void InitializeRabbitMQ()
Method responsible for creating the Connection, Channel, ReplyQueue, and DataPortalQueueName values u...
void Dispose()
Dispose this object and its resources.
override async Task< DataPortalResult > Delete(Type objectType, object criteria, DataPortalContext context, bool isSync)
Called by DataPortal to delete a business object.
override int Timeout
Gets or sets the timeout for network operations in seconds (default is 30 seconds).
override async Task< byte[]> CallDataPortalServer(byte[] serialized, string operation, string routingToken, bool isSync)
Create message and send to Rabbit MQ server.
IModel Channel
Gets or sets the channel (model) for RabbitMQ.
RabbitMqProxy()
Creates an instance of the object, initializing it to use the DefaultUrl value.
override async Task< DataPortalResult > Update(object obj, DataPortalContext context, bool isSync)
Called by DataPortal to update a business object.
Implements a data portal proxy to relay data portal calls to a remote application server.
string DataPortalUrl
Gets the URL address for the data portal server used by this proxy instance.
Provides consistent context information between the client and server DataPortal objects.
Async/await implementation of a ManualResetEvent
@ Guid
Globally unique identifier / Guid