Mercurial > silverbladetech
view Messaging/Server/Listeners/RabbitQueueListener.cs @ 31:7d9de5746f18
Working version
author | adminsh@apollo |
---|---|
date | Thu, 22 Mar 2012 08:09:41 +0000 |
parents | 4c0dea4760c5 |
children |
line wrap: on
line source
using System; using System.Diagnostics; using System.Text; using Common.Messages; using GalaSoft.MvvmLight.Messaging; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.MessagePatterns; namespace Server.Listeners { class RabbitQueueListener : IListener { public bool IsListening { get; set; } private readonly int _port; private Subscription _subscription; private string _queueName; public RabbitQueueListener(int port, string queueName) { _port = port; _queueName = queueName; } public void Start() { try { if (IsListening) return; var serverAddress = "localhost:" + _port; var connectionFactory = new ConnectionFactory { Address = serverAddress }; using (var connection = connectionFactory.CreateConnection()) { using (var channel = connection.CreateModel()) { Log("Opening listener"); string queueName = channel.QueueDeclare(_queueName, false, false, false, null); channel.QueueBind(queueName, "amq.direct", queueName, null); using (_subscription = new Subscription(channel, queueName)) { IsListening = true; while (IsListening) { foreach (BasicDeliverEventArgs eventArgs in _subscription) { //Log(Encoding.UTF8.GetString(eventArgs.Body)); Messenger.Default.Send(new RabbitClientMessage()); _subscription.Ack(); } } } } } } catch (Exception e) { Log(e.Message); } } public void Stop() { if (!IsListening) return; _subscription.Close(); IsListening = false; Log("Listener closed."); } private void Log(string message) { Debug.WriteLine(message); Messenger.Default.Send(new RabbitLogMessage() { Body = message }); } } }