13.5 编写监听者

    这一节我们编写一个监听者:EventLogMonitor,也就是用来接收数据的程序。EventLogMonitor做下面事情:
  • 接收LogEventBroadcaster广播的DatagramPacket
  • 解码LogEvent消息
  • 输出LogEvent

    EventLogMonitor的示意图如下:
    

http://img.blog.csdn.net/20140804162255231?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvYWJjX2tleQ==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast

解码器代码如下:

[java] view plaincopy

  1. package netty.in.action.udp;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.socket.DatagramPacket;
  5. import io.netty.handler.codec.MessageToMessageDecoder;
  6. import io.netty.util.CharsetUtil;
  7. import java.util.List;
  8. public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
  9. @Override
  10. protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out)
  11. throws Exception {
  12. ByteBuf buf = msg.content();
  13. int i = buf.indexOf(0, buf.readableBytes(), LogEvent.SEPARATOR);
  14. String filename = buf.slice(0, i).toString(CharsetUtil.UTF_8);
  15. String logMsg = buf.slice(i + 1, buf.readableBytes()).toString(CharsetUtil.UTF_8);
  16. LogEvent event = new LogEvent(msg.sender(),
  17. System.currentTimeMillis(), filename, logMsg);
  18. out.add(event);
  19. }
  20. }

处理消息的Handler代码如下:

[java] view plaincopy

  1. package netty.in.action.udp;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
  5. @Override
  6. protected void channelRead0(ChannelHandlerContext ctx, LogEvent msg) throws Exception {
  7. StringBuilder builder = new StringBuilder();
  8. builder.append(msg.getReceived());
  9. builder.append(" [");
  10. builder.append(msg.getSource().toString());
  11. builder.append("] [");
  12. builder.append(msg.getLogfile());
  13. builder.append("] : ");
  14. builder.append(msg.getMsg());
  15. System.out.println(builder.toString());
  16. }
  17. }

EventLogMonitor代码如下:

[java] view plaincopy

  1. package netty.in.action.udp;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.ChannelPipeline;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.nio.NioDatagramChannel;
  10. import java.net.InetSocketAddress;
  11. public class LogEventMonitor {
  12. private final EventLoopGroup group;
  13. private final Bootstrap bootstrap;
  14. public LogEventMonitor(InetSocketAddress address) {
  15. group = new NioEventLoopGroup();
  16. bootstrap = new Bootstrap();
  17. bootstrap.group(group).channel(NioDatagramChannel.class)
  18. .option(ChannelOption.SO_BROADCAST, true)
  19. .handler(new ChannelInitializer<Channel>() {
  20. @Override
  21. protected void initChannel(Channel channel) throws Exception {
  22. ChannelPipeline pipeline = channel.pipeline();
  23. pipeline.addLast(new LogEventDecoder());
  24. pipeline.addLast(new LogEventHandler());
  25. }
  26. }).localAddress(address);
  27. }
  28. public Channel bind() {
  29. return bootstrap.bind().syncUninterruptibly().channel();
  30. }
  31. public void stop() {
  32. group.shutdownGracefully();
  33. }
  34. public static void main(String[] args) throws InterruptedException {
  35. LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(4096));
  36. try {
  37. Channel channel = monitor.bind();
  38. System.out.println("LogEventMonitor running");
  39. channel.closeFuture().sync();
  40. } finally {
  41. monitor.stop();
  42. }
  43. }
  44. }

results matching ""

    No results matching ""