暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ源码分析之消息ID

徘徊笔记 2019-04-18
873

来源:https://github.com/apache/rocketmq


msgId


消息唯一id,发消息时有生产端生成,保存在消息的属性中,也是发送消息的返回值SendResult中的msgId,主要用于创建索引和作为事务消息的事务id。

    MessageClientIDSetter.setUniqID(msg);
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    public static void setUniqID(final Message msg) {
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
    msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
    }
    }


    在MessageClientIDSetter初始化时,LEN代表,ip四个字节,端口两个字节,类加载器的哈希码四个字节,当前时间戳与该月第一毫秒的差值四个字节,自增计数器两个字节,因为端口只占两个字节8位,最大65535,但是获取端口的时候返回int类型4个字节,所以前两位是空的,因此往后再移动两位放端口,这样前面四位就可以放下ip了,然后再把位置移到6,放入最后一个哈希码,生成长度为20的固定字符串

      static {
      LEN = 4 + 2 + 4 + 4 + 2;
      ByteBuffer tempBuffer = ByteBuffer.allocate(10);
      tempBuffer.position(2);
      tempBuffer.putInt(UtilAll.getPid());
      tempBuffer.position(0);
      try {
      tempBuffer.put(UtilAll.getIP());
      } catch (Exception e) {
      tempBuffer.put(createFakeIP());
      }
      tempBuffer.position(6);
      tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
      FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
      setStartTime(System.currentTimeMillis());
      COUNTER = new AtomicInteger(0);
      }


      保存当月以及下月的第一个毫秒值。

        private synchronized static void setStartTime(long millis) {
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(millis);
        cal.set(Calendar.DAY_OF_MONTH, 1);
        cal.set(Calendar.HOUR_OF_DAY, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        startTime = cal.getTimeInMillis();
        cal.add(Calendar.MONTH, 1);
        nextStartTime = cal.getTimeInMillis();
        }


        生成唯一ID,长度总共为32位,固定字符串占20位,剩下的占12位

          public static String createUniqID() {
          StringBuilder sb = new StringBuilder(LEN * 2);
          sb.append(FIX_STRING);
          sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
          return sb.toString();
          }


          创建唯一ID的buffer,如果当前时间大于等于初始保存的下月开始毫秒值,则需要重置当月下月初始毫秒值。计算当前时间戳与当月初始毫秒值的差值,获取自增器的当前数值,转化成short类型,总共6位放入buffer,最后生成长度为12的字符串,这样可以保证一个月内不会出现重复。

            private static byte[] createUniqIDBuffer() {
            ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
            long current = System.currentTimeMillis();
            if (current >= nextStartTime) {
            setStartTime(current);
            }
            buffer.position(0);
            buffer.putInt((int) (System.currentTimeMillis() - startTime));
            buffer.putShort((short) COUNTER.getAndIncrement());
            return buffer.array();
            }


            offsetMsgId


            具体代表存储消息的broker的ip和端口号,以及在文件中的偏移量

              this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
              String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);


              八个字节长度用来保存存储消息的broker的ip和端口信息

                private final ByteBuffer hostHolder = ByteBuffer.allocate(8);


                public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {
                return socketAddress2ByteBuffer(this.storeHost, byteBuffer);
                }


                public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
                InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
                byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
                byteBuffer.putInt(inetSocketAddress.getPort());
                byteBuffer.flip();
                return byteBuffer;
                }


                生成msgId,长度也是32位。

                  public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
                  input.flip();
                  input.limit(MessageDecoder.MSG_ID_LENGTH);


                  input.put(addr);
                  input.putLong(offset);


                  return UtilAll.bytes2string(input.array());
                  }


                  一个字节用两个char表示

                    public static String bytes2string(byte[] src) {
                    char[] hexChars = new char[src.length * 2];
                    for (int j = 0; j < src.length; j++) {
                    int v = src[j] & 0xFF;
                    hexChars[j * 2] = HEX_ARRAY[v >>> 4];
                    hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
                    }
                    return new String(hexChars);
                    }


                    这个消息id可以解码出对应消息存储的位置

                      public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
                      SocketAddress address;
                      long offset;


                      byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
                      byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
                      ByteBuffer bb = ByteBuffer.wrap(port);
                      int portInt = bb.getInt(0);
                      address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);


                      // offset
                      byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
                      bb = ByteBuffer.wrap(data);
                      offset = bb.getLong(0);


                      return new MessageId(address, offset);
                      }


                      文章转载自徘徊笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论