暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Disruptor源码分析之提供端

徘徊笔记 2019-05-17
909

来源:https://github.com/LMAX-Exchange/disruptor


提供端发布消息

    public void publishEvent(final EventTranslator<T> eventTranslator)
    {
    ringBuffer.publishEvent(eventTranslator);
    }


    public void publishEvent(EventTranslator<E> translator)
    {
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence);
    }


    获取提供者定序器的下一个可用下标,

      public long next()
      {
      return next(1);
      }


      获取当前下标以及下一个下标,wrapPoint大于等于0代表已经设置一圈数据了,获取gatingSequenceCache下标,初始值为-1,刚开始时设置成功cursor.compareAndSet(current, next)返回next下标,第二圈时,wrapPoint为0,cachedGatingSequence为-1,这个时候就会进入if语句判断是否会需要等待消费,current溢出时也会进入,获取消费端已经消费的下标和当前提供者的下标的最小下标gatingSequence,当wrapPoint大于gatingSequence时,代表消费者还没有消费当前下标,所以不能覆盖当前位置的对象,所以就需要等待LockSupport.parkNanos(1);如果小于的话,就把那个最小位置设置到gatingSequenceCache保存下来,下一次当提供者放入了之前两次gatingSequenceCache之间的差值数量的数据时又会进入下面的if语句,不一定是是每bufferSize进入一次,如果消费速度慢的话会小于bufferSize进入一次if语句。

        public long next(int n)
        {
        if (n < 1 || n > bufferSize)
        {
        throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }


        long current;
        long next;


        do
        {
        current = cursor.get();
        next = current + n;


        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();


        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
        {
        long gatingSequence = Util.getMinimumSequence(gatingSequences, current);


        if (wrapPoint > gatingSequence)
        {
        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
        continue;
        }


        gatingSequenceCache.set(gatingSequence);
        }
        else if (cursor.compareAndSet(current, next))
        {
        break;
        }
        }
        while (true);


        return next;
        }


        获取消费者和提供者的下标中最小的下标

          public static long getMinimumSequence(final Sequence[] sequences, long minimum)
          {
          for (int i = 0, n = sequences.length; i < n; i++)
          {
          long value = sequences[i].get();
          minimum = Math.min(minimum, value);
          }


          return minimum;
          }


          转化需要设置的对象并发布,获取对应位置的值,EventTranslator是用来设置对应数组entries下标的值。

            private void translateAndPublish(EventTranslator<E> translator, long sequence)
            {
            try
            {
            translator.translateTo(get(sequence), sequence);
            }
            finally
            {
            sequencer.publish(sequence);
            }
            }


            设置完后发布对应的下标,设置该下标的可用标志

              public void publish(final long sequence)
              {
              setAvailable(sequence);
              waitStrategy.signalAllWhenBlocking();
              }


              private void setAvailable(final long sequence)
              {
              setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
              }


              计算对应的数组下标

                private int calculateIndex(final long sequence)
                {
                return ((int) sequence) & indexMask;
                }


                计算可用flag

                  private int calculateAvailabilityFlag(final long sequence)
                  {
                  return (int) (sequence >>> indexShift);
                  }


                  给对应的可用标志数组的地址设置可用flag

                    private void setAvailableBufferValue(int index, int flag)
                    {
                    long bufferAddress = (index * SCALE) + BASE;
                    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
                    }


                    唤醒所有等待的消费线程有数据到达

                      public void signalAllWhenBlocking()
                      {
                      synchronized (mutex)
                      {
                      mutex.notifyAll();
                      }
                      }


                      文章转载自徘徊笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论