14.4 Netty编码器和解码器
14.4.1 实现Memcached编码器
先定义memcached操作码(Opcode)和响应状态码(Status):
[java] view plaincopy
- package netty.in.action.mem;
/**
 * Memcached operation codes, as carried in the opcode byte of a request or
 * response header.
 *
 * <p>This class only holds constants and cannot be instantiated.
 *
 * @author c.king
 */
public final class Opcode {
    /** Opcode of the get operation. */
    public static final byte GET = 0x00;
    /** Opcode of the set operation. */
    public static final byte SET = 0x01;
    /** Opcode of the delete operation. */
    public static final byte DELETE = 0x04;

    private Opcode() {
        // constants holder, no instances
    }
}
[java] view plaincopy
- package netty.in.action.mem;
/**
 * Memcached response statuses, as carried in the 2-byte status field of a
 * response header.
 *
 * <p>This class only holds constants and cannot be instantiated.
 *
 * @author c.king
 */
public final class Status {
    /** The operation succeeded. */
    public static final short NO_ERROR = 0x0000;
    /** The requested key was not found. */
    public static final short KEY_NOT_FOUND = 0x0001;
    /** The key already exists. */
    public static final short KEY_EXISTS = 0x0002;
    /** The supplied value is too large. */
    public static final short VALUE_TOO_LARGE = 0x0003;
    /** The request carried invalid arguments. */
    public static final short INVALID_ARGUMENTS = 0x0004;
    /** The item was not stored. */
    public static final short ITEM_NOT_STORED = 0x0005;
    /** Increment/decrement was attempted on a non-numeric value. */
    public static final short INC_DEC_NON_NUM_VAL = 0x0006;

    private Status() {
        // constants holder, no instances
    }
}
继续编写memcached请求消息体:
[java] view plaincopy
- package netty.in.action.mem;
- import java.util.Random;
- /**
- memcached request message object
- @author c.king
- *
- */
- public class MemcachedRequest {
- private static final Random rand = new Random();
- private int magic = 0x80;// fixed so hard coded
- private byte opCode; // the operation e.g. set or get
- private String key; // the key to delete, get or set
- private int flags = 0xdeadbeef; // random
- private int expires; // 0 = item never expires
- private String body; // if opCode is set, the value
- private int id = rand.nextInt(); // Opaque
- private long cas; // data version check...not used
- private boolean hasExtras; // not all ops have extras
- public MemcachedRequest(byte opcode, String key, String value) {
- this.opCode = opcode;
- this.key = key;
- this.body = value == null ? "" : value;
- // only set command has extras in our example
- hasExtras = opcode == Opcode.SET;
- }
- public MemcachedRequest(byte opCode, String key) {
- this(opCode, key, null);
- }
- public int getMagic() {
- return magic;
- }
- public byte getOpCode() {
- return opCode;
- }
- public String getKey() {
- return key;
- }
- public int getFlags() {
- return flags;
- }
- public int getExpires() {
- return expires;
- }
- public String getBody() {
- return body;
- }
- public int getId() {
- return id;
- }
- public long getCas() {
- return cas;
- }
- public boolean isHasExtras() {
- return hasExtras;
- }
}
最后编写memcached请求编码器:
[java] view plaincopy
- package netty.in.action.mem;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- import io.netty.util.CharsetUtil;
- /**
- memcached request encoder
- @author c.king
- *
- */
- public class MemcachedRequestEncoder extends MessageToByteEncoder<MemcachedRequest> {
- @Override
- protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg, ByteBuf out)
- throws Exception {
- // convert key and body to bytes array
- byte[] key = msg.getKey().getBytes(CharsetUtil.UTF_8);
- byte[] body = msg.getBody().getBytes(CharsetUtil.UTF_8);
- // total size of body = key size + body size + extras size
- int bodySize = key.length + body.length + (msg.isHasExtras() ? 8 : 0);
- // write magic int
- out.writeInt(msg.getMagic());
- // write opcode byte
- out.writeByte(msg.getOpCode());
- // write key length (2 byte) i.e a Java short
- out.writeShort(key.length);
- // write extras length (1 byte)
- int extraSize = msg.isHasExtras() ? 0x08 : 0x0;
- out.writeByte(extraSize);
- // byte is the data type, not currently implemented in Memcached
- // but required
- out.writeByte(0);
- // next two bytes are reserved, not currently implemented
- // but are required
- out.writeShort(0);
- // write total body length ( 4 bytes - 32 bit int)
- out.writeInt(bodySize);
- // write opaque ( 4 bytes) - a 32 bit int that is returned
- // in the response
- out.writeInt(msg.getId());
- // write CAS ( 8 bytes)
- // 24 byte header finishes with the CAS
- out.writeLong(msg.getCas());
- if(msg.isHasExtras()){
- // write extras
- // (flags and expiry, 4 bytes each), 8 bytes total
- out.writeInt(msg.getFlags());
- out.writeInt(msg.getExpires());
- }
- //write key
- out.writeBytes(key);
- //write value
- out.writeBytes(body);
- }
- }
14.4.2 实现Memcached解码器
编写memcached响应消息体:
[java] view plaincopy
- package netty.in.action.mem;
/**
 * Immutable memcached response message: the header fields, the extras
 * (flags and expiry), and the key and data of the body.
 *
 * @author c.king
 */
public class MemcachedResponse {
    private final byte magic;
    private final byte opCode;
    private final byte dataType;
    private final short status;
    private final int id;
    private final long cas;
    private final int flags;
    private final int expires;
    private final String key;
    private final String data;

    /**
     * Creates a response from its already-decoded parts.
     *
     * @param magic    the magic byte of the packet
     * @param opCode   the operation code
     * @param dataType the data type byte
     * @param status   the response status
     * @param id       the opaque id
     * @param cas      the CAS value
     * @param flags    the flags from the extras section
     * @param expires  the expiry from the extras section
     * @param key      the key from the body
     * @param data     the value from the body
     */
    public MemcachedResponse(byte magic, byte opCode, byte dataType, short status,
            int id, long cas, int flags, int expires, String key, String data) {
        this.magic = magic;
        this.opCode = opCode;
        this.dataType = dataType;
        this.status = status;
        this.id = id;
        this.cas = cas;
        this.flags = flags;
        this.expires = expires;
        this.key = key;
        this.data = data;
    }

    /** @return the magic byte */
    public byte getMagic() {
        return magic;
    }

    /** @return the operation code */
    public byte getOpCode() {
        return opCode;
    }

    /** @return the data type byte */
    public byte getDataType() {
        return dataType;
    }

    /** @return the response status */
    public short getStatus() {
        return status;
    }

    /** @return the opaque id */
    public int getId() {
        return id;
    }

    /** @return the CAS value */
    public long getCas() {
        return cas;
    }

    /** @return the flags */
    public int getFlags() {
        return flags;
    }

    /** @return the expiry */
    public int getExpires() {
        return expires;
    }

    /** @return the key */
    public String getKey() {
        return key;
    }

    /** @return the data (value) */
    public String getData() {
        return data;
    }
}
编写memcached响应解码器:
[java] view plaincopy
- package netty.in.action.mem;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import io.netty.util.CharsetUtil;
- import java.util.List;
- public class MemcachedResponseDecoder extends ByteToMessageDecoder {
- private enum State {
- Header, Body
- }
- private State state = State.Header;
- private int totalBodySize;
- private byte magic;
- private byte opCode;
- private short keyLength;
- private byte extraLength;
- private byte dataType;
- private short status;
- private int id;
- private long cas;
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
- throws Exception {
- switch (state) {
- case Header:
- // response header is 24 bytes
- if (in.readableBytes() < 24) {
- return;
- }
- // read header
- magic = in.readByte();
- opCode = in.readByte();
- keyLength = in.readShort();
- extraLength = in.readByte();
- dataType = in.readByte();
- status = in.readShort();
- totalBodySize = in.readInt();
- id = in.readInt();
- cas = in.readLong();
- state = State.Body;
- break;
- case Body:
- if (in.readableBytes() < totalBodySize) {
- return;
- }
- int flags = 0;
- int expires = 0;
- int actualBodySize = totalBodySize;
- if (extraLength > 0) {
- flags = in.readInt();
- actualBodySize -= 4;
- }
- if (extraLength > 4) {
- expires = in.readInt();
- actualBodySize -= 4;
- }
- String key = "";
- if (keyLength > 0) {
- ByteBuf keyBytes = in.readBytes(keyLength);
- key = keyBytes.toString(CharsetUtil.UTF_8);
- actualBodySize -= keyLength;
- }
- ByteBuf body = in.readBytes(actualBodySize);
- String data = body.toString(CharsetUtil.UTF_8);
- out.add(new MemcachedResponse(magic, opCode, dataType, status,
- id, cas, flags, expires, key, data));
- state = State.Header;
- break;
- default:
- break;
- }
- }
- }