14.4 Netty编码器和解码器

14.4.1 实现Memcached编码器

    先定义memcached操作码(Opcode)和响应状态码(Status):

[java] view plaincopy在CODE上查看代码片

  1. package netty.in.action.mem;
  2. /**
    • memcached operation codes
    • @author c.king
  3. *
  4. */
  5. public class Opcode {
  6. public static final byte GET = 0x00;
  7. public static final byte SET = 0x01;
  8. public static final byte DELETE = 0x04;
  9. }

[java] view plaincopy在CODE上查看代码片

  1. package netty.in.action.mem;
  2. /**
    • memcached response statuses
    • @author c.king
  3. *
  4. */
  5. public class Status {
  6. public static final short NO_ERROR = 0x0000;
  7. public static final short KEY_NOT_FOUND = 0x0001;
  8. public static final short KEY_EXISTS = 0x0002;
  9. public static final short VALUE_TOO_LARGE = 0x0003;
  10. public static final short INVALID_ARGUMENTS = 0x0004;
  11. public static final short ITEM_NOT_STORED = 0x0005;
  12. public static final short INC_DEC_NON_NUM_VAL = 0x0006;
  13. }

    继续编写memcached请求消息体:

[java] view plaincopy在CODE上查看代码片

  1. package netty.in.action.mem;
  2. import java.util.Random;
  3. /**
    • memcached request message object
    • @author c.king
  4. *
  5. */
  6. public class MemcachedRequest {
  7. private static final Random rand = new Random();
  8. private int magic = 0x80;// fixed so hard coded
  9. private byte opCode; // the operation e.g. set or get
  10. private String key; // the key to delete, get or set
  11. private int flags = 0xdeadbeef; // random
  12. private int expires; // 0 = item never expires
  13. private String body; // if opCode is set, the value
  14. private int id = rand.nextInt(); // Opaque
  15. private long cas; // data version check...not used
  16. private boolean hasExtras; // not all ops have extras
  17. public MemcachedRequest(byte opcode, String key, String value) {
  18. this.opCode = opcode;
  19. this.key = key;
  20. this.body = value == null ? "" : value;
  21. // only set command has extras in our example
  22. hasExtras = opcode == Opcode.SET;
  23. }
  24. public MemcachedRequest(byte opCode, String key) {
  25. this(opCode, key, null);
  26. }
  27. public int getMagic() {
  28. return magic;
  29. }
  30. public byte getOpCode() {
  31. return opCode;
  32. }
  33. public String getKey() {
  34. return key;
  35. }
  36. public int getFlags() {
  37. return flags;
  38. }
  39. public int getExpires() {
  40. return expires;
  41. }
  42. public String getBody() {
  43. return body;
  44. }
  45. public int getId() {
  46. return id;
  47. }
  48. public long getCas() {
  49. return cas;
  50. }
  51. public boolean isHasExtras() {
  52. return hasExtras;
  53. }
  54. }

    最后编写memcached请求编码器:

[java] view plaincopy在CODE上查看代码片

  1. package netty.in.action.mem;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.MessageToByteEncoder;
  5. import io.netty.util.CharsetUtil;
  6. /**
    • memcached request encoder
    • @author c.king
  7. *
  8. */
  9. public class MemcachedRequestEncoder extends MessageToByteEncoder<MemcachedRequest> {
  10. @Override
  11. protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg, ByteBuf out)
  12. throws Exception {
  13. // convert key and body to bytes array
  14. byte[] key = msg.getKey().getBytes(CharsetUtil.UTF_8);
  15. byte[] body = msg.getBody().getBytes(CharsetUtil.UTF_8);
  16. // total size of body = key size + body size + extras size
  17. int bodySize = key.length + body.length + (msg.isHasExtras() ? 8 : 0);
  18. // write magic int
  19. out.writeInt(msg.getMagic());
  20. // write opcode byte
  21. out.writeByte(msg.getOpCode());
  22. // write key length (2 byte) i.e a Java short
  23. out.writeShort(key.length);
  24. // write extras length (1 byte)
  25. int extraSize = msg.isHasExtras() ? 0x08 : 0x0;
  26. out.writeByte(extraSize);
  27. // byte is the data type, not currently implemented in Memcached
  28. // but required
  29. out.writeByte(0);
  30. // next two bytes are reserved, not currently implemented
  31. // but are required
  32. out.writeShort(0);
  33. // write total body length ( 4 bytes - 32 bit int)
  34. out.writeInt(bodySize);
  35. // write opaque ( 4 bytes) - a 32 bit int that is returned
  36. // in the response
  37. out.writeInt(msg.getId());
  38. // write CAS ( 8 bytes)
  39. // 24 byte header finishes with the CAS
  40. out.writeLong(msg.getCas());
  41. if(msg.isHasExtras()){
  42. // write extras
  43. // (flags and expiry, 4 bytes each), 8 bytes total
  44. out.writeInt(msg.getFlags());
  45. out.writeInt(msg.getExpires());
  46. }
  47. //write key
  48. out.writeBytes(key);
  49. //write value
  50. out.writeBytes(body);
  51. }
  52. }

14.4.2 实现Memcached解码器

    编写memcached响应消息体:

[java] view plaincopy在CODE上查看代码片

  1. package netty.in.action.mem;
  2. /**
    • memcached response message object
    • @author c.king
  3. *
  4. */
  5. public class MemcachedResponse {
  6. private byte magic;
  7. private byte opCode;
  8. private byte dataType;
  9. private short status;
  10. private int id;
  11. private long cas;
  12. private int flags;
  13. private int expires;
  14. private String key;
  15. private String data;
  16. public MemcachedResponse(byte magic, byte opCode, byte dataType, short status,
  17. int id, long cas, int flags, int expires, String key, String data) {
  18. this.magic = magic;
  19. this.opCode = opCode;
  20. this.dataType = dataType;
  21. this.status = status;
  22. this.id = id;
  23. this.cas = cas;
  24. this.flags = flags;
  25. this.expires = expires;
  26. this.key = key;
  27. this.data = data;
  28. }
  29. public byte getMagic() {
  30. return magic;
  31. }
  32. public byte getOpCode() {
  33. return opCode;
  34. }
  35. public byte getDataType() {
  36. return dataType;
  37. }
  38. public short getStatus() {
  39. return status;
  40. }
  41. public int getId() {
  42. return id;
  43. }
  44. public long getCas() {
  45. return cas;
  46. }
  47. public int getFlags() {
  48. return flags;
  49. }
  50. public int getExpires() {
  51. return expires;
  52. }
  53. public String getKey() {
  54. return key;
  55. }
  56. public String getData() {
  57. return data;
  58. }
  59. }

    编写memcached响应解码器:

[java] view plaincopy在CODE上查看代码片

  1. package netty.in.action.mem;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.ByteToMessageDecoder;
  5. import io.netty.util.CharsetUtil;
  6. import java.util.List;
  7. public class MemcachedResponseDecoder extends ByteToMessageDecoder {
  8. private enum State {
  9. Header, Body
  10. }
  11. private State state = State.Header;
  12. private int totalBodySize;
  13. private byte magic;
  14. private byte opCode;
  15. private short keyLength;
  16. private byte extraLength;
  17. private byte dataType;
  18. private short status;
  19. private int id;
  20. private long cas;
  21. @Override
  22. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
  23. throws Exception {
  24. switch (state) {
  25. case Header:
  26. // response header is 24 bytes
  27. if (in.readableBytes() < 24) {
  28. return;
  29. }
  30. // read header
  31. magic = in.readByte();
  32. opCode = in.readByte();
  33. keyLength = in.readShort();
  34. extraLength = in.readByte();
  35. dataType = in.readByte();
  36. status = in.readShort();
  37. totalBodySize = in.readInt();
  38. id = in.readInt();
  39. cas = in.readLong();
  40. state = State.Body;
  41. break;
  42. case Body:
  43. if (in.readableBytes() < totalBodySize) {
  44. return;
  45. }
  46. int flags = 0;
  47. int expires = 0;
  48. int actualBodySize = totalBodySize;
  49. if (extraLength > 0) {
  50. flags = in.readInt();
  51. actualBodySize -= 4;
  52. }
  53. if (extraLength > 4) {
  54. expires = in.readInt();
  55. actualBodySize -= 4;
  56. }
  57. String key = "";
  58. if (keyLength > 0) {
  59. ByteBuf keyBytes = in.readBytes(keyLength);
  60. key = keyBytes.toString(CharsetUtil.UTF_8);
  61. actualBodySize -= keyLength;
  62. }
  63. ByteBuf body = in.readBytes(actualBodySize);
  64. String data = body.toString(CharsetUtil.UTF_8);
  65. out.add(new MemcachedResponse(magic, opCode, dataType, status,
  66. id, cas, flags, expires, key, data));
  67. state = State.Header;
  68. break;
  69. default:
  70. break;
  71. }
  72. }
  73. }

results matching ""

    No results matching ""