13.4 编写广播器

    我们要做的是广播一个DatagramPacket日志条目,如下图所示:

http://img.blog.csdn.net/20140804160616609?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvYWJjX2tleQ==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast

    上图显示我们有一个从日志条路到DatagramPacket一对一的关系。如同所有的基于Netty的应用程序一样,它由一个或多个ChannelHandler和一些实体对象绑定,用于引导该应用程序。首先让我们来看看LogEventBroadcaster的ChannelPipeline以及作为数据载体的LogEvent的流向,看下图:

http://img.blog.csdn.net/20140804160643953?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvYWJjX2tleQ==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast

    上图显示,LogEventBroadcaster使用LogEvent消息并将消息写入本地Channel,所有的信息封装在LogEvent消息中,这些消息被传到ChannelPipeline中。流进ChannelPipeline的LogEvent消息被编码成DatagramPacket消息,最后通过UDP广播到远程对等通道。

    这可以归结为有一个自定义的ChannelHandler,从LogEvent消息编程成DatagramPacket消息。回忆我们在第七章讲解的编解码器,我们定义个LogEventEncoder,代码如下:

[java] view plaincopy

  1. package netty.in.action.udp;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.socket.DatagramPacket;
  5. import io.netty.handler.codec.MessageToMessageEncoder;
  6. import io.netty.util.CharsetUtil;
  7. import java.net.InetSocketAddress;
  8. import java.util.List;
  9. public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
  10. private final InetSocketAddress remoteAddress;
  11. public LogEventEncoder(InetSocketAddress remoteAddress){
  12. this.remoteAddress = remoteAddress;
  13. }
  14. @Override
  15. protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out)
  16. throws Exception {
  17. ByteBuf buf = ctx.alloc().buffer();
  18. buf.writeBytes(msg.getLogfile().getBytes(CharsetUtil.UTF_8));
  19. buf.writeByte(LogEvent.SEPARATOR);
  20. buf.writeBytes(msg.getMsg().getBytes(CharsetUtil.UTF_8));
  21. out.add(new DatagramPacket(buf, remoteAddress));
  22. }
  23. }

    下面我们再编写一个广播器:

[java] view plaincopy

  1. package netty.in.action.udp;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioDatagramChannel;
  8. import java.io.File;
  9. import java.io.IOException;
  10. import java.io.RandomAccessFile;
  11. import java.net.InetSocketAddress;
  12. public class LogEventBroadcaster {
  13. private final EventLoopGroup group;
  14. private final Bootstrap bootstrap;
  15. private final File file;
  16. public LogEventBroadcaster(InetSocketAddress address, File file) {
  17. group = new NioEventLoopGroup();
  18. bootstrap = new Bootstrap();
  19. bootstrap.group(group).channel(NioDatagramChannel.class)
  20. .option(ChannelOption.SO_BROADCAST, true)
  21. .handler(new LogEventEncoder(address));
  22. this.file = file;
  23. }
  24. public void run() throws IOException {
  25. Channel ch = bootstrap.bind(0).syncUninterruptibly().channel();
  26. long pointer = 0;
  27. for (;;) {
  28. long len = file.length();
  29. if (len < pointer) {
  30. pointer = len;
  31. } else {
  32. RandomAccessFile raf = new RandomAccessFile(file, "r");
  33. raf.seek(pointer);
  34. String line;
  35. while ((line = raf.readLine()) != null) {
  36. ch.write(new LogEvent(null, -1, file.getAbsolutePath(), line));
  37. }
  38. ch.flush();
  39. pointer = raf.getFilePointer();
  40. raf.close();
  41. }
  42. try {
  43. Thread.sleep(1000);
  44. } catch (InterruptedException e) {
  45. Thread.interrupted();
  46. break;
  47. }
  48. }
  49. }
  50. public void stop() {
  51. group.shutdownGracefully();
  52. }
  53. public static void main(String[] args) throws Exception {
  54. int port = 4096;
  55. String path = System.getProperty("user.dir") + "/log.txt";
  56. LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress(
  57. "255.255.255.255", port), new File(path));
  58. try {
  59. broadcaster.run();
  60. } finally {
  61. broadcaster.stop();
  62. }
  63. }
  64. }

results matching ""

    No results matching ""