
来源:https://github.com/LMAX-Exchange/disruptor
提供端发布消息
public void publishEvent(final EventTranslator<T> eventTranslator){ringBuffer.publishEvent(eventTranslator);}public void publishEvent(EventTranslator<E> translator){final long sequence = sequencer.next();translateAndPublish(translator, sequence);}
获取提供者定序器的下一个可用下标,
public long next(){return next(1);}
获取当前下标以及下一个下标,wrapPoint大于等于0代表已经设置一圈数据了,获取gatingSequenceCache下标,初始值为-1,刚开始时设置成功cursor.compareAndSet(current, next)返回next下标,第二圈时,wrapPoint为0,cachedGatingSequence为-1,这个时候就会进入if语句判断是否会需要等待消费,当current溢出时也会进入,获取消费端已经消费的下标和当前提供者的下标的最小下标gatingSequence,当wrapPoint大于gatingSequence时,代表消费者还没有消费当前下标,所以不能覆盖当前位置的对象,所以就需要等待LockSupport.parkNanos(1);如果小于的话,就把那个最小位置设置到gatingSequenceCache保存下来,下一次当提供者放入了之前两次gatingSequenceCache之间的差值数量的数据时又会进入下面的if语句,不一定是是每bufferSize进入一次,如果消费速度慢的话会小于bufferSize进入一次if语句。
public long next(int n){if (n < 1 || n > bufferSize){throw new IllegalArgumentException("n must be > 0 and < bufferSize");}long current;long next;do{current = cursor.get();next = current + n;long wrapPoint = next - bufferSize;long cachedGatingSequence = gatingSequenceCache.get();if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){long gatingSequence = Util.getMinimumSequence(gatingSequences, current);if (wrapPoint > gatingSequence){LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?continue;}gatingSequenceCache.set(gatingSequence);}else if (cursor.compareAndSet(current, next)){break;}}while (true);return next;}
获取消费者和提供者的下标中最小的下标
public static long getMinimumSequence(final Sequence[] sequences, long minimum){for (int i = 0, n = sequences.length; i < n; i++){long value = sequences[i].get();minimum = Math.min(minimum, value);}return minimum;}
转化需要设置的对象并发布,获取对应位置的值,EventTranslator是用来设置对应数组entries下标的值。
private void translateAndPublish(EventTranslator<E> translator, long sequence){try{translator.translateTo(get(sequence), sequence);}finally{sequencer.publish(sequence);}}
设置完后发布对应的下标,设置该下标的可用标志
public void publish(final long sequence){setAvailable(sequence);waitStrategy.signalAllWhenBlocking();}private void setAvailable(final long sequence){setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));}
计算对应的数组下标
private int calculateIndex(final long sequence){return ((int) sequence) & indexMask;}
计算可用flag
private int calculateAvailabilityFlag(final long sequence){return (int) (sequence >>> indexShift);}
给对应的可用标志数组的地址设置可用flag
private void setAvailableBufferValue(int index, int flag){long bufferAddress = (index * SCALE) + BASE;UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);}
唤醒所有等待的消费线程有数据到达
public void signalAllWhenBlocking(){synchronized (mutex){mutex.notifyAll();}}
文章转载自徘徊笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




