Mercurial > silverbladetech
changeset 28:4c0dea4760c5
RabbitMq working
author | adminsh@apollo |
---|---|
date | Wed, 21 Mar 2012 20:29:04 +0000 |
parents | 96fdf58e05b4 |
children | 9919ee227c93 |
files | Messaging/Client/Client.csproj Messaging/Client/MainWindow.xaml Messaging/Client/MainWindow.xaml.cs Messaging/Client/message.ico Messaging/Common/Common.csproj Messaging/Common/Messages/ClientMessages.cs Messaging/Server/EndPoints/BaseEndPoint.cs Messaging/Server/EndPoints/RabbitEndPoint.cs Messaging/Server/Listeners/AsyncSocketListener.cs Messaging/Server/Listeners/RabbitQueueListener.cs Messaging/Server/Listeners/StateObject.cs Messaging/Server/Server.csproj Messaging/Server/UI/MainWindowViewModel.cs |
diffstat | 13 files changed, 147 insertions(+), 73 deletions(-) [+] |
line wrap: on
line diff
--- a/Messaging/Client/Client.csproj Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Client/Client.csproj Wed Mar 21 20:29:04 2012 +0000 @@ -36,9 +36,14 @@ <WarningLevel>4</WarningLevel> </PropertyGroup> <PropertyGroup> - <ApplicationIcon>message.ico</ApplicationIcon> + <ApplicationIcon> + </ApplicationIcon> </PropertyGroup> <ItemGroup> + <Reference Include="RabbitMQ.Client, Version=2.6.1.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\Libs\RabbitMq.2.6.1.0\RabbitMQ.Client.dll</HintPath> + </Reference> <Reference Include="System" /> <Reference Include="System.Xml" /> <Reference Include="System.Core" /> @@ -94,7 +99,10 @@ <AppDesigner Include="Properties\" /> </ItemGroup> <ItemGroup> - <Resource Include="message.ico" /> + <ProjectReference Include="..\Common\Common.csproj"> + <Project>{241CE91D-18AC-4D84-ACC2-2273F50A5E9B}</Project> + <Name>Common</Name> + </ProjectReference> </ItemGroup> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
--- a/Messaging/Client/MainWindow.xaml Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Client/MainWindow.xaml Wed Mar 21 20:29:04 2012 +0000 @@ -1,8 +1,16 @@ <Window x:Class="Client.MainWindow" xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" - Title="MainWindow" Height="350" Width="525"> + Title="MainWindow" + Width="525" + Height="350"> <Grid> - + <StackPanel> + <Button Name="btnSockets" Content="Sockets" /> + <Button Name="btnRabbit" + Click="BtnRabbitClick" + Content="Rabbit" /> + <Button Name="btnRabbitProto" Content="Rabbit Proto" /> + </StackPanel> </Grid> </Window>
--- a/Messaging/Client/MainWindow.xaml.cs Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Client/MainWindow.xaml.cs Wed Mar 21 20:29:04 2012 +0000 @@ -10,6 +10,8 @@ using System.Windows.Media.Imaging; using System.Windows.Navigation; using System.Windows.Shapes; +using Common; +using RabbitMQ.Client; namespace Client { @@ -22,5 +24,21 @@ { InitializeComponent(); } + + private void BtnRabbitClick(object sender, RoutedEventArgs e) + { + Task + var cf = new ConnectionFactory {Address = "localhost:" + Settings.RabbitPortNumber}; + + using (var conn = cf.CreateConnection()) + using (var channel = conn.CreateModel()) + { + for (var i = 0; i < 10000; i++) + { + channel.BasicPublish("amq.direct", Settings.QueueName, null, + Encoding.UTF8.GetBytes("hello from the client!")); + } + } + } } -} +} \ No newline at end of file
--- a/Messaging/Common/Common.csproj Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Common/Common.csproj Wed Mar 21 20:29:04 2012 +0000 @@ -42,6 +42,7 @@ <Reference Include="WindowsBase" /> </ItemGroup> <ItemGroup> + <Compile Include="Messages\ClientMessages.cs" /> <Compile Include="Xaml\BindingErrorTraceListener.cs" /> <Compile Include="Logger\Log.cs" /> <Compile Include="Messages\LogMessages.cs" />
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Messaging/Common/Messages/ClientMessages.cs Wed Mar 21 20:29:04 2012 +0000 @@ -0,0 +1,16 @@ +using GalaSoft.MvvmLight.Messaging; + +namespace Common.Messages +{ + public class SocketClientMessage : MessageBase + { + } + + public class RabbitClientMessage : MessageBase + { + } + + public class RabbitProtoClientMessage : MessageBase + { + } +}
--- a/Messaging/Server/EndPoints/BaseEndPoint.cs Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Server/EndPoints/BaseEndPoint.cs Wed Mar 21 20:29:04 2012 +0000 @@ -2,6 +2,7 @@ using System.Threading.Tasks; using Common.Messages; using GalaSoft.MvvmLight; +using GalaSoft.MvvmLight.Messaging; using Server.Interfaces; namespace Server.EndPoints @@ -33,6 +34,11 @@ this.DisplayLog = DisplayLog + Environment.NewLine + m.Body; } + protected void ClientMessageReceived(MessageBase m) + { + this.DisplayCount++; + } + #region Properties #region IsListening @@ -150,7 +156,7 @@ if (_listener.IsListening) { Task.Factory.StartNew(_listener.Stop); - } + } else { Task.Factory.StartNew(_listener.Start);
--- a/Messaging/Server/EndPoints/RabbitEndPoint.cs Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Server/EndPoints/RabbitEndPoint.cs Wed Mar 21 20:29:04 2012 +0000 @@ -14,6 +14,7 @@ public override sealed void Init() { Messenger.Default.Register<RabbitLogMessage>(this, LogMessageReceived); + Messenger.Default.Register<RabbitClientMessage>(this, ClientMessageReceived); } } }
--- a/Messaging/Server/Listeners/AsyncSocketListener.cs Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Server/Listeners/AsyncSocketListener.cs Wed Mar 21 20:29:04 2012 +0000 @@ -25,7 +25,12 @@ public void Start() { - Messenger.Default.Send(new SocketLogMessage() { Body = "Opening connection..." }); + if (IsListening) + { + Messenger.Default.Send(new SocketLogMessage() { Body = "Already listening..." }); + return; + } + Messenger.Default.Send(new SocketLogMessage() { Body = "Opening listener..." }); // Data buffer for incoming data. var bytes = new Byte[1024]; @@ -43,34 +48,48 @@ { // Bind the socket to the local endpoint and listen for incoming connections. - try - { - listener.Bind(localEndPoint); - listener.Listen(100); - IsListening = true; + try + { + listener.Bind(localEndPoint); + listener.Listen(100); + IsListening = true; + + Messenger.Default.Send(new SocketLogMessage() { Body = "Listener opened" }); - Messenger.Default.Send(new SocketLogMessage() { Body = "Connection opened" }); + while (IsListening) + { + // Set the event to nonsignaled state. + AllDone.Reset(); - while (IsListening) - { - // Set the event to nonsignaled state. - AllDone.Reset(); + // Start an asynchronous socket to listen for connections. + Messenger.Default.Send(new SocketLogMessage() { Body = "Waiting for a connection..." }); + + listener.BeginAccept( + AcceptCallback, + listener); - // Start an asynchronous socket to listen for connections. - Messenger.Default.Send(new SocketLogMessage() { Body = "Waiting for a connection..." }); - - listener.BeginAccept( - AcceptCallback, - listener); + // Wait until a connection is made before continuing. + AllDone.WaitOne(); + } + + if (listener.Connected) + { + Messenger.Default.Send(new SocketLogMessage() { Body = "Shutting down listener..." }); + listener.Shutdown(SocketShutdown.Both); - // Wait until a connection is made before continuing. - AllDone.WaitOne(); + Messenger.Default.Send(new SocketLogMessage() { Body = "Disconnecting listener..." }); + listener.Disconnect(true); + } + } - } - catch (Exception e) - { - Messenger.Default.Send(new SocketLogMessage() { Body = e.Message }); - } + catch (Exception e) + { + Messenger.Default.Send(new SocketLogMessage() { Body = e.Message }); + } + finally + { + Messenger.Default.Send(new SocketLogMessage() { Body = "Connection closed." }); + } } } @@ -191,7 +210,23 @@ Messenger.Default.Send(new SocketLogMessage() { Body = "Closing connection..." }); IsListening = false; - Messenger.Default.Send(new SocketLogMessage() { Body = "Connection closed." }); + AllDone.Set(); } } + + // State object for reading client data asynchronously + public class StateObject + { + // Client socket. + public Socket WorkSocket; + + // Size of receive buffer. + public const int BufferSize = 1024; + + // Receive buffer. + public byte[] Buffer = new byte[BufferSize]; + + // Received data string. + public StringBuilder Sb = new StringBuilder(); + } } \ No newline at end of file
--- a/Messaging/Server/Listeners/RabbitQueueListener.cs Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Server/Listeners/RabbitQueueListener.cs Wed Mar 21 20:29:04 2012 +0000 @@ -14,16 +14,21 @@ { public bool IsListening { get; set; } private readonly int _port; + private Subscription _subscription; + private string _queueName; - public RabbitQueueListener(int port) + public RabbitQueueListener(int port, string queueName) { _port = port; + _queueName = queueName; } public void Start() { try { + if (IsListening) return; + var serverAddress = "localhost:" + _port; var connectionFactory = new ConnectionFactory { Address = serverAddress }; @@ -32,21 +37,22 @@ { using (var channel = connection.CreateModel()) { - Log("Creating a queue and binding it to amq.direct"); - string queueName = channel.QueueDeclare(); + Log(string.Format("Creating a queue {0} and binding it to amq.direct", _queueName)); + string queueName = channel.QueueDeclare(_queueName, false, false, false, null); channel.QueueBind(queueName, "amq.direct", queueName, null); - Log(string.Format("Done. Created queue {0} and bound to amq.direct\n", queueName)); + Log("Done."); - using (var subscription = new Subscription(channel, queueName)) + using (_subscription = new Subscription(channel, queueName)) { IsListening = true; while (IsListening) { - foreach (BasicDeliverEventArgs eventArgs in subscription) + foreach (BasicDeliverEventArgs eventArgs in _subscription) { - Log(Encoding.UTF8.GetString(eventArgs.Body)); - subscription.Ack(); + //Log(Encoding.UTF8.GetString(eventArgs.Body)); + Messenger.Default.Send(new RabbitClientMessage()); + _subscription.Ack(); } } } @@ -61,9 +67,10 @@ public void Stop() { - Log("Stopping listening..."); + Log("Closing listener..."); + _subscription.Close(); IsListening = false; - Log("Listening stopped."); + Log("Done."); } private void Log(string message)
--- a/Messaging/Server/Listeners/StateObject.cs Wed Mar 21 19:00:59 2012 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,21 +0,0 @@ -using System.Net.Sockets; -using System.Text; - -namespace Server.Listeners -{ - // State object for reading client data asynchronously - public class StateObject - { - // Client socket. - public Socket WorkSocket; - - // Size of receive buffer. - public const int BufferSize = 1024; - - // Receive buffer. - public byte[] Buffer = new byte[BufferSize]; - - // Received data string. - public StringBuilder Sb = new StringBuilder(); - } -}
--- a/Messaging/Server/Server.csproj Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Server/Server.csproj Wed Mar 21 20:29:04 2012 +0000 @@ -87,7 +87,6 @@ <Compile Include="Locator.cs" /> <Compile Include="Listeners\RabbitQueueListener.cs" /> <Compile Include="UI\MainWindowViewModel.cs" /> - <Compile Include="Listeners\StateObject.cs" /> <Compile Include="EndPoints\BaseEndPoint.cs" /> <Page Include="UI\MainWindow.xaml"> <Generator>MSBuild:Compile</Generator>
--- a/Messaging/Server/UI/MainWindowViewModel.cs Wed Mar 21 19:00:59 2012 +0000 +++ b/Messaging/Server/UI/MainWindowViewModel.cs Wed Mar 21 20:29:04 2012 +0000 @@ -1,12 +1,9 @@ -using System; -using System.Windows.Input; +using System.Windows.Input; using Common; using Common.Logger; -using Common.Messages; using GalaSoft.MvvmLight; using GalaSoft.MvvmLight.Command; using System.Windows; -using GalaSoft.MvvmLight.Messaging; using Server.EndPoints; using Server.Interfaces; using Server.Listeners; @@ -48,12 +45,13 @@ public MainWindowViewModel() { InitSocketEndPoint(Settings.SocketsPortNumber); - InitRabbitEndPoint(Settings.RabbitPortNumber); + InitRabbitEndPoint(Settings.RabbitPortNumber, Settings.QueueName); InitRabbitProtoEndPoint(Settings.RabbitProtoPortNumber); CloseCommand = new RelayCommand(CloseCommandExecute); + } - } + #endregion private void InitSocketEndPoint(int port) { @@ -65,9 +63,9 @@ }; } - private void InitRabbitEndPoint(int port) + private void InitRabbitEndPoint(int port, string queueName) { - IListener listener = new RabbitQueueListener(port); + IListener listener = new RabbitQueueListener(port, queueName); RabbitEndPoint = new RabbitEndPoint(listener) { DisplayText = "RabbitMQ", @@ -89,7 +87,5 @@ Log.Write("Closing command executed"); Application.Current.Shutdown(); } - - #endregion } }