首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
华为云
V2EX  ›  Java

用 BlockingQueue 出现了一个无法解释的问题

  •  
  •   zhady009 · 31 天前 · 718 次点击
    这是一个创建于 31 天前的主题,其中的信息可能已经有所发展或是发生改变。

    想试用一下阻塞列队 做了个生产者和消费者的 demo 预期结果就是相互交替执行也就是 生产一个之后,消费一个

    不允许连续生产或者连续消费

    但是如果不让生产线程 sleep 就会无法实现交替执行的效果 我是没想到是什么原因

    public static void main(String[] args) throws IOException {
    
            BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);
    
            Producer p1 = new Producer(bq);
            p1.setName("producer01");
            Customer c1 = new Customer(bq);
            c1.setName("customer01");
            p1.start();
            c1.start();
        }
    
    public class Producer extends Thread {
    
        private BlockingQueue<Integer> bq;
        public Producer(BlockingQueue<Integer> bq) {
            this.bq = bq;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    bq.put(produce());
                    Thread.sleep(0,1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
        private Integer produce() {
            Integer number = (new Random().nextInt(100));
            System.out.println(getName() + ":produced =====> " + number);
            return number;
        }
    }
    
    public class Customer extends Thread {
    
        private BlockingQueue<Integer> bq;
    
        public Customer(BlockingQueue<Integer> bq) {
            this.bq = bq;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    consume();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void consume() throws InterruptedException {
            System.out.println(getName() + ":consumed:" + bq.take());
        }
    }
    
    20 回复  |  直到 2018-07-19 22:13:26 +08:00
        1
    watzds   31 天前 via Android
    什么叫交替执行?看输出不准吧
        2
    chocotan   31 天前
    Thread.sleep(0,1) 实际上是 sleep 了 1ms 吧
    bq.take()耗时小于 1ms,所以看起来是交替执行
    去掉 sleep 之后,bq.take()拿到数据比循环到下一个 produce()时要慢,所以看起来不是交替执行
        3
    zhady009   31 天前
    producer01:produced =====> 63
    customer01:consumed:63
    producer01:produced =====> 70
    customer01:consumed:70
    producer01:produced =====> 16
    customer01:consumed:16
    producer01:produced =====> 25
    customer01:consumed:25

    像这样的如果不加 sleep 会如下,

    producer01:produced =====> 70
    producer01:produced =====> 16
    customer01:consumed:70
    customer01:consumed:16
    producer01:produced =====> 25
    customer01:consumed:25
        4
    sagaxu   31 天前 via Android
    take 和 put 是交替执行的,但 println 不是
        5
    zhady009   31 天前
    Thread.sleep(0,1) 是一纳秒吧 Thread.sleep(1)才是 1 毫秒

    put 方法如果队列满了,将阻塞当前线程
    take 方法列队为空,将阻塞当前线程
        6
    chocotan   31 天前
    @zhady009 你看一下这个方法的源码

    ```
    if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
    millis++;
    }
    sleep(millis);
    ```
        7
    zhady009   31 天前
    那如何让
    System.out.println(getName() + ":consumed:" + bq.take());
    变成原子性
        8
    lcorange   31 天前
    比如这句 System.out.println(getName() + ":consumed:" + bq.take());
    可以保证一定是 bq.take()之后,生产者才能 bq.put(),这个可以保证顺序
    但是外层的 System.out.println 函数你是无法保证他一定会紧接着 bq.take()后面执行,拖延到生产者 sysout 后也是有可能的
        9
    zhady009   31 天前
    @chocotan
    没注意..确实是 1ms
        10
    pwrliang   31 天前
    我一开始也是认为 sysout 的问题,但是我统计了调用序列,也是交替的啊。
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class Test {

    public static void main(String[] args) throws IOException, InterruptedException {

    BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

    AtomicInteger seq =new AtomicInteger(0);

    Producer p1 = new Producer(bq,seq);
    p1.setName("producer01");
    Customer c1 = new Customer(bq,seq);
    c1.setName("customer01");
    p1.start();
    c1.start();
    }

    }

    class Producer extends Thread {
    AtomicInteger seq;
    private BlockingQueue<Integer> bq;
    public Producer(BlockingQueue<Integer> bq,AtomicInteger seq) {
    this.bq = bq;
    this.seq = seq;
    }

    @Override
    public void run() {
    while (true) {
    try {
    bq.put(produce());
    // Thread.sleep(0,1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }

    private Integer produce() {
    Integer number = (new Random().nextInt(100));
    int sid=seq.addAndGet(1);
    System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
    System.out.flush();
    return number;
    }
    }

    class Customer extends Thread {
    AtomicInteger seq;
    private BlockingQueue<Integer> bq;

    public Customer(BlockingQueue<Integer> bq,AtomicInteger seq) {
    this.bq = bq;
    this.seq = seq;
    }

    @Override
    public void run() {
    while (true) {
    try {
    consume();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    private void consume() throws InterruptedException {
    int sid=seq.addAndGet(1);
    System.out.println("seq:" + sid + getName() + ":consumed:" + bq.take());
    System.out.flush();
    }
    }





    ----------------------------------------


    seq:2producer01:produced =====> 44
    seq:3producer01:produced =====> 97
    seq:1customer01:consumed:44
    seq:4customer01:consumed:97
    seq:5producer01:produced =====> 19
    seq:7producer01:produced =====> 88
    seq:6customer01:consumed:19
    seq:8producer01:produced =====> 90
    seq:9customer01:consumed:88
    seq:10producer01:produced =====> 93
    seq:11customer01:consumed:90
    seq:12producer01:produced =====> 40
        11
    zhady009   31 天前
    @lcorange 但是我试了一下把 consume 方法弄成同步方法也不管用..
        12
    lcorange   31 天前   ♥ 1
    @zhady009 这个是无解的,除非整个函数都包上锁,这时这个队列就变得毫无疑义了

    如果按照命令的顺序拆分,生产者分成 P,消费者分成 C

    P1 print number
    P2 bq.put(number)
    P3 print number
    P4 bq.put(number)
    P5 print number
    P6 bq.put(number)

    C1 bq.take()
    C2 print number
    C3 bq.take()
    C4 print number
    C5 bq.take()
    C6 print number

    当按照以下顺序执行的时候
    P1 P2 P3 C1 C2 P4 C3 C4 ...就会出现你所说的两条日志
    其实内部的 P2 C1 P4 C3 还是保证了两边的顺序的
        13
    cheneydog   31 天前
    我觉得是打印输出的问题,队列本身应该没问题,只是两个线程共用一个输出流 System.out ,结果无法控制。
        14
    lcorange   31 天前
    @pwrliang AtomicInteger LinkedBlockingQueue 只保证调用这两个对象的函数时能够保证原子性,但是整个 product 和 consume 函数上没有这样的锁,所以执行顺序是不能保证的
        15
    zhady009   31 天前
    @lcorange 懂了一半 另外一半不懂的是为什么 sleep 之后就可以达到预期结果
        16
    lcorange   31 天前
    @zhady009 只是运气好加系统负载不大,sleep 的时间里让 print 函数有机会执行,加大负载,长时间测试一样会出现这个现象
        17
    sagaxu   30 天前 via Android
    @zhady009 因为 sleep 改变了占空比,cpu 大部分时间是空闲的,错开了你的两组操作。试想一下,往平底锅里,同时扔 8 个鸡蛋,鸡蛋之间一定会有碰撞,发生空间的争抢,但是同时扔 8 粒芝麻,很大概率是散落不碰撞的。
        18
    reus   30 天前
    线程是并发执行的,当然不能保证交替执行。
        19
    pwrliang   30 天前
    这回可以了,要保证 put+sysout, take+sysout 是原子性的,只能加个全局锁。

    public class Test {

    public static void main(String[] args) throws IOException, InterruptedException {
    Lock lock = new ReentrantLock();
    BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

    AtomicInteger seq =new AtomicInteger(0);

    Producer p1 = new Producer(bq,seq,lock);
    p1.setName("producer01");
    Customer c1 = new Customer(bq,seq,lock);
    c1.setName("customer01");
    p1.start();
    c1.start();
    }

    }

    class Producer extends Thread {
    AtomicInteger seq;
    Lock lock;
    private BlockingQueue<Integer> bq;
    public Producer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
    this.bq = bq;
    this.seq = seq;
    this.lock = lock;
    }

    @Override
    public void run() {
    while (true) {
    try {
    if(bq.size()==1)continue;
    lock.lock();
    bq.put(produce());
    lock.unlock();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }

    private Integer produce() {
    Integer number = (new Random().nextInt(100));
    int sid=seq.addAndGet(1);
    System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
    System.out.flush();
    return number;
    }
    }

    class Customer extends Thread {
    AtomicInteger seq;
    private BlockingQueue<Integer> bq;
    Lock lock;

    public Customer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
    this.bq = bq;
    this.seq = seq;
    this.lock = lock;
    }

    @Override
    public void run() {
    while (true) {
    try {
    if (bq.size()==0)continue;
    lock.lock();
    consume();
    lock.unlock();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    private void consume() throws InterruptedException {
    int tk = bq.take();
    int sid=seq.addAndGet(1);
    System.out.println("seq:" + sid + getName() + ":consumed:" + tk);
    System.out.flush();
    }
    }

    -------------------------------------------------

    public class Test {

    public static void main(String[] args) throws IOException, InterruptedException {
    Lock lock = new ReentrantLock();
    BlockingQueue<Integer> bq = new LinkedBlockingQueue<Integer>(1);

    AtomicInteger seq =new AtomicInteger(0);

    Producer p1 = new Producer(bq,seq,lock);
    p1.setName("producer01");
    Customer c1 = new Customer(bq,seq,lock);
    c1.setName("customer01");
    p1.start();
    c1.start();
    }

    }

    class Producer extends Thread {
    AtomicInteger seq;
    Lock lock;
    private BlockingQueue<Integer> bq;
    public Producer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
    this.bq = bq;
    this.seq = seq;
    this.lock = lock;
    }

    @Override
    public void run() {
    while (true) {
    try {
    if(bq.size()==1)continue;
    lock.lock();
    bq.put(produce());
    lock.unlock();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }

    private Integer produce() {
    Integer number = (new Random().nextInt(100));
    int sid=seq.addAndGet(1);
    System.out.println("seq:"+sid+getName() + ":produced =====> " + number);
    System.out.flush();
    return number;
    }
    }

    class Customer extends Thread {
    AtomicInteger seq;
    private BlockingQueue<Integer> bq;
    Lock lock;

    public Customer(BlockingQueue<Integer> bq,AtomicInteger seq,Lock lock) {
    this.bq = bq;
    this.seq = seq;
    this.lock = lock;
    }

    @Override
    public void run() {
    while (true) {
    try {
    if (bq.size()==0)continue;
    lock.lock();
    consume();
    lock.unlock();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    private void consume() throws InterruptedException {
    int tk = bq.take();
    int sid=seq.addAndGet(1);
    System.out.println("seq:" + sid + getName() + ":consumed:" + tk);
    System.out.flush();
    }
    }
        20
    pwrliang   30 天前
    @pwrliang 刚刚结果粘贴错了
    --------------------------------------------------
    seq:1producer01:produced =====> 45
    seq:2customer01:consumed:45
    seq:3producer01:produced =====> 20
    seq:4customer01:consumed:20
    seq:5producer01:produced =====> 78
    seq:6customer01:consumed:78
    seq:7producer01:produced =====> 45
    seq:8customer01:consumed:45
    seq:9producer01:produced =====> 90
    seq:10customer01:consumed:90
    seq:11producer01:produced =====> 57
    seq:12customer01:consumed:57
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   鸣谢   ·   实用小工具   ·   862 人在线   最高记录 3762   ·  
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.1 · 61ms · UTC 23:41 · PVG 07:41 · LAX 16:41 · JFK 19:41
    ♥ Do have faith in what you're doing.
    沪ICP备16043287号-1