changeset 28:4c0dea4760c5

RabbitMq working
author adminsh@apollo
date Wed, 21 Mar 2012 20:29:04 +0000
parents 96fdf58e05b4
children 9919ee227c93
files Messaging/Client/Client.csproj Messaging/Client/MainWindow.xaml Messaging/Client/MainWindow.xaml.cs Messaging/Client/message.ico Messaging/Common/Common.csproj Messaging/Common/Messages/ClientMessages.cs Messaging/Server/EndPoints/BaseEndPoint.cs Messaging/Server/EndPoints/RabbitEndPoint.cs Messaging/Server/Listeners/AsyncSocketListener.cs Messaging/Server/Listeners/RabbitQueueListener.cs Messaging/Server/Listeners/StateObject.cs Messaging/Server/Server.csproj Messaging/Server/UI/MainWindowViewModel.cs
diffstat 13 files changed, 147 insertions(+), 73 deletions(-) [+]
line wrap: on
line diff
--- a/Messaging/Client/Client.csproj	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Client/Client.csproj	Wed Mar 21 20:29:04 2012 +0000
@@ -36,9 +36,14 @@
     <WarningLevel>4</WarningLevel>
   </PropertyGroup>
   <PropertyGroup>
-    <ApplicationIcon>message.ico</ApplicationIcon>
+    <ApplicationIcon>
+    </ApplicationIcon>
   </PropertyGroup>
   <ItemGroup>
+    <Reference Include="RabbitMQ.Client, Version=2.6.1.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\Libs\RabbitMq.2.6.1.0\RabbitMQ.Client.dll</HintPath>
+    </Reference>
     <Reference Include="System" />
     <Reference Include="System.Xml" />
     <Reference Include="System.Core" />
@@ -94,7 +99,10 @@
     <AppDesigner Include="Properties\" />
   </ItemGroup>
   <ItemGroup>
-    <Resource Include="message.ico" />
+    <ProjectReference Include="..\Common\Common.csproj">
+      <Project>{241CE91D-18AC-4D84-ACC2-2273F50A5E9B}</Project>
+      <Name>Common</Name>
+    </ProjectReference>
   </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
--- a/Messaging/Client/MainWindow.xaml	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Client/MainWindow.xaml	Wed Mar 21 20:29:04 2012 +0000
@@ -1,8 +1,16 @@
 <Window x:Class="Client.MainWindow"
         xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
         xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
-        Title="MainWindow" Height="350" Width="525">
+        Title="MainWindow"
+        Width="525"
+        Height="350">
     <Grid>
-        
+        <StackPanel>
+            <Button Name="btnSockets" Content="Sockets" />
+            <Button Name="btnRabbit"
+                    Click="BtnRabbitClick"
+                    Content="Rabbit" />
+            <Button Name="btnRabbitProto" Content="Rabbit Proto"  />
+        </StackPanel>
     </Grid>
 </Window>
--- a/Messaging/Client/MainWindow.xaml.cs	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Client/MainWindow.xaml.cs	Wed Mar 21 20:29:04 2012 +0000
@@ -10,6 +10,8 @@
 using System.Windows.Media.Imaging;
 using System.Windows.Navigation;
 using System.Windows.Shapes;
+using Common;
+using RabbitMQ.Client;
 
 namespace Client
 {
@@ -22,5 +24,21 @@
         {
             InitializeComponent();
         }
+
+        private void BtnRabbitClick(object sender, RoutedEventArgs e)
+        {
+            Task
+            var cf = new ConnectionFactory {Address = "localhost:" + Settings.RabbitPortNumber};
+
+            using (var conn = cf.CreateConnection())
+            using (var channel = conn.CreateModel())
+            {
+                for (var i = 0; i < 10000; i++)
+                {
+                    channel.BasicPublish("amq.direct", Settings.QueueName, null,
+                                         Encoding.UTF8.GetBytes("hello from the client!"));
+                }
+            }
+        }
     }
-}
+}
\ No newline at end of file
Binary file Messaging/Client/message.ico has changed
--- a/Messaging/Common/Common.csproj	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Common/Common.csproj	Wed Mar 21 20:29:04 2012 +0000
@@ -42,6 +42,7 @@
     <Reference Include="WindowsBase" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="Messages\ClientMessages.cs" />
     <Compile Include="Xaml\BindingErrorTraceListener.cs" />
     <Compile Include="Logger\Log.cs" />
     <Compile Include="Messages\LogMessages.cs" />
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Messaging/Common/Messages/ClientMessages.cs	Wed Mar 21 20:29:04 2012 +0000
@@ -0,0 +1,16 @@
+using GalaSoft.MvvmLight.Messaging;
+
+namespace Common.Messages
+{
+    public class SocketClientMessage : MessageBase
+    {
+    }
+
+    public class RabbitClientMessage : MessageBase
+    {
+    }
+
+    public class RabbitProtoClientMessage : MessageBase
+    {
+    }
+}
--- a/Messaging/Server/EndPoints/BaseEndPoint.cs	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Server/EndPoints/BaseEndPoint.cs	Wed Mar 21 20:29:04 2012 +0000
@@ -2,6 +2,7 @@
 using System.Threading.Tasks;
 using Common.Messages;
 using GalaSoft.MvvmLight;
+using GalaSoft.MvvmLight.Messaging;
 using Server.Interfaces;
 
 namespace Server.EndPoints
@@ -33,6 +34,11 @@
             this.DisplayLog = DisplayLog + Environment.NewLine + m.Body;
         }
 
+        protected void ClientMessageReceived(MessageBase m)
+        {
+            this.DisplayCount++;
+        }
+
         #region Properties
 
         #region IsListening
@@ -150,7 +156,7 @@
             if (_listener.IsListening)
             {
                 Task.Factory.StartNew(_listener.Stop);
-            }
+            } 
             else
             {
                 Task.Factory.StartNew(_listener.Start);
--- a/Messaging/Server/EndPoints/RabbitEndPoint.cs	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Server/EndPoints/RabbitEndPoint.cs	Wed Mar 21 20:29:04 2012 +0000
@@ -14,6 +14,7 @@
         public override sealed void Init()
         {
             Messenger.Default.Register<RabbitLogMessage>(this, LogMessageReceived);
+            Messenger.Default.Register<RabbitClientMessage>(this, ClientMessageReceived);
         }
     }
 }
--- a/Messaging/Server/Listeners/AsyncSocketListener.cs	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Server/Listeners/AsyncSocketListener.cs	Wed Mar 21 20:29:04 2012 +0000
@@ -25,7 +25,12 @@
 
         public void Start()
         {
-            Messenger.Default.Send(new SocketLogMessage() { Body = "Opening connection..." });
+            if (IsListening)
+            {
+                Messenger.Default.Send(new SocketLogMessage() { Body = "Already listening..." });
+                return;
+            }
+            Messenger.Default.Send(new SocketLogMessage() { Body = "Opening listener..." });
 
             // Data buffer for incoming data.
             var bytes = new Byte[1024];
@@ -43,34 +48,48 @@
                 {
 
                 // Bind the socket to the local endpoint and listen for incoming connections.
-                try
-                {
-                    listener.Bind(localEndPoint);
-                    listener.Listen(100);
-                    IsListening = true;
+                    try
+                    {
+                        listener.Bind(localEndPoint);
+                        listener.Listen(100);
+                        IsListening = true;
+
+                        Messenger.Default.Send(new SocketLogMessage() { Body = "Listener opened" });
 
-                    Messenger.Default.Send(new SocketLogMessage() { Body = "Connection opened" });
+                        while (IsListening)
+                        {
+                            // Set the event to nonsignaled state.
+                            AllDone.Reset();
 
-                    while (IsListening)
-                    {
-                        // Set the event to nonsignaled state.
-                        AllDone.Reset();
+                            // Start an asynchronous socket to listen for connections.
+                            Messenger.Default.Send(new SocketLogMessage() { Body = "Waiting for a connection..." });
+
+                            listener.BeginAccept(
+                                AcceptCallback,
+                                listener);
 
-                        // Start an asynchronous socket to listen for connections.
-                        Messenger.Default.Send(new SocketLogMessage() { Body = "Waiting for a connection..." });
-                    
-                        listener.BeginAccept(
-                            AcceptCallback,
-                            listener);
+                            // Wait until a connection is made before continuing.
+                            AllDone.WaitOne();
+                        }
+
+                        if (listener.Connected)
+                        {
+                            Messenger.Default.Send(new SocketLogMessage() { Body = "Shutting down listener..." });
+                            listener.Shutdown(SocketShutdown.Both);
 
-                        // Wait until a connection is made before continuing.
-                        AllDone.WaitOne();
+                            Messenger.Default.Send(new SocketLogMessage() { Body = "Disconnecting listener..." });
+                            listener.Disconnect(true);
+                        }
+
                     }
-                }
-                catch (Exception e)
-                {
-                    Messenger.Default.Send(new SocketLogMessage() { Body = e.Message });
-                }
+                    catch (Exception e)
+                    {
+                        Messenger.Default.Send(new SocketLogMessage() { Body = e.Message });
+                    }
+                    finally
+                    {
+                        Messenger.Default.Send(new SocketLogMessage() { Body = "Connection closed." });
+                    }
             }
         }
 
@@ -191,7 +210,23 @@
 
             Messenger.Default.Send(new SocketLogMessage() { Body = "Closing connection..." });
             IsListening = false;
-            Messenger.Default.Send(new SocketLogMessage() { Body = "Connection closed." });
+            AllDone.Set();
         }
     }
+
+    // State object for reading client data asynchronously
+    public class StateObject
+    {
+        // Client  socket.
+        public Socket WorkSocket;
+
+        // Size of receive buffer.
+        public const int BufferSize = 1024;
+
+        // Receive buffer.
+        public byte[] Buffer = new byte[BufferSize];
+
+        // Received data string.
+        public StringBuilder Sb = new StringBuilder();
+    }
 }
\ No newline at end of file
--- a/Messaging/Server/Listeners/RabbitQueueListener.cs	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Server/Listeners/RabbitQueueListener.cs	Wed Mar 21 20:29:04 2012 +0000
@@ -14,16 +14,21 @@
     {
         public bool IsListening { get; set; }
         private readonly int _port;
+        private Subscription _subscription;
+        private string _queueName;
 
-        public RabbitQueueListener(int port)
+        public RabbitQueueListener(int port, string queueName)
         {
             _port = port;
+            _queueName = queueName;
         }
 
         public void Start()
         {
             try
             {
+                if (IsListening) return;
+
                 var serverAddress = "localhost:" + _port;
 
                 var connectionFactory = new ConnectionFactory { Address = serverAddress };
@@ -32,21 +37,22 @@
                 {
                     using (var channel = connection.CreateModel())
                     {
-                        Log("Creating a queue and binding it to amq.direct");
-                        string queueName = channel.QueueDeclare();
+                        Log(string.Format("Creating a queue {0} and binding it to amq.direct", _queueName));
+                        string queueName = channel.QueueDeclare(_queueName, false, false, false, null);
 
                         channel.QueueBind(queueName, "amq.direct", queueName, null);
-                        Log(string.Format("Done.  Created queue {0} and bound to amq.direct\n", queueName));
+                        Log("Done.");
 
-                        using (var subscription = new Subscription(channel, queueName))
+                        using (_subscription = new Subscription(channel, queueName))
                         {
                             IsListening = true;
                             while (IsListening)
                             {
-                                foreach (BasicDeliverEventArgs eventArgs in subscription)
+                                foreach (BasicDeliverEventArgs eventArgs in _subscription)
                                 {
-                                    Log(Encoding.UTF8.GetString(eventArgs.Body));
-                                    subscription.Ack();
+                                    //Log(Encoding.UTF8.GetString(eventArgs.Body));
+                                    Messenger.Default.Send(new RabbitClientMessage());
+                                    _subscription.Ack();
                                 }
                             }
                         }
@@ -61,9 +67,10 @@
 
         public void Stop()
         {
-            Log("Stopping listening...");
+            Log("Closing listener...");
+            _subscription.Close();
             IsListening = false;
-            Log("Listening stopped.");
+            Log("Done.");
         }
 
         private void Log(string message)
--- a/Messaging/Server/Listeners/StateObject.cs	Wed Mar 21 19:00:59 2012 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,21 +0,0 @@
-using System.Net.Sockets;
-using System.Text;
-
-namespace Server.Listeners
-{
-    // State object for reading client data asynchronously
-    public class StateObject
-    {
-        // Client  socket.
-        public Socket WorkSocket;
-
-        // Size of receive buffer.
-        public const int BufferSize = 1024;
-
-        // Receive buffer.
-        public byte[] Buffer = new byte[BufferSize];
-
-        // Received data string.
-        public StringBuilder Sb = new StringBuilder();
-    }
-}
--- a/Messaging/Server/Server.csproj	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Server/Server.csproj	Wed Mar 21 20:29:04 2012 +0000
@@ -87,7 +87,6 @@
     <Compile Include="Locator.cs" />
     <Compile Include="Listeners\RabbitQueueListener.cs" />
     <Compile Include="UI\MainWindowViewModel.cs" />
-    <Compile Include="Listeners\StateObject.cs" />
     <Compile Include="EndPoints\BaseEndPoint.cs" />
     <Page Include="UI\MainWindow.xaml">
       <Generator>MSBuild:Compile</Generator>
--- a/Messaging/Server/UI/MainWindowViewModel.cs	Wed Mar 21 19:00:59 2012 +0000
+++ b/Messaging/Server/UI/MainWindowViewModel.cs	Wed Mar 21 20:29:04 2012 +0000
@@ -1,12 +1,9 @@
-using System;
-using System.Windows.Input;
+using System.Windows.Input;
 using Common;
 using Common.Logger;
-using Common.Messages;
 using GalaSoft.MvvmLight;
 using GalaSoft.MvvmLight.Command;
 using System.Windows;
-using GalaSoft.MvvmLight.Messaging;
 using Server.EndPoints;
 using Server.Interfaces;
 using Server.Listeners;
@@ -48,12 +45,13 @@
         public MainWindowViewModel()
         {
             InitSocketEndPoint(Settings.SocketsPortNumber);
-            InitRabbitEndPoint(Settings.RabbitPortNumber);
+            InitRabbitEndPoint(Settings.RabbitPortNumber, Settings.QueueName);
             InitRabbitProtoEndPoint(Settings.RabbitProtoPortNumber);
 
             CloseCommand = new RelayCommand(CloseCommandExecute);
+        }
 
-        }
+        #endregion
 
         private void InitSocketEndPoint(int port)
         {
@@ -65,9 +63,9 @@
             };
         }
 
-        private void InitRabbitEndPoint(int port)
+        private void InitRabbitEndPoint(int port, string queueName)
         {
-            IListener listener = new RabbitQueueListener(port);
+            IListener listener = new RabbitQueueListener(port, queueName);
             RabbitEndPoint = new RabbitEndPoint(listener)
             {
                 DisplayText = "RabbitMQ",
@@ -89,7 +87,5 @@
             Log.Write("Closing command executed");
             Application.Current.Shutdown();
         }
-
-        #endregion
     }
 }