13.5 编写监听者
这一节我们编写一个监听者:EventLogMonitor,也就是用来接收数据的程序。EventLogMonitor做下面事情:
- 接收LogEventBroadcaster广播的DatagramPacket
- 解码LogEvent消息
输出LogEvent
EventLogMonitor的示意图如下:
解码器代码如下:
[java] view plaincopy
- package netty.in.action.udp;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.socket.DatagramPacket;
- import io.netty.handler.codec.MessageToMessageDecoder;
- import io.netty.util.CharsetUtil;
- import java.util.List;
- public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
- @Override
- protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out)
- throws Exception {
- ByteBuf buf = msg.content();
- int i = buf.indexOf(0, buf.readableBytes(), LogEvent.SEPARATOR);
- String filename = buf.slice(0, i).toString(CharsetUtil.UTF_8);
- String logMsg = buf.slice(i + 1, buf.readableBytes()).toString(CharsetUtil.UTF_8);
- LogEvent event = new LogEvent(msg.sender(),
- System.currentTimeMillis(), filename, logMsg);
- out.add(event);
- }
- }
处理消息的Handler代码如下:
[java] view plaincopy
- package netty.in.action.udp;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, LogEvent msg) throws Exception {
- StringBuilder builder = new StringBuilder();
- builder.append(msg.getReceived());
- builder.append(" [");
- builder.append(msg.getSource().toString());
- builder.append("] [");
- builder.append(msg.getLogfile());
- builder.append("] : ");
- builder.append(msg.getMsg());
- System.out.println(builder.toString());
- }
- }
EventLogMonitor代码如下:
[java] view plaincopy
- package netty.in.action.udp;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioDatagramChannel;
- import java.net.InetSocketAddress;
- public class LogEventMonitor {
- private final EventLoopGroup group;
- private final Bootstrap bootstrap;
- public LogEventMonitor(InetSocketAddress address) {
- group = new NioEventLoopGroup();
- bootstrap = new Bootstrap();
- bootstrap.group(group).channel(NioDatagramChannel.class)
- .option(ChannelOption.SO_BROADCAST, true)
- .handler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel channel) throws Exception {
- ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast(new LogEventDecoder());
- pipeline.addLast(new LogEventHandler());
- }
- }).localAddress(address);
- }
- public Channel bind() {
- return bootstrap.bind().syncUninterruptibly().channel();
- }
- public void stop() {
- group.shutdownGracefully();
- }
- public static void main(String[] args) throws InterruptedException {
- LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(4096));
- try {
- Channel channel = monitor.bind();
- System.out.println("LogEventMonitor running");
- channel.closeFuture().sync();
- } finally {
- monitor.stop();
- }
- }
- }