JUC并发—9.并发安全集合四
大纲
1.并发安全的数组列表CopyOnWriteArrayList
2.并发安全的链表队列ConcurrentLinkedQueue
3.并发编程中的阻塞队列概述
4.JUC的各种阻塞队列介绍
5.LinkedBlockingQueue的具体实现原理
6.基于两个队列实现的集群同步机制
4.JUC的各种阻塞队列介绍
(1)基于数组的阻塞队列ArrayBlockingQueue
(2)基于链表的阻塞队列LinkedBlockingQueue
(3)优先级阻塞队列PriorityBlockingQueue
(4)延迟阻塞队列DelayQueue
(5)无存储结构的阻塞队列SynchronousQueue
(6)阻塞队列结合体LinkedTransferQueue
(7)双向阻塞队列LinkedBlockingDeque
(1)基于数组的阻塞队列ArrayBlockingQueue
ArrayBlockingQueue是一个基于数组实现的阻塞队列。其构造方法可以指定:数组的长度、公平还是非公平、数组的初始集合。
ArrayBlockingQueue会通过ReentrantLock来解决线程竞争的问题,以及采用Condition来解决线程的唤醒与阻塞的问题。
//A bounded BlockingQueue backed by an array.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time.
//New elements are inserted at the tail of the queue,
//and the queue retrieval operations obtain elements at the head of the queue.
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //The queued itemsfinal Object[] items;//items index for next take, poll, peek or removeint takeIndex;//items index for next put, offer, or addint putIndex;//Number of elements in the queueint count;//Main lock guarding all accessfinal ReentrantLock lock;//Condition for waiting takesprivate final Condition notEmpty;//Condition for waiting putsprivate final Condition notFull;//Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.public ArrayBlockingQueue(int capacity) {this(capacity, false);}//Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0) {throw new IllegalArgumentException();}this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}//Inserts the specified element at the tail of this queue, //waiting for space to become available if the queue is full.public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {notFull.await();}enqueue(e);} finally {lock.unlock();}}//Inserts element at current put position, advances, and signals.//Call only when holding lock.private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length) {putIndex = 0;}count++;notEmpty.signal();}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {notEmpty.await();}return dequeue();} finally {lock.unlock();}}//Returns the number of elements in this queue.public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}...
}
(2)基于链表的阻塞队列LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的阻塞队列,它可以不指定阻塞队列的长度,它的默认长度是Integer.MAX_VALUE。由于这个默认长度非常大,一般也称LinkedBlockingQueue为无界队列。
//An optionally-bounded BlockingQueue based on linked nodes.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time.
//New elements are inserted at the tail of the queue,
//and the queue retrieval operations obtain elements at the head of the queue.
//Linked queues typically have higher throughput than array-based queues
//but less predictable performance in most concurrent applications.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...//The capacity bound, or Integer.MAX_VALUE if noneprivate final int capacity;//Current number of elementsprivate final AtomicInteger count = new AtomicInteger();//Head of linked list.transient Node<E> head;//Tail of linked list.private transient Node<E> last;//Lock held by take, poll, etcprivate final ReentrantLock takeLock = new ReentrantLock();//Lock held by put, offer, etcprivate final ReentrantLock putLock = new ReentrantLock();//Wait queue for waiting takesprivate final Condition notEmpty = takeLock.newCondition();//Wait queue for waiting putsprivate final Condition notFull = putLock.newCondition();//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {last = last.next = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}public int size() {return count.get();}...
}
(3)优先级阻塞队列PriorityBlockingQueue
PriorityBlockingQueue是一个支持自定义元素优先级的无界阻塞队列。默认情况下添加的元素采用自然顺序升序排列,当然可以通过实现元素的compareTo()方法自定义优先级规则。
PriorityBlockingQueue是基于数组实现的,这个数组会自动进行动态扩容。在应用方面,消息中间件可以基于优先级阻塞队列来实现。
//An unbounded BlockingQueue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.
//While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError).
//This class does not permit null elements.
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, //if comparator is null: For each node n in the heap and each descendant d of n, n <= d.//The element with the lowest value is in queue[0], assuming the queue is nonempty.private transient Object[] queue;//The number of elements in the priority queue.private transient int size;//The comparator, or null if priority queue uses elements' natural ordering.private transient Comparator<? super E> comparator;//Lock used for all public operationsprivate final ReentrantLock lock;//Condition for blocking when emptyprivate final Condition notEmpty;//Spinlock for allocation, acquired via CAS.private transient volatile int allocationSpinLock;//Creates a PriorityBlockingQueue with the default initial capacity (11) that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that orders its elements according to the specified comparator.public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {if (initialCapacity < 1) {throw new IllegalArgumentException();}this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never block.public void put(E e) {offer(e); // never need to block}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never return false.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length)) {tryGrow(array, cap);}try {Comparator<? super E> cmp = comparator;if (cmp == null) {siftUpComparable(n, e, array);} else {siftUpUsingComparator(n, e, array, cmp);}size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}//Tries to grow array to accommodate at least one more element (but normally expand by about 50%), //giving up (allowing retry) on contention (which we expect to be rare). Call only while holding lock.private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {int minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {throw new OutOfMemoryError();}newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array) {newArray = new Object[newCap];}} finally {allocationSpinLock = 0;}}if (newArray == null) {// back off if another thread is allocatingThread.yield();}lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {while(k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = x;}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null) {notEmpty.await();}} finally {lock.unlock();}return result;}public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return size;} finally {lock.unlock();}}...
}
(4)延迟阻塞队列DelayQueue
DelayQueue是一个支持延迟获取元素的无界阻塞队列,它是基于优先级队列PriorityQueue实现的。
往DelayQueue队列插入元素时,可以按照自定义的delay时间进行排序。也就是队列中的元素顺序是按照到期时间排序的,只有delay时间小于或等于0的元素才能够被取出。
DelayQueue的应用场景有:
一.订单超时支付需要自动取消订单
二.任务超时处理需要自动丢弃任务
三.消息中间件的实现
//An unbounded BlockingQueue of Delayed elements, in which an element can only be taken when its delay has expired.
//The head of the queue is that Delayed element whose delay expired furthest in the past.
//If no delay has expired there is no head and poll will return null.
//Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
//Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.
//For example, the size method returns the count of both expired and unexpired elements.
//This queue does not permit null elements.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();//Thread designated to wait for the element at the head of the queue.//When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.//The leader thread must signal some other thread before returning from take() or poll(...), //unless some other thread becomes leader in the interim. //Whenever the head of the queue is replaced with an element with an earlier expiration time, //the leader field is invalidated by being reset to null, and some waiting thread, //but not necessarily the current leader, is signalled. //So waiting threads must be prepared to acquire and lose leadership while waiting.private Thread leader = null;//Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.private final Condition available = lock.newCondition();//Creates a new {@code DelayQueue} that is initially empty.public DelayQueue() {}//Inserts the specified element into this delay queue. //As the queue is unbounded this method will never block.public void put(E e) {offer(e);}//Inserts the specified element into this delay queue.public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}//Retrieves and removes the head of this queue, //waiting if necessary until an element with an expired delay is available on this queue.public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {available.await();} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) {return q.poll();} first = null; // don't retain ref while waitingif (leader != null) {available.await();} else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread) {leader = null;}}}}}} finally {if (leader == null && q.peek() != null) {available.signal();}lock.unlock();}}...
}public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable {//Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, if comparator is null: //For each node n in the heap and each descendant d of n, n <= d. //The element with the lowest value is in queue[0], assuming the queue is nonempty.transient Object[] queue; //The number of elements in the priority queue.private int size = 0;//The comparator, or null if priority queue uses elements' natural ordering.private final Comparator<? super E> comparator;public E peek() {return (size == 0) ? null : (E) queue[0];}//Inserts the specified element into this priority queue.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}modCount++;int i = size;if (i >= queue.length) {grow(i + 1);}size = i + 1;if (i == 0) {queue[0] = e;} else {siftUp(i, e);}return true;}//Increases the capacity of the array.private void grow(int minCapacity) {int oldCapacity = queue.length;// Double size if small; else grow by 50%int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1));// overflow-conscious codeif (newCapacity - MAX_ARRAY_SIZE > 0) {newCapacity = hugeCapacity(minCapacity);}queue = Arrays.copyOf(queue, newCapacity);}private void siftUp(int k, E x) {if (comparator != null) {siftUpUsingComparator(k, x);} else {siftUpComparable(k, x);}}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (key.compareTo((E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = key;}@SuppressWarnings("unchecked")private void siftUpUsingComparator(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (comparator.compare(x, (E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = x;}...
}
(5)无存储结构的阻塞队列SynchronousQueue
SynchronousQueue的内部没有容器来存储数据,因此当生产者往其添加一个元素而没有消费者去获取元素时,生产者会阻塞。当消费者往其获取一个元素而没有生产者去添加元素时,消费者也会阻塞。
SynchronousQueue的本质是借助了无容量存储的特点,来实现生产者线程和消费者线程的即时通信,所以它特别适合在两个线程之间及时传递数据。
线程池是基于阻塞队列来实现生产者/消费者模型的。当向线程池提交任务时,首先会把任务放入阻塞队列中,然后线程池中会有对应的工作线程专门处理阻塞队列中的任务。
Executors.newCachedThreadPool()就是基于SynchronousQueue来实现的,它会返回一个可以缓存的线程池。如果这个线程池大小超过处理当前任务所需的数量,会灵活回收空闲线程。当任务数量增加时,这个线程池会不断创建新的工作线程来处理这些任务。
public class Executors {...//Creates a thread pool that creates new threads as needed, //but will reuse previously constructed threads when they are available.//These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.//Calls to execute will reuse previously constructed threads if available. //If no existing thread is available, a new thread will be created and added to the pool. //Threads that have not been used for sixty seconds are terminated and removed from the cache. //Thus, a pool that remains idle for long enough will not consume any resources. //Note that pools with similar properties but different details (for example, timeout parameters)//may be created using ThreadPoolExecutor constructors.public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}...
}
(6)阻塞队列结合体LinkedTransferQueue
LinkedTransferQueue是一个基于链表实现的无界阻塞TransferQueue。
阻塞队列的特性是根据队列的数据情况来阻塞生产者线程或消费者线程,TransferQueue的特性是生产者线程生产数据后必须等消费者消费才返回。
LinkedTransferQueue是TransferQueue和LinkedBlockingQueue的结合体,而SynchronousQueue内部其实也是基于TransferQueue来实现的,所以LinkedTransferQueue是带有阻塞队列功能的SynchronousQueue。
(7)双向阻塞队列LinkedBlockingDeque
LinkedBlockingDeque是一个基于链表实现的双向阻塞队列,双向队列的两端都可以插入和移除元素,可减少多线程并发下的一半竞争。
5.LinkedBlockingQueue的具体实现原理
(1)阻塞队列的设计分析
(2)有界队列LinkedBlockingQueue
(3)LinkedBlockingQueue的put()方法
(4)LinkedBlockingQueue的take()方法
(5)LinkedBlockingQueue使用两把锁拆分锁功能
(6)LinkedBlockingQueue的size()方法和迭代
(7)对比LinkedBlockingQueue链表队列和ArrayBlockingQueue数组队列
(1)阻塞队列的设计分析
阻塞队列的特性为:如果队列为空,消费者线程会被阻塞。如果队列满了,生产者线程会被阻塞。
为了实现这个特性:如何让线程在满足某个特定条件的情况下实现阻塞和唤醒?阻塞队列中的数据应该用什么样的容器来存储?
线程的阻塞和唤醒,可以使用wait/notify或者Condition。阻塞队列中数据的存储,可以使用数组或者链表。
(2)有界队列LinkedBlockingQueue
一.并发安全的无界队列
比如ConcurrentLinkedQueue,是没有边界没有大小限制的。它就是一个单向链表,可以无限制的往里面去存放数据。如果不停地往无界队列里添加数据,那么可能会导致内存溢出。
二.并发安全的有界队列
比如LinkedBlockingQueue,是有边界的有大小限制的。它也是一个单向链表,如果超过了限制,往队列里添加数据就会被阻塞。因此可以限制内存队列的大小,避免内存队列无限增长,最后撑爆内存。
(3)LinkedBlockingQueue的put()方法
put()方法是阻塞添加元素的方法,当队列满时,阻塞添加元素的线程。
首先把添加的元素封装成一个Node对象,该对象表示链表中的一个结点。
然后使用ReentrantLock.lockInterruptibly()方法来获取一个可被中断的锁,加锁的目的是保证数据添加到队列过程中的安全性 + 避免队列长度超阈值。
接着调用enqueue()方法把封装的Node对象存储到链表尾部,然后通过AtomicInteger来递增当前阻塞队列中的元素个数。
最后根据AtomicInteger类型的变量判断队列元素是否已超阈值。
注意:这里用到了一个很重要的属性notFull。notFull是一个Condition对象,用来阻塞和唤醒生产者线程。如果队列元素个数等于最大容量,就调用notFull.await()阻塞生产者线程。如果队列元素个数小于最大容量,则调用notFull.signal()唤醒生产者线程。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final int capacity;//阻塞队列的最大容量,默认是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁//使用两个Condition,分别阻塞和唤醒出队时的线程和入队时的线程private final Condition notEmpty = takeLock.newCondition();//出队的等待队列conditionprivate final Condition notFull = putLock.newCondition();//入队的等待队列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;//将添加的元素封装成一个Node对象Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//当前队列元素的数量putLock.lockInterruptibly();//加可被中断的锁try {//注意:这里用到了一个很重要的属性notFull,它是一个Condition对象,用来阻塞和唤醒生产者线程//如果阻塞队列当前的元素个数等于最大容量,就调用notFull.await()方法来阻塞生产者线程while (count.get() == capacity) {notFull.await();//阻塞当前线程,并释放锁}//把封装的Node对象存储到链表中enqueue(node);//通过AtomicInteger来递增当前阻塞队列中的元素个数,用于后续判断是否已超阻塞队列的最大容量c = count.getAndIncrement();//根据AtomicInteger类型的变量判断队列元素是否已超阈值if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();//释放锁}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {//node先成为当前last的next//然后last又指向last的next(即node)last = last.next = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}...
}
(4)LinkedBlockingQueue的take()方法
take()方法是阻塞获取元素的方法,当队列为空时,阻塞获取元素的线程。
首先使用ReentrantLock.lockInterruptibly()方法来获取一个可被中断的锁。
然后判断元素个数是否为0,若是则通过notEmpty.await()阻塞消费者线程。
否则接着调用dequeue()方法从链表头部获取一个元素,并通过AtomicInteger来递减当前阻塞队列中的元素个数。
最后判断阻塞队列中的元素个数是否大于1,如果是,则调用notEmpty.signal()唤醒被阻塞的消费者线程。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final int capacity;//阻塞队列的最大容量,默认是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁//使用两个Condition,分别阻塞和唤醒出队时的线程和入队时的线程private final Condition notEmpty = takeLock.newCondition();//出队的等待队列conditionprivate final Condition notFull = putLock.newCondition();//入队的等待队列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//获取一个可中断的锁takeLock.lockInterruptibly();try {//判断元素个数是否为0while (count.get() == 0) {notEmpty.await();//阻塞当前线程并释放锁}//调用dequeue()方法从链表中获取一个元素x = dequeue();//通过AtomicInteger来递减当前阻塞队列中的元素个数c = count.getAndDecrement();//判断阻塞队列中的元素个数是否大于1//如果是,则调用notEmpty.signal()唤醒被阻塞的消费者消除if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}//首先获取链表的头结点head//然后拿到头结点的下一个结点first//然后把原来的头结点从队列中移除,设置first结点的数据为null,并将first结点设置为新的头结点//最后返回first结点的数据private E dequeue() {Node<E> h = head;//h指向headNode<E> first = h.next;//first指向h的nexth.next = h;// help GChead = first;E x = first.item;first.item = null;return x;}...
}
(5)LinkedBlockingQueue使用两把锁拆分锁功能
两把独占锁可以提升并发性能,因为出队和入队用的是不同的锁。这样在并发出队和入队的时候,出队和入队就可以同时执行,不会锁冲突。
这也是锁优化的一种思想,通过将一把锁按不同的功能进行拆分,使用不同的锁控制不同功能下的并发冲突,从而提升性能。
(6)LinkedBlockingQueue的size()方法和迭代
一.size()方法获取的结果也不是100%准确
LinkedBlockingQueue的size()方法获取元素个数是通过AtomicInteger获取的。
相比ConcurrentLinkedQueue通过遍历队列获取,准确性大很多。
相比CopyOnWriteArrayList通过遍历老副本数组获取,准确性也大很多。
但是相比ConcurrentHashMap通过分段CAS统计,那么准确性则差不多。
注意:LinkedBlockingQueue也不能获取到100%准确的队列元素的个数。除非锁掉整个队列,调用size()时不允许入队和出队,才会是100%准确。因为是完成入队或出队之后,才会对AtomicInteger变量进行递增或递减。
二.迭代时获取两把锁来锁整个队列
LinkedBlockingQueue的遍历会直接锁整个队列,即会先获取两把锁。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁public int size() {return count.get();}public Iterator<E> iterator() {return new Itr();}private class Itr implements Iterator<E> {private Node<E> current;private Node<E> lastRet;private E currentElement;Itr() {fullyLock();try {current = head.next;if (current != null) {currentElement = current.item;}} finally {fullyUnlock();}}...}void fullyLock() {putLock.lock();takeLock.lock();}void fullyUnlock() {takeLock.unlock();putLock.unlock();}...
}
(7)对比LinkedBlockingQueue链表队列和ArrayBlockingQueue数组队列
一.LinkedBlockingQueue是基于链表实现的有界阻塞队列,ArrayBlockingQueue是基于数组实现的有界阻塞队列
二.ArrayBlockingQueue的整体实现原理与LinkedBlockingQueue的整体实现原理是一样的
三.LinkedBlockingQueue需要使用两把独占锁,分别锁出队和入队的场景
四.ArrayBlockingQueue只使用一把锁,锁整个数组,所以其入队和出队不能同时进行
五.ArrayBlockingQueue执行size()方法获取元素个数时会直接加独占锁
六.ArrayBlockingQueue和LinkedBlockingQueue执行迭代方法时都会锁数据
6.基于两个队列实现的集群同步机制
(1)服务注册中心集群需要实现的功能
(2)基于两个队列实现的集群同步机制
(3)使用ConcurrentLinkedQueue实现第一个队列
(4)使用LinkedBlockingQueue实现第二个队列
(5)集群同步机制的具体实现
(1)服务注册中心集群需要实现的功能
服务实例向任何一个服务注册中心实例发送注册、下线、心跳的请求,该服务注册中心实例都需要将这些信息同步到其他的服务注册中心实例上,从而确保所有服务注册中心实例的内存注册表的数据是一致的。
(2)基于两个队列实现的集群同步机制
某服务注册中心实例接收到服务实例A的请求时,首先会把服务实例A的服务请求信息存储到本地的内存注册表里,也就是把服务实例A的服务请求信息写到第一个内存队列中,之后该服务注册中心实例对服务实例A的请求处理就可以结束并返回。
接着该服务注册中心实例会有一个后台线程消费第一个内存队列里的数据,把消费到的第一个内存队列的数据batch打包然后写到第二个内存队列里。
最后该服务注册中心实例还有一个后台线程消费第二个内存队列里的数据,把消费到的第二个内存队列的数据同步到其他服务注册中心实例中。
(3)使用ConcurrentLinkedQueue实现第一个队列
首先有两种队列:
一是无界队列ConcurrentLinkedQueue,基于CAS实现,并发性能很高。
二是有界队列LinkedBlockingQueue,基于两把锁实现,并发性能一般。
LinkedBlockingQueue默认的队列长度是MAX_VALUE,所以可以看成是无界队列。但是也可以指定正常大小的队列长度,从而实现入队的阻塞,避免耗尽内存。
当服务注册中心实例接收到各种请求时,会先将请求信息放入第一个队列。所以第一个队列会存在高并发写的情况,因此LinkedBlockingQueue不合适。
因为LinkedBlockingQueue属于阻塞队列,如果LinkedBlockingQueue满了,那么服务注册中心实例中的,处理服务请求的线程,就会被阻塞住。而且LinkedBlockingQueue的并发性能也不是太高,要获取独占锁才能写,所以最好还是使用无界队列ConcurrentLinkedQueue来实现第一个队列。
(4)使用LinkedBlockingQueue实现第二个队列
消费第一个内存队列的数据时,可以按时间来进行batch打包,比如每隔500ms才将消费到的所有数据打包成一个batch消息。接着再将这个batch信息放入到第二个内存队列中,这样消费第二个队列的数据时,只需同步batch信息到集群其他实例即可。
可见对第二个队列进行的入队和出队操作是由少数的后台线程来执行的,因此可以使用有界队列LinkedBlockingQueue来实现第二个内存队列。
此外还要估算有界队列LinkedBlockingQueue的队列长度应设置多少才合适。假如每一条需要同步给集群其他实例的请求信息,有6个字段,占30字节。平均每一条batch信息会包含100条请求信息,也就是会占3000字节 = 3KB。那么1000条batch消息,才占用3000KB = 3MB。因此可以设置第二个内存队列LinkedBlockingQueue的长度为1000。
(5)集群同步机制的具体实现
//集群同步组件
public class PeersReplicator {//集群同步生成batch的间隔时间:500msprivate static final long PEERS_REPLICATE_BATCH_INTERVAL = 500;private static final PeersReplicator instance = new PeersReplicator();private PeersReplicator() {//启动接收请求和打包batch的线程AcceptorBatchThread acceptorBatchThread = new AcceptorBatchThread();acceptorBatchThread.setDaemon(true); acceptorBatchThread.start();//启动同步发送batch的线程PeersReplicateThread peersReplicateThread = new PeersReplicateThread();peersReplicateThread.setDaemon(true);peersReplicateThread.start(); }public static PeersReplicator getInstance() {return instance;}//第一个内存队列:处理高并发的服务请求,所以存在高并发的写入情况,无界队列private ConcurrentLinkedQueue<AbstractRequest> acceptorQueue = new ConcurrentLinkedQueue<AbstractRequest>();//第二个内存队列:有界队列,用于同步batch消息到其他集群实例private LinkedBlockingQueue<PeersReplicateBatch> replicateQueue = new LinkedBlockingQueue<PeersReplicateBatch>(10000); //同步服务注册请求public void replicateRegister(RegisterRequest request) {request.setType(AbstractRequest.REGISTER_REQUEST); //将请求消息放入第一个内存队列acceptorQueue.offer(request);}//同步服务下线请求public void replicateCancel(CancelRequest request) {request.setType(AbstractRequest.CANCEL_REQUEST);//将请求消息放入第一个内存队列acceptorQueue.offer(request);}//同步发送心跳请求public void replicateHeartbeat(HeartbeatRequest request) {request.setType(AbstractRequest.HEARTBEAT_REQUEST);//将请求消息放入第一个内存队列acceptorQueue.offer(request);}//负责接收数据以及打包为batch的后台线程class AcceptorBatchThread extends Thread {long latestBatchGeneration = System.currentTimeMillis();@Overridepublic void run() {while(true) {try {//每隔500ms生成一个batchPeersReplicateBatch batch = new PeersReplicateBatch();long now = System.currentTimeMillis();if (now - latestBatchGeneration >= PEERS_REPLICATE_BATCH_INTERVAL) {//已经到了500ms的时间间隔//将batch消息放入第二个内存队列replicateQueue.offer(batch);//更新latestBatchGenerationlatestBatchGeneration = System.currentTimeMillis();//重置batchbatch = new PeersReplicateBatch();} else {//还没到500ms的时间间隔//从第一层队列获取数据,然后batch放入到第二层队列中AbstractRequest request = acceptorQueue.poll();if (request != null) {batch.add(request); } else {Thread.sleep(100);} }} catch (Exception e) {e.printStackTrace(); }}}}//集群同步线程class PeersReplicateThread extends Thread {@Overridepublic void run() {while(true) {try {PeersReplicateBatch batch = replicateQueue.take();if (batch != null) {//遍历其他的register-server地址//给每个地址的register-server都发送一个http请求同步batchSystem.out.println("给其他的register-server发送请求,同步batch......"); }} catch (Exception e) {e.printStackTrace(); }}}}
}//用于进行批量同步的batch消息
public class PeersReplicateBatch {private List<AbstractRequest> requests = new ArrayList<AbstractRequest>();public void add(AbstractRequest request) {this.requests.add(request);}public List<AbstractRequest> getRequests() {return requests;}public void setRequests(List<AbstractRequest> requests) {this.requests = requests;}
}//负责接收和处理register-client发送过来的请求的
public class RegisterServerController {//服务注册表private ServiceRegistry registry = ServiceRegistry.getInstance();//服务注册表的缓存private ServiceRegistryCache registryCache = ServiceRegistryCache.getInstance();//集群同步组件private PeersReplicator peersReplicator = PeersReplicator.getInstance();//服务注册public RegisterResponse register(RegisterRequest registerRequest) {RegisterResponse registerResponse = new RegisterResponse();try {//在注册表中加入这个服务实例ServiceInstance serviceInstance = new ServiceInstance();serviceInstance.setHostname(registerRequest.getHostname()); serviceInstance.setIp(registerRequest.getIp()); serviceInstance.setPort(registerRequest.getPort()); serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId()); serviceInstance.setServiceName(registerRequest.getServiceName());registry.register(serviceInstance);//更新自我保护机制的阈值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//过期掉注册表缓存registryCache.invalidate();//进行集群同步peersReplicator.replicateRegister(registerRequest);registerResponse.setStatus(RegisterResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); registerResponse.setStatus(RegisterResponse.FAILURE);}return registerResponse;}//服务下线 public void cancel(CancelRequest cancelRequest) {//从服务注册中摘除实例registry.remove(cancelRequest.getServiceName(), cancelRequest.getServiceInstanceId());//更新自我保护机制的阈值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//过期掉注册表缓存registryCache.invalidate();//进行集群同步peersReplicator.replicateCancel(cancelRequest); }//发送心跳public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) { HeartbeatResponse heartbeatResponse = new HeartbeatResponse();try {//获取服务实例ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());if (serviceInstance != null) {serviceInstance.renew();}//记录一下每分钟的心跳的次数HeartbeatCounter heartbeatMessuredRate = HeartbeatCounter.getInstance();heartbeatMessuredRate.increment();//进行集群同步peersReplicator.replicateHeartbeat(heartbeatRequest);heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); heartbeatResponse.setStatus(HeartbeatResponse.FAILURE); }return heartbeatResponse;}//同步batch数据public void replicateBatch(PeersReplicateBatch batch) {for (AbstractRequest request : batch.getRequests()) {if (request.getType().equals(AbstractRequest.REGISTER_REQUEST)) {register((RegisterRequest) request);} else if (request.getType().equals(AbstractRequest.CANCEL_REQUEST)) {cancel((CancelRequest) request);} else if (request.getType().equals(AbstractRequest.HEARTBEAT_REQUEST)) {heartbeat((HeartbeatRequest) request); }}}//拉取全量注册表public Applications fetchFullRegistry() {return (Applications) registryCache.get(CacheKey.FULL_SERVICE_REGISTRY);}//拉取增量注册表public DeltaRegistry fetchDeltaRegistry() {return (DeltaRegistry) registryCache.get(CacheKey.DELTA_SERVICE_REGISTRY); }
}
相关文章:
JUC并发—9.并发安全集合四
大纲 1.并发安全的数组列表CopyOnWriteArrayList 2.并发安全的链表队列ConcurrentLinkedQueue 3.并发编程中的阻塞队列概述 4.JUC的各种阻塞队列介绍 5.LinkedBlockingQueue的具体实现原理 6.基于两个队列实现的集群同步机制 4.JUC的各种阻塞队列介绍 (1)基于数组的阻塞…...
爱普生 SG-8101CE 可编程晶振在笔记本电脑的应用
在笔记本电脑的精密架构中,每一个微小的元件都如同精密仪器中的齿轮,虽小却对整体性能起着关键作用。如今的笔记本电脑早已不再局限于简单的办公用途,其功能愈发丰富多样。从日常轻松的文字处理、网页浏览,到专业领域中对图形处理…...
k8s网络插件详解(flannel)
1、介绍 Flannel 是一个轻量级、易于配置的网络插件,旨在简化 Kubernetes 集群中 Pod 网络的管理。Flannel 的核心功能是提供一个虚拟的网络,允许每个 Pod 获取一个独立的 IP 地址,并实现不同节点间的 Pod 之间的通信 2、网络模式 vxlan&am…...
基于flask+vue框架的的医院预约挂号系统i1616(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
系统程序文件列表 项目功能:用户,医生,科室信息,就诊信息,医院概况,挂号信息,诊断信息,取消挂号 开题报告内容 基于FlaskVue框架的医院预约挂号系统开题报告 一、研究背景与意义 随着医疗技术的不断进步和人们健康意识的日益增强,医院就诊量逐年增加。传统的现场…...
JUC并发—8.并发安全集合一
大纲 1.JDK 1.7的HashMap的死循环与数据丢失 2.ConcurrentHashMap的并发安全 3.ConcurrentHashMap的设计介绍 4.ConcurrentHashMap的put操作流程 5.ConcurrentHashMap的Node数组初始化 6.ConcurrentHashMap对Hash冲突的处理 7.ConcurrentHashMap的并发扩容机制 8.Concu…...
Linux 内核网络设备驱动编程:私有协议支持
一、struct net_device的通用性与私有协议的使用 struct net_device是Linux内核中用于描述网络设备的核心数据结构,它不仅限于TCP/IP协议,还可以用于支持各种类型的网络协议,包括私有协议。其原因如下: 协议无关性:struct net_device的设计是通用的,它本身并不依赖于任何…...
机器学习的数学基础(三)——概率与信息论
目录 1. 随机变量2. 概率分布2.1 离散型变量和概率质量函数2.2 连续型变量和概率密度函数 3. 边缘概率4. 条件概率5. 条件概率的链式法则6. 独立性和条件独立性7. 期望、方差和协方差7.1 期望7.2 方差7.3 协方差 8. 常用概率分布8.1 均匀分布 U ( a , b ) U(a, b) U(a,b)8.2 Be…...
使用Docker Desktop部署GitLab
1. 环境准备 确保Windows 10/11系统支持虚拟化技术(需在BIOS中开启Intel VT-x/AMD-V)内存建议≥8GB,存储空间≥100GB 2. 安装Docker Desktop 访问Docker官网下载安装包安装时勾选"Use WSL 2 instead of Hyper-V"(推荐…...
推理模型时代:大语言模型如何从对话走向深度思考?
一、对话模型和推理模型的区别概述 对话模型是专门用于问答交互的语言模型,符合人类的聊天方式,返回的内容可能仅仅只是一个简短的答案,一般模型名称后面会带有「chat」字样。 推理模型是比较新的产物,没有明确的定义,一般是指输出过程中带有<think>和</think&…...
GESP2024年3月认证C++七级( 第三部分编程题(1)交流问题)
参考程序: #include <iostream> #include <vector> #include <unordered_map> using namespace std;// 深度优先搜索,给每个节点染色,交替染色以模拟两校同学的划分 void dfs(vector<vector<int>>& graph…...
DeepSeek:AI商业化的新引擎与未来蓝图
摘要 在人工智能迅猛发展的浪潮中,DeepSeek以其卓越的技术实力和高超的商业化能力崭露头角。作为一款现象级AI产品,它不仅在算法性能上位居行业前列,还通过灵活的定制解决方案渗透到金融、医疗、零售等多个领域。DeepSeek以创新的商业模式和场…...
2025年度福建省职业院校技能大赛中职组“网络建设与运维”赛项规程模块三
模块三:服务搭建与运维 任务描述: 随着信息技术的快速发展,集团计划把部分业务由原有的 X86 服 务器上迁移到ARM 架构服务器上,同时根据目前的部分业务需求进行 了部分调整和优化。 一、X86 架构计算机操作系统安装与管理 1&…...
Python----数据结构(队列,顺序队列,链式队列,双端队列)
一、队列 1.1、概念 队列(Queue):也是一种基本的数据结构,在队列中的插入和删除都遵循先进先出(First in First out,FIFO)的原则。元素可以在任何时刻从队尾插入,但是只有在队列最前面 的元素才能被取出或…...
【自学笔记】Spring Boot框架技术基础知识点总览-持续更新
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 Spring Boot框架技术基础知识点总览一、Spring Boot简介1.1 什么是Spring Boot?1.2 Spring Boot的主要特性 二、Spring Boot快速入门2.1 搭建Spring Boo…...
神经网络剪枝技术的重大突破:sGLP-IB与sTLP-IB
神经网络剪枝技术的重大突破:sGLP-IB与sTLP-IB 在人工智能飞速发展的今天,深度学习技术已经成为推动计算机视觉、自然语言处理等领域的核心力量。然而,随着模型规模的不断膨胀,如何在有限的计算资源和存储条件下高效部署这些复杂的神经网络模型,成为了研究者们亟待解决的…...
Django-Vue 学习-VUE
主组件中有多个Vue组件 是指在Vue.js框架中,主组件是一个父组件,它包含了多个子组件(Vue组件)。这种组件嵌套的方式可以用于构建复杂的前端应用程序,通过拆分功能和视图,使代码更加模块化、可复用和易于维…...
【Gin】2:快速上手Gin框架(模版、cookie、session)
本文目录 一、模版渲染二、自定义模版函数三、cookie四、Session五、cookie、session区别六、会话攻击 一、模版渲染 在 Gin 框架中,模板主要用于动态生成 HTML 页面,结合 Go 语言的模板引擎功能,实现数据与视图的分离。 模板渲染是一种动态…...
Linux修改主机名称
hostnamectl set-hostname 主机名称 exit 退出登录重新进入即可...
亲测Windows部署Ollama+WebUI可视化
一. Ollama下载 登录Ollama官网(Ollama)点击Download进行下载 如果下载很慢可用以下地址下载: https://github.com/ollama/ollama/releases/download/v0.5.7/OllamaSetup.exe 在DeepSeek官网上,你可以直接点击【model】 到达这个界面之后,…...
Java四大框架深度剖析:MyBatis、Spring、SpringMVC与SpringBoot
目录 前言: 一、MyBatis框架 1. 概述 2. 核心特性 3. 应用场景 4. 示例代码 二、Spring框架 1. 概述 2. 核心模块 3. 应用场景 4. 示例代码 三、SpringMVC框架 1. 概述 2. 核心特性 3. 应用场景 4. 示例代码 四、SpringBoot框架 1. 概述 2. 核心…...
ubuntu部署小笔记-采坑
ubuntu部署小笔记 搭建前端控制端后端前端nginx反向代理使用ubuntu部署nextjs项目问题一 如何访问端口号配置后台运行该进程pm2 问题二 包体过大生产环境下所需文件 问题三 部署在vercel时出现的问题需要魔法访问后端api时,必须使用https协议电脑端访问正常…...
23. AI-大语言模型-DeepSeek简介
文章目录 前言一、DeepSeek是什么1. 简介2. 产品版本1. 类型2. 版本3. 参数规模与模型能力 3. 特征4. 三种访问方式1. 网页端和APP2. DeepSeek API 二、DeepSeek可以做什么1. 应用场景2. 文本生成1. 文本创作2. 摘要与改写3. 结构化生成 3. 自然语言理解与分析1. 语义分析2. 文…...
基于SpringBoot的智慧家政服务平台系统设计与实现的设计与实现(源码+SQL脚本+LW+部署讲解等)
专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…...
Java NIO与传统IO性能对比分析
Java NIO与传统IO性能对比分析 在Java中,I/O(输入输出)操作是开发中最常见的任务之一。传统的I/O方式基于阻塞模型,而Java NIO(New I/O)引入了非阻塞和基于通道(Channel)和缓冲区&a…...
基于YOLO11深度学习的果园苹果检测与计数系统设计与实现【python源码+Pyqt5界面+数据集+训练代码】
《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…...
基于SpringBoot畅购行汽车购票系统
作者简介:✌CSDN新星计划导师、Java领域优质创作者、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流。✌ 主要内容:🌟Java项目、Python项目、前端项目、PHP、ASP.NET、人工智能…...
基于 Spring Boot + 微信小程序的短文写作竞赛管理系统设计与实现(源码+文档)
大家好,今天要和大家聊的是一款基于 Spring Boot 微信小程序的“短文写作竞赛管理系统”的设计与实现。项目源码以及部署相关事宜请联系我,文末附上联系方式。 项目简介 基于 Spring Boot 微信小程序的“短文写作竞赛管理系统”设计与实现的主要使用…...
pytest运行用例的常见方式及参数
标题pytest运行用例方式及参数 用例结构目录 “”" 在最外层目录下执行所有的用例 参数说明: -s:显示用例的打印信息 -v:显示用例执行的详细信息 –alluredir:指定allure报告的路径 –clean-alluredir:清除allure报告的路径 -n:指定并发的进程数 -x:出现一条用…...
Miniconda + VSCode 的Python环境搭建
目录: 安装 VScode 安装 miniconda 在VScode 使用conda虚拟环境 运行Python程序 1.安装 vscode 编辑器 官网链接:Visual Studio Code - Code Editing. Redefined 下载得到:,双击安装。 安装成功…...
图解MySQL【日志】——Redo Log
Redo Log(重做日志) 为什么需要 Redo Log? 1. 崩溃恢复 数据库崩溃时,系统通过 Redo Log 来恢复尚未写入磁盘的数据。Redo Log 记录了所有已提交事务的操作,系统在重启后会重做这些操作,以保证数据不会丢…...
Trae AI驱动开发实战:30分钟从0到1实现Django REST天气服务
目录 一、Trae 安装 1、Trae 介绍 2、Trae 安装 二、项目构建 1、项目背景与技术选型 2、开发环境准备 三、需求分析 1、功能模块设计 2、数据库设计 四、功能实现 1、用户系统开发 2、天气服务实现 3、测试用例编写 五、Trae 体验总结 随着人工智能技术的迅猛发…...
【Linux网络编程】IP协议格式,解包步骤
目录 解析步骤 1.版本字段(大小:4比特位) 2.首部长度(大小:4比特位)(单位:4字节) 🍜细节解释: 3.服务类型(大小:8比特…...
中诺CHINO-E G076大容量录音电话产品使用注意事项
•本机需插上随机配置的电源适配器才能正常工作,切勿插入其它的适配器,以免损坏话机; •当本机出现异常时,请按“Δ/上查”键3秒,屏幕弹出确定恢复,按“设置”键恢复出厂设置; 注:…...
2025最新智能优化算法:改进型雪雁算法(Improved Snow Geese Algorithm, ISGA)求解23个经典函数测试集,MATLAB
一、改进型雪雁算法 雪雁算法(Snow Geese Algorithm,SGA)是2024年提出的一种新型元启发式算法,其灵感来源于雪雁的迁徙行为,特别是它们在迁徙过程中形成的独特“人字形”和“直线”飞行模式。该算法通过模拟雪雁的飞行…...
✨ 索引有哪些缺点以及具体有哪些索引类型
索引的定义与原理 索引是数据库中用于提高数据检索效率的数据结构。它就像是书籍的目录,通过目录可以快速定位到所需内容的页码,而在数据库中,索引可以帮助数据库系统快速找到符合查询条件的数据行,而不必对整个表进行扫描。 其…...
Promptic:Python 中的 LLM 应用开发利器
Promptic 是一个基于 Python 的轻量级库,旨在简化与大型语言模型(LLMs)的交互。它通过提供简洁的装饰器 API 和强大的功能,帮助开发者高效地构建 LLM 应用程序。Promptic 的设计理念是提供 90% 的 LLM 应用开发所需功能,同时保持代码的简洁和易用性。 1. Promptic 的核心…...
本地部署DeepSeek R1大模型
一、安装软件 1.1 安装Ollama 你可以访问Ollama的官方网站https://ollama.com/download,选择适合你操作系统的安装包进行下载。老周这里是Mac系统,所以选择下载macOS系统。 1.2 安装cherry studio 前往官网https://cherry-ai.com/download下载对应操…...
搅局外卖,京东连出三张牌
明牌暗牌,都不如民牌。 作者|古廿 编辑|杨舟 “京东来整顿外卖了”,这一网络热梗正在成为外界对京东近期一系列动作的高度概括。 0佣金、五险一金、品质外卖,京东连出三张牌打破外卖市场的旧秩序。此前这三项分别对应着长期被社会所诟病的…...
【ELK】【Elasticsearch】数据查询方式
1. 简单查询(URI Search) 通过 URL 参数直接进行查询,适合简单的搜索场景。 示例: bash 复制 GET /index_name/_search?qfield_name:search_value 说明: index_name:索引名称。 field_name…...
基于 JavaWeb 的 Spring Boot 网上商城系统设计和实现(源码+文档+部署讲解)
技术范围:SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论…...
C++17中的std::scoped_lock:简化多锁管理的利器
文章目录 1. 为什么需要std::scoped_lock1.1 死锁问题1.2 异常安全性1.3 锁的管理复杂性 2. std::scoped_lock的使用方法2.1 基本语法2.2 支持多种互斥锁类型2.3 自动处理异常 3. std::scoped_lock的优势3.1 避免死锁3.2 简化代码3.3 提供异常安全保证 4. 实际应用场景4.1 数据…...
Linux内核实时机制7 - 实时改造机理 - 软中断优化下
Linux内核实时机制7 - 实时改造机理 - 软中断优化下 https://blog.csdn.net/u010971180/article/details/145722641以下分别以Linux4.19、Linux5.4、Linux5.10、Linux5.15 展开分析,深入社区实时改造机理的软中断优化过程。https://blog.csdn.net/weixin_41028621/article/det…...
计算机网络:应用层 —— 文件传送协议 FTP
文章目录 FTP 是什么?FTP 的应用FTP 的基本工作原理主动模式被动模式 总结 FTP 是什么? 将某台计算机中的文件通过网络传送到可能相很远的另一台计算机中,是一项基本的网络应用,即文件传送。 文件传送协议FTP(File T…...
[笔记.AI]如何判断模型是否通过剪枝、量化、蒸馏生成?
以下摘自与DeepSeek-R1在线联网版的对话 一、基础判断维度 技术类型核心特征验证方法剪枝模型参数减少、结构稀疏化1. 检查模型参数量是否显著小于同类标准模型1 2. 分析权重矩阵稀疏性(如非零参数占比<30%)4量化权重/激活值精度降低、推理速度提升1…...
python: SQLAlchemy (ORM) Simple example using mysql in Ubuntu 24.04
mysql sql script: create table School 表 (SchoolId char(5) NOT NULL comment主鍵primary key,學校編號,SchoolName nvarchar(500) NOT NULL DEFAULT comment 學校名稱,SchoolTelNo varchar(8) NULL DEFAULT comment電話號碼,PRIMARY KEY (SchoolId) #主…...
【前端】【nuxt】nuxt优势(MVP开发),转换SSR与SPA模式
Nuxt.js 核心优势 自动化路由系统 无需手动配置路由:在 pages/ 目录下创建 .vue 文件即可自动生成路由,支持动态路由(如 pages/user/[id].vue → /user/:id)。嵌套路由:通过 parent.vue parent/child.vue 目录结构自动…...
洛谷B3619(B3620)
B3619 10 进制转 x 进制 - 洛谷 B3620 x 进制转 10 进制 - 洛谷 代码区: #include<algorithm> #include<iostream> #include<vector> using namespace std;int main(){int n,x;cin >> n >> x;vector<char> arry;while(n){if(…...
基于springboot+vue的酒店管理系统的设计与实现
开发语言:Java框架:springbootJDK版本:JDK1.8服务器:tomcat7数据库:mysql 5.7(一定要5.7版本)数据库工具:Navicat11开发软件:eclipse/myeclipse/ideaMaven包:…...
android调用ffmpeg解析rtsp协议的视频流
文章目录 一、背景二、解析rtsp数据1、C层功能代码2、jni层的定义3、app层的调用 三、源码下载 一、背景 本demo主要介绍android调用ffmpeg中的接口解析rtsp协议的视频流(不解析音频),得到yuv数据,把yuv转bitmap在android设备上显…...
cursor使用记录
一、如何查看自己登录的是哪个账号 操作路径:Cursor -- 首选项 -- Cursor Setting (有快捷键) 二、状态修改为竖排(默认是横排) 默认如图展示,想要像vscode、idea等等在左侧竖着展示 操作路径࿱…...