
来源:https://github.com/apache/rocketmq
msgId
消息唯一id,发消息时有生产端生成,保存在消息的属性中,也是发送消息的返回值SendResult中的msgId,主要用于创建索引和作为事务消息的事务id。
MessageClientIDSetter.setUniqID(msg);public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";public static void setUniqID(final Message msg) {if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());}}
在MessageClientIDSetter初始化时,LEN代表,ip四个字节,端口两个字节,类加载器的哈希码四个字节,当前时间戳与该月第一毫秒的差值四个字节,自增计数器两个字节,因为端口只占两个字节8位,最大65535,但是获取端口的时候返回int类型4个字节,所以前两位是空的,因此往后再移动两位放端口,这样前面四位就可以放下ip了,然后再把位置移到6,放入最后一个哈希码,生成长度为20的固定字符串
static {LEN = 4 + 2 + 4 + 4 + 2;ByteBuffer tempBuffer = ByteBuffer.allocate(10);tempBuffer.position(2);tempBuffer.putInt(UtilAll.getPid());tempBuffer.position(0);try {tempBuffer.put(UtilAll.getIP());} catch (Exception e) {tempBuffer.put(createFakeIP());}tempBuffer.position(6);tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());FIX_STRING = UtilAll.bytes2string(tempBuffer.array());setStartTime(System.currentTimeMillis());COUNTER = new AtomicInteger(0);}
保存当月以及下月的第一个毫秒值。
private synchronized static void setStartTime(long millis) {Calendar cal = Calendar.getInstance();cal.setTimeInMillis(millis);cal.set(Calendar.DAY_OF_MONTH, 1);cal.set(Calendar.HOUR_OF_DAY, 0);cal.set(Calendar.MINUTE, 0);cal.set(Calendar.SECOND, 0);cal.set(Calendar.MILLISECOND, 0);startTime = cal.getTimeInMillis();cal.add(Calendar.MONTH, 1);nextStartTime = cal.getTimeInMillis();}
生成唯一ID,长度总共为32位,固定字符串占20位,剩下的占12位
public static String createUniqID() {StringBuilder sb = new StringBuilder(LEN * 2);sb.append(FIX_STRING);sb.append(UtilAll.bytes2string(createUniqIDBuffer()));return sb.toString();}
创建唯一ID的buffer,如果当前时间大于等于初始保存的下月开始毫秒值,则需要重置当月下月初始毫秒值。计算当前时间戳与当月初始毫秒值的差值,获取自增器的当前数值,转化成short类型,总共6位放入buffer,最后生成长度为12的字符串,这样可以保证一个月内不会出现重复。
private static byte[] createUniqIDBuffer() {ByteBuffer buffer = ByteBuffer.allocate(4 + 2);long current = System.currentTimeMillis();if (current >= nextStartTime) {setStartTime(current);}buffer.position(0);buffer.putInt((int) (System.currentTimeMillis() - startTime));buffer.putShort((short) COUNTER.getAndIncrement());return buffer.array();}
offsetMsgId
具体代表存储消息的broker的ip和端口号,以及在文件中的偏移量
this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
八个字节长度用来保存存储消息的broker的ip和端口信息
private final ByteBuffer hostHolder = ByteBuffer.allocate(8);public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {return socketAddress2ByteBuffer(this.storeHost, byteBuffer);}public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);byteBuffer.putInt(inetSocketAddress.getPort());byteBuffer.flip();return byteBuffer;}
生成msgId,长度也是32位。
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {input.flip();input.limit(MessageDecoder.MSG_ID_LENGTH);input.put(addr);input.putLong(offset);return UtilAll.bytes2string(input.array());}
一个字节用两个char表示
public static String bytes2string(byte[] src) {char[] hexChars = new char[src.length * 2];for (int j = 0; j < src.length; j++) {int v = src[j] & 0xFF;hexChars[j * 2] = HEX_ARRAY[v >>> 4];hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];}return new String(hexChars);}
这个消息id可以解码出对应消息存储的位置
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {SocketAddress address;long offset;byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));ByteBuffer bb = ByteBuffer.wrap(port);int portInt = bb.getInt(0);address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);// offsetbyte[] data = UtilAll.string2bytes(msgId.substring(16, 32));bb = ByteBuffer.wrap(data);offset = bb.getLong(0);return new MessageId(address, offset);}
文章转载自徘徊笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




