13.4 编写广播器
我们要做的是广播一个DatagramPacket日志条目,如下图所示:
上图显示我们有一个从日志条目到DatagramPacket一对一的关系。如同所有的基于Netty的应用程序一样,它由一个或多个ChannelHandler和一些实体对象绑定,用于引导该应用程序。首先让我们来看看LogEventBroadcaster的ChannelPipeline以及作为数据载体的LogEvent的流向,看下图:
上图显示,LogEventBroadcaster使用LogEvent消息并将消息写入本地Channel,所有的信息封装在LogEvent消息中,这些消息被传到ChannelPipeline中。流进ChannelPipeline的LogEvent消息被编码成DatagramPacket消息,最后通过UDP广播到远程对等通道。
这可以归结为有一个自定义的ChannelHandler,将LogEvent消息编码成DatagramPacket消息。回忆我们在第七章讲解的编解码器,我们定义一个LogEventEncoder,代码如下:
[java] view plaincopy
- package netty.in.action.udp;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.socket.DatagramPacket;
- import io.netty.handler.codec.MessageToMessageEncoder;
- import io.netty.util.CharsetUtil;
- import java.net.InetSocketAddress;
- import java.util.List;
- public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
- private final InetSocketAddress remoteAddress;
- public LogEventEncoder(InetSocketAddress remoteAddress){
- this.remoteAddress = remoteAddress;
- }
- @Override
- protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out)
- throws Exception {
- ByteBuf buf = ctx.alloc().buffer();
- buf.writeBytes(msg.getLogfile().getBytes(CharsetUtil.UTF_8));
- buf.writeByte(LogEvent.SEPARATOR);
- buf.writeBytes(msg.getMsg().getBytes(CharsetUtil.UTF_8));
- out.add(new DatagramPacket(buf, remoteAddress));
- }
}
下面我们再编写一个广播器:
[java] view plaincopy
- package netty.in.action.udp;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioDatagramChannel;
- import java.io.File;
- import java.io.IOException;
- import java.io.RandomAccessFile;
- import java.net.InetSocketAddress;
- /**
-  * Polls a file once per second and writes every newly appended line, wrapped in
-  * a {@link LogEvent}, to a UDP channel whose pipeline contains a
-  * {@link LogEventEncoder}.
-  */
- public class LogEventBroadcaster {
-     private final EventLoopGroup group;
-     private final Bootstrap bootstrap;
-     private final File file;
-     /**
-      * Configures (but does not yet bind) a datagram bootstrap with
-      * {@code SO_BROADCAST} enabled and a {@link LogEventEncoder} as its handler.
-      *
-      * @param address remote address handed to the {@link LogEventEncoder}
-      * @param file    file whose appended lines are broadcast by {@link #run()}
-      */
-     public LogEventBroadcaster(InetSocketAddress address, File file) {
-         group = new NioEventLoopGroup();
-         bootstrap = new Bootstrap();
-         bootstrap.group(group).channel(NioDatagramChannel.class)
-                 .option(ChannelOption.SO_BROADCAST, true)
-                 .handler(new LogEventEncoder(address));
-         this.file = file;
-     }
-     /**
-      * Binds the channel to local port 0 and then loops: read every line from the
-      * last known position to the end of the file, write each as a
-      * {@link LogEvent}, flush, remember the new position, sleep one second.
-      * The loop ends when the thread is interrupted while sleeping; the
-      * interrupt status is restored before returning.
-      *
-      * @throws IOException if the file cannot be opened or read
-      */
-     public void run() throws IOException {
-         Channel ch = bootstrap.bind(0).syncUninterruptibly().channel();
-         long pointer = 0;
-         for (;;) {
-             long len = file.length();
-             if (len < pointer) {
-                 // The file is now shorter than the remembered position (it was
-                 // truncated or replaced): continue from its current end.
-                 pointer = len;
-             } else {
-                 RandomAccessFile raf = new RandomAccessFile(file, "r");
-                 try {
-                     raf.seek(pointer);
-                     String line;
-                     // NOTE(review): RandomAccessFile.readLine() maps each byte to
-                     // one char, so multi-byte (e.g. UTF-8) text is not decoded
-                     // here - confirm the log file's encoding against what the
-                     // encoder/receiver expect.
-                     while ((line = raf.readLine()) != null) {
-                         // First two LogEvent arguments are passed as null / -1;
-                         // presumably "unknown source" and "unknown timestamp" -
-                         // verify against the LogEvent constructor.
-                         ch.write(new LogEvent(null, -1, file.getAbsolutePath(), line));
-                     }
-                     ch.flush();
-                     pointer = raf.getFilePointer();
-                 } finally {
-                     // Always release the file handle, even when reading fails.
-                     raf.close();
-                 }
-             }
-             try {
-                 Thread.sleep(1000);
-             } catch (InterruptedException e) {
-                 // Restore the interrupt status so the caller can observe it,
-                 // then leave the polling loop.
-                 Thread.currentThread().interrupt();
-                 break;
-             }
-         }
-     }
-     /**
-      * Starts a graceful shutdown of the event loop group created in the constructor.
-      */
-     public void stop() {
-         group.shutdownGracefully();
-     }
-     /**
-      * Broadcasts {@code log.txt} from the current working directory to
-      * {@code 255.255.255.255:4096} until the thread is interrupted or an error occurs.
-      *
-      * @param args ignored
-      * @throws Exception if {@link #run()} fails
-      */
-     public static void main(String[] args) throws Exception {
-         int port = 4096;
-         String path = System.getProperty("user.dir") + "/log.txt";
-         LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress(
-                 "255.255.255.255", port), new File(path));
-         try {
-             broadcaster.run();
-         } finally {
-             broadcaster.stop();
-         }
-     }
- }