暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Worker-Thread设计模式

Alleria Windrunner 2020-06-05
619

什么是Worker-Thread模式

Worker-Thread 模式有时也称为流水线设计模式,这种设计模式类似于工厂流水线,上游工作人员完成了某个电子产品的组装之后,将半成品放到流水线传送带上,接下来的加工工作则会交给下游的工人,如下图所示。
线程池在某种意义上也算是 Worker-Thread 模式的一种实现,线程池初始化时所创建的线程类似于在流水线等待工作的工人,提交给线程池的 Runnable 接口类似于需要加工的产品,而 Runnable 的 run 方法则相当于组装该产品的说明书。


Worker-Thread模式实现

根据我们前面的描述可以看出 Worker-Thread 模式需要有如下几个角色。
  • 流水线工人:流水线工人主要用来对传送带上的产品进行加工。

  • 流水线传送带:用于传送来自上游的产品。

  • 产品组装说明书:用来说明该产品如何组装。


产品及组装说明书

抽象类 InstructionBook 的代码如清单所示。
    //在流水线上需要被加工的产品,create 作为一个模板方法,提供了加工产品的说明书
    public abstract class InstructionBook
    {
    public final void create()
    {
    this.firstProcess();
    this.secondProcess();
    }
    protected abstract void firstProcess();
    protected abstract void secondProcess();
    }
    抽象类 InstructionBook,代表着组装产品的说明书,其中经过流水线传送带的产品将通过 create()方法进行加工,而 firstProcess()和 secondProcess()则代表着加工每个产品的步骤,这就是说明书的作用。
    传送带上的产品除了说明书以外还需要有产品自身,产品继承了说明书,每个产品都有产品编号,通用 Production 的代码如清单所示。
      public class Production extends InstructionBook
      {
      //产品编号
      private final int prodID;
      public Production(int prodID)
      {
      this.prodID = prodID;
      }
      @Override
      protected void firstProcess()
      {
      System.out.println("execute the " + prodID + " first process");
      }
      @Override
      protected void secondProcess()
      {
      System.out.println("execute the " + prodID + " second process");
      }
      }


      流水线传送带

      流水线的传送带主要用于传送待加工的产品,上游的工作人员将完成的半成品放到传送带上,工作人员从传送带上取下产品进行再次加工,传送带 ProductionChannel 的代码如清单所示。
        //产品传送带,在传送带上除了负责产品加工的工人之外,还有在传送带上等待加工的产品
        public class ProductionChannel
        {
        //传送带上最多可以有多少个待加工的产品
        private final static int MAX_PROD = 100;
        //主要用来存放待加工的产品,也就是传送带
        private final Production[] productionQueue;
        //队列尾
        private int tail;
        //队列头
        private int head;
        //当前在流水线上有多少个待加工的产品
        private int total;


        //在流水线上工作的工人
        private final Worker[] workers;
        //创建 ProductionChannel 时应指定需要多少个流水线工人
        public ProductionChannel(int workerSize)
        {
        this.workers = new Worker[workerSize];
        this.productionQueue = new Production[MAX_PROD];
        //实例化每一个工人(Worker 线程)并且启动
        for (int i = 0; i < workerSize; i++)
        {
        workers[i] = new Worker("Worker-" + i, this);
        workers[i].start();
        }
        }


        //接受来自上游的半成品(待加工的产品)
        public void offerProduction(Production production)
        {
        synchronized (this)
        {
        //当传送带上待加工的产品超过了最大值时需要阻塞上游再次传送产品
        while (total >= productionQueue.length)
        {
        try
        {
        this.wait();
        } catch (InterruptedException e)
        {
        }
        }
        //将产品放到传送带,并且通知工人线程工作
        productionQueue[tail] = production;
        tail = (tail + 1) % productionQueue.length;
        total++;
        this.notifyAll();
        }
        }
        //工人线程(Worker)从传送带上获取产品,并且进行加工
        public Production takeProduction()
        {
        synchronized (this)
        {
        //当传送带上没有产品时,工人等待着产品从上游输送到传送带上
        while (total <= 0)
        {
        try
        {
        this.wait();
        } catch (InterruptedException e)
        {
        }
        }
        //获取产品
        Production prod = productionQueue[head];
        head = (head + 1) % productionQueue.length;
        total--;
        this.notifyAll();
        return prod;
        }
        }
        }
        传送带是搁置产品的地方,如果工人们处理比较慢则会导致无限制的产品积压,因此我们需要做的是让上游的流水线阻塞并等待,直至流水线有位置可以用于放置新的产品为止,MAX_PROD 的作用就在于此,其用于控制传送带的最大容量,传送带被创建的同时,流水线上的工人们也已经就绪到位,等待着流水线上产品的到来。


        流水线工人

        流水线工人是 Thread 的子类,不断地从流水线上提取产品然后进行再次加工,加工的方法是 create()(对该产品的加工方法说明书),流水线工人示例代码如清单所示。
          public class Worker extends Thread
          {
          private final ProductionChannel channel;
          //主要用于获取一个随机值,模拟加工一个产品需要耗费一定的时间,当然每个工人操作时所花费的时间可也能不一样
          private final static Random random =
          new Random(System.currentTimeMillis());


          public Worker(String workerName, ProductionChannel channel)
          {
          super(workerName);
          this.channel = channel;
          }


          @Override
          public void run()
          {
          while (true)
          {
          try
          {
          //从传送带上获取产品
          Production production = channel.takeProduction();
          System.out.println(getName() + " process the " + production);
          //对产品进行加工
          production.create();
          TimeUnit.SECONDS.sleep(random.nextInt(10));
          } catch (InterruptedException e)
          {
          e.printStackTrace();
          }
          }
          }
          }

          产品流水线测试

          关于 Worker-Thread 模式我们已经实现完成,下面写一个简单的程序进行测试,代码如清单所示。
            public class Test
            {
            public static void main(String[] args)
            {
            //流水线上有5个工人
            final ProductionChannel channel = new ProductionChannel(5);
            AtomicInteger productionNo = new AtomicInteger();
            //流水线上有8个工作人员往传送带上不断地放置等待加工的半成品
            IntStream.range(1, 8).forEach(i ->
            new Thread(() ->
            {
            while (true)
            {
            channel.offerProduction(new Production(productionNo.getAndIncrement()));
            try
            {
            TimeUnit.SECONDS.sleep(current().nextInt(10));
            } catch (InterruptedException e)
            {
            e.printStackTrace();
            }
            }
            }).start()
            );
            }
            }
            在测试中,假设上游的流水线上有8个工人将产品放到传送带上,我们的传送带上定义了5个工人,运行上面的程序,Worker 将根据产品的使用说明书对产品进行再次加工,程序输出如下:
              ...省略
              execute the 3 second process
              Worker-0 process the PROD:5
              execute the 5 first process
              execute the 5 second process
              Worker-4 process the PROD:7
              Worker-0 process the PROD:6
              execute the 7 first process
              execute the 6 first process
              execute the 7 second process
              execute the 6 second process
              ...省略


              Worker-Thread 和 Producer-Consumer

              很多朋友觉得无法区分 Worker-Thread 和 Producer-Consumer 模式,其实这也可以理解,毕竟多线程的架构设计模式是很多程序员在日常的开发过程中累积沉淀下来的优秀编程模式,并不像 GoF 23种设计模式那样具有官方的认可,但是两者之间的区别还是很明显的。


              Producer-Consumer 模式

              首先 Producer、Consumer 对 Queue 都是依赖关系,其次 Producer 要做的就是不断地往 Queue 中生产数据,而 Consumer 则是不断地从 Queue 中获取数据,Queue 既不知道 Producer 的存在也不知道 Consumer 的存在,最后 Consumer 对 Queue 中数据的消费并不依赖于数据本身的方法(使用说明书)。


              Worker-Thread 模式

              左侧的线程,也就是传送带上游的线程,同样在不断地往传送带(Queue)中生产数据,而当 Channel 被启动的时候,就会同时创建并启动若干数量的 Worker 线程,因此我们可以看出 Worker 于 Channel 来说并不是单纯的依赖关系,而是聚合关系,Channel 必须知道 Worker 的存在。

              文章转载自Alleria Windrunner,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论