20 min read

内核同步机制1

1、原子操作

原子操作的核心和是完成命令整个操作不会被打断。

原子操作(atomic operation),不可分割的操作。其通过原子变量来实现,以保证单个CPU周期内,读写该变量,不能被打断,进而判断该变量的值,来解决并发引起的互斥。

由于原子操作的原理,所以在功能实现上需要硬件平台的支持,以x86平台为例,在执行原子操作的时候会在指令前增加一个LOCK_PREFIX前缀,

static __always_inline void arch_atomic_sub(int i, atomic_t *v)
{
	asm volatile(LOCK_PREFIX "subl %1,%0"
		     : "+m" (v->counter)
		     : "ir" (i) : "memory");
}

从硬件层面来看,LOCK_PREFIX前缀的工作原理如下:

  1. 当处理器遇到带有LOCK_PREFIX前缀的指令时,它会发出一个特殊的锁定信号(如总线锁定信号)。
  2. 这个锁定信号会通知其他处理器或设备暂时停止访问共享内存资源,直到当前指令完成。
  3. 在锁定期间,处理器会执行带有LOCK_PREFIX前缀的指令,对内存中的数据进行修改。
  4. 当指令完成后,处理器会释放锁定信号,允许其他处理器或设备重新访问共享内存资源。

linux中提供了两套api实现原子操作,下边具体介绍一下这两套api。

1.1-原子整数操作

原子整数操作针对的是整数类型的变量,例如int或long。这些操作通常涉及到对整个整数进行修改,如加、减、递增、递减等。原子整数操作确保在多线程或多处理器环境中,对整数的修改不会被其他线程或处理器干扰,从而避免了数据竞争和不一致的问题。

原子整数是一种特殊的数据结构,即通过atomic_t申明。

typedef struct {
	int counter;
} atomic_t;

可以看到,atomic_t声明的函数就是一个普通的int整数,在这里主要是为了在申明的时候与普通int进行区分,但是在命名原理上是没有区别的。

申明和初始化的方式为

atomic_t v; /*定义v */
atomic_t u=ATOMIC_INIT(0); /*定义u并把它初始化为0*/

这里需要进一步分析的事ATOMIC_INIT,很奇怪的是,看起来原子操作的初始化也就是普通的声明变量赋值,那是不是原子操作的区别是什么呢?

#define ATOMIC_INIT(i)	{ (i) }

(上述的操作是在v2.6的书里给出的,怀疑在v6.6上已经被替换成了atomic_set,但是感觉也不太对,内核中还是很有很多处应用的,另外一种可能就是原子变量的初始化其实也不需要原子操作,毕竟原形还是一个int型,在初始化的时候还是没开始同步操作的,所以也不会发生问题)

其他的加减操作api如下

static __always_inline int arch_atomic_read(const atomic_t *v)
{
	/*
	 * Note for KASAN: we deliberately don't use READ_ONCE_NOCHECK() here,
	 * it's non-inlined function that increases binary size and stack usage.
	 */
	return __READ_ONCE((v)->counter);
}

static __always_inline void arch_atomic_set(atomic_t *v, int i)
{
	__WRITE_ONCE(v->counter, i);
}

static __always_inline void arch_atomic_add(int i, atomic_t *v)
{
	asm volatile(LOCK_PREFIX "addl %1,%0"
		     : "+m" (v->counter)
		     : "ir" (i) : "memory");
}

static __always_inline void arch_atomic_sub(int i, atomic_t *v)
{
	asm volatile(LOCK_PREFIX "subl %1,%0"
		     : "+m" (v->counter)
		     : "ir" (i) : "memory");
}

这里的定义可以看到只是针对具体arch的底层api,实际在内核中还会进一步封装,例如

atomic_add->raw_atomic_add->arch_atomic_add

raw_atomic_sub->arch_atomic_sub->arch_atomic_sub

atomic_set->raw_atomic_set->arch_atomic_set

atomic_read->raw_atomic_read->arch_atomic_read

1.2 原子位操作

原子位操作针对的是单个位或位图(bitmaps)。这些操作通常涉及到对特定位进行设置、清除、测试等。原子位操作确保在多线程或多处理器环境中,对位的修改不会影响同一变量中的其他位,从而避免了数据竞争和不一致的问题。

位操作常见的api如下,其中非原子性的

void set_bit(int nr, void p)	// 将 p 地址的第 nr 位置 1。
void clear_bit(int nr,void p)	// 将 p 地址的第 nr 位清零。
void change_bit(int nr, void p) //	将 p 地址的第 nr 位进行翻转。
int test_bit(int nr, void p)	// 获取 p址的第 nr 位的值。
int test_and_set_bit(int nr, void p)	// 将 p 地址的第 nr 位置 1,并且返回 nr 位原来的值。
int test_and_clear_bit(int nr, void p)	// 将 p 地址的第 nr 位清零,并且返回 nr 位原来的值。
int test_and_change_bit(int nr, void *p)	// 将 p 地址的第 nr 位翻转,并且返回 nr 位原来的值

但是内核中也存在一些未原子性的位操作,对于非原子性的,其中以set_bit为例

static inline void set_bit(int nr, void *addr)
{
	asm("btsl %1,%0" : "+m" (*(u32 *)addr) : "Ir" (nr));
}

而原子性的操作如下,可以看到其中LOCK_PREFIX则是之前提过的原子操作的前缀

set_bit->arch_set_bit->raw_atomic_long_or->arch_atomic64_or,假如我们关注64位操作系统,那么最终的原子性位操作如下

static __always_inline void
raw_atomic_long_or(long i, atomic_long_t *v)
{
#ifdef CONFIG_64BIT
	raw_atomic64_or(i, v);
#else
	raw_atomic_or(i, v);
#endif
}

static __always_inline void arch_atomic64_or(s64 i, atomic64_t *v)
{
	asm volatile(LOCK_PREFIX "orq %1,%0"
			: "+m" (v->counter)
			: "er" (i)
			: "memory");
}

2、spin_lock

spin_lock也就是自旋锁,是内核中比较常见的也很常用的锁,他同一时间内只允许一个线程持有该锁。而且他后持锁的线程会一直陷入自旋等待解锁,这也就意味着该进程会处于running状态持续,因此这就会导致进程持续占用CPU,故我们在使用自旋锁时,应该保证锁不会长时间持有。但是因此自旋锁也有了其优势,由于没有陷入睡眠,当进程拿到锁时,无需再次调度并且节约了上下文切换的时间。

2.1 自旋锁的使用

自选锁的使用很简单:

  1. 首先定义一个自旋锁变量spinlock_t a

  2. 然后调用spin_lock_init(&a)初始化这个spinlock

  3. 加锁的时候调用spin_lock,解锁的时候spin_unlock

但是值得注意的事,除了基础的加/解锁外,如下三个变形的api

2.1.1-spin_lock_irq(spinlock_t *lock)

spin_lock_irq()函数用于获取自旋锁,同时禁用本地中断。在获取锁之前,该函数会关闭本地中断,以防止在持有锁的情况下发生中断。这样可以避免死锁和数据竞争。当调用spin_unlock_irq(spinlock_t *lock)释放锁时,本地中断会被重新启用。

2.1.2-spin_lock_irqsave(spinlock_t *lock, unsigned long flags)

spin_lock_irqsave()函数与spin_lock_irq()类似,但它还会保存当前的中断状态。在获取锁之前,该函数会关闭本地中断并将当前的中断状态保存在flags变量中。当调用spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)释放锁时,中断会被恢复到之前保存的状态。

spin_lock_irqsave()在以下情况下可能更为合适:

如果在获取锁之前,中断已经被禁用,那么在释放锁时,中断应该保持禁用状态。

如果在获取锁之前,中断已经被启用,那么在释放锁时,中断应该被重新启用。

2.1.3-spin_lock_bh(spinlock_t *lock)

spin_lock_bh()函数用于获取自旋锁,同时禁用软中断。在获取锁之前,该函数会关闭本地软中断,以防止在持有锁的情况下发生软中断。这样可以避免死锁和数据竞争。当调用spin_unlock_bh(spinlock_t *lock)释放锁时,本地软中断会被重新启用。

2.2-自旋锁的实现

自旋锁的核心是:

锁的实现基于一个32位的原子变量。锁的状态由该变量的高16位和低16位表示。当锁被持有时,高16位和低16位不相等;当锁未被持有时,高16位和低16位相等。

(目前内核主要是使用的队列自旋锁,也就是开启了CONFIG_QUEUED_RWLOCKS,然后使用的路径是qspinlock.c的代码)

其中2.2.1和2.2.2分析的是基于票号的自旋锁。2.2.3是基于队列的自旋锁。

2.2.1-加锁

spin_lock首先被封装成了raw_spin_lock->_raw_spin_lock->__raw_spin_lock

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
	preempt_disable();
	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

这里可以看到,

首先会禁止内核抢占

然后再spin_acquire->lock_acquire_exclusivespin_acquire()函数的主要作用是在获取自旋锁之前,将锁的相关信息(例如调用者的返回地址、锁的类型等)注册到Lockdep子系统。Lockdep子系统会根据这些信息构建一个锁依赖图,用于检测潜在的死锁问题。

最后开始尝试拿锁do_raw_spin_trylock,这个函数里是为了直接拿锁,不会执行自旋。

static __always_inline bool arch_spin_trylock(arch_spinlock_t *lock)
{
	u32 old = atomic_read(lock);

	if ((old >> 16) != (old & 0xffff))
		return false;

	return atomic_try_cmpxchg(lock, &old, old + (1<<16)); /* SC, for RCsc */
}
  1. 使用atomic_read(lock)函数读取锁的值到变量old。这是一个原子操作,确保在多处理器环境中的数据一致性。
  2. 检查锁的值的高16位是否等于低16位。如果不相等,表示锁已经被持有(这种表示方法是特定于这个架构的实现),函数返回false
  3. 如果锁未被持有,使用atomic_try_cmpxchg(lock, &old, old + (1<<16))函数尝试将锁的值高16位加1。atomic_try_cmpxchg()函数首先比较锁的当前值是否等于old,如果相等,则将锁的值设置为old + (1<<16)。如果比较和交换操作成功,则返回true,表示成功获取锁;否则,返回false,表示锁已被其他处理器或线程获取。

如果上来加锁失败了,那最终会调用到核心函数do_raw_spin_lock,并调用到arch_spin_lock

static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
	u32 val = atomic_fetch_add(1<<16, lock);
	u16 ticket = val >> 16;

	if (ticket == (u16)val)
		return;

	/*
	 * atomic_cond_read_acquire() is RCpc, but rather than defining a
	 * custom cond_read_rcsc() here we just emit a full fence.  We only
	 * need the prior reads before subsequent writes ordering from
	 * smb_mb(), but as atomic_cond_read_acquire() just emits reads and we
	 * have no outstanding writes due to the atomic_fetch_add() the extra
	 * orderings are free.
	 */
	atomic_cond_read_acquire(lock, ticket == (u16)VAL);
	smp_mb();
}
  1. 使用atomic_fetch_add(1<<16, lock)函数原子地将锁的值增加2^16,并将原始值存储在变量val中。这是一个原子操作,确保在多处理器环境中的数据一致性。
  2. 计算ticket值,即val的高16位。ticket值表示当前处理器或线程的“票号”。
  3. 检查ticket值是否等于val的低16位。如果相等,表示锁未被持有,函数返回。
  4. 如果锁已被持有,使用atomic_cond_read_acquire(lock, ticket == (u16)VAL)函数自旋等待,直到锁的低16位等于ticket值。这意味着锁已被释放,当前处理器或线程可以获取锁。
  5. 调用smp_mb()函数在内存屏障上插入一个全屏障。这确保了在获取锁之后,所有之前的读操作在之后的写操作之前完成。这有助于确保内存访问的正确顺序。

其中自旋的核心是

#define smp_cond_load_relaxed(ptr, cond_expr) ({		\
	typeof(ptr) __PTR = (ptr);				\
	__unqual_scalar_typeof(*ptr) VAL;			\
	for (;;) {						\
		VAL = READ_ONCE(*__PTR);			\
		if (cond_expr)					\
			break;					\
		cpu_relax();					\
	}							\
	(typeof(*ptr))VAL;					\
})
  1. 定义一个指针变量__PTR,并将其初始化为传入的ptr参数。

  2. 定义一个变量VAL,其类型与*ptr相同。

  3. 使用一个无限循环(for (;;))来实现自旋等待。在循环中,执行以下操作:

    a. 使用READ_ONCE(*__PTR)宏读取*ptr的值并将其存储在VAL中。READ_ONCE()宏确保编译器不会优化掉这个读操作。

    b. 检查给定的条件表达式cond_expr。如果条件满足,跳出循环。

    c. 调用cpu_relax()函数。这个函数用于在自旋等待期间通知处理器,以便它可以执行一些优化,例如降低功耗或让出执行资源给其他处理器。

  4. 返回VAL的值。

(这里有个问题,看起来这里只会执行循环等待,并没有持锁,那是不是最终持锁还是在do_raw_spin_trylock中实现的)

2.2.2-解锁

spin_unlock->raw_spin_unlock->_raw_spin_unlock->__raw_spin_unlock

最终调用到

static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
	spin_release(&lock->dep_map, _RET_IP_);
	do_raw_spin_unlock(lock);
	preempt_enable();
}
  1. spin_release主要是在释放锁时检测潜在的锁使用问题,例如死锁

  2. 解锁核心函数。

  3. 使能抢占

do_raw_spin_unlock

static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
{
	mmiowb_spin_unlock();
	arch_spin_unlock(&lock->raw_lock);
	__release(lock);
}

最终调用到smp_store_release把低地址也加1,这样就和持锁的时候高地址+1(atomic_try_cmpxchg(lock, &old, old + (1<<16)))相等了,从而达到解锁的操作。

static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
	u16 *ptr = (u16 *)lock + IS_ENABLED(CONFIG_CPU_BIG_ENDIAN);
	u32 val = atomic_read(lock);

	smp_store_release(ptr, (u16)val + 1);
}

2.2.3-队列自旋锁

基于队列的自旋锁(qspinlock)的实现相比于传统的自旋锁,更加公平且可扩展。当一个线程尝试获取已被其他线程持有的自旋锁时,它会在一个队列中等待,而不是不断地尝试获取锁。这样可以减少锁争用时的CPU使用率,并提高多核系统上的性能。

其中加解锁使用的是

#define arch_spin_is_locked(l)		queued_spin_is_locked(l)
#define arch_spin_is_contended(l)	queued_spin_is_contended(l)
#define arch_spin_value_unlocked(l)	queued_spin_value_unlocked(l)
#define arch_spin_lock(l)		queued_spin_lock(l)
#define arch_spin_trylock(l)		queued_spin_trylock(l)
#define arch_spin_unlock(l)		queued_spin_unlock(l)

旋锁的等待最终会调用到queued_spin_lock_slowpath

具体可以参考下深入理解Linux内核之自旋锁,简单来说,在会把锁加入到一个链表里,然后按照链表顺序先后加锁。

queued_spin_lock_slowpath函数中,基于队列的自旋锁排队加锁的实现主要依赖于MCS(Mellor-Crummey and Scott)锁算法。MCS锁是一种公平的、可扩展的自旋锁算法,它使用链表结构将等待锁的线程组织在一起,确保线程按照先后顺序获取锁。

以下是代码如何实现先后进入的spinlock进行排队加锁的:

  1. 当线程尝试获取锁失败时(慢速路径),它会进入基于队列的等待(queue标签)。在这个阶段,线程会初始化一个MCS节点,用于在等待队列中表示自己。
pv_queue:
node = this_cpu_ptr(&qnodes[0].mcs);
idx = node->count++;
tail = encode_tail(smp_processor_id(), idx);
  1. 线程将自己的MCS节点加入等待队列。它首先将节点的locked字段设置为0(表示锁未被获取),next字段设置为NULL。然后将节点的尾部信息(tail)更新到锁的状态中,表示自己已经加入等待队列。
node = grab_mcs_node(node, idx);
barrier();
node->locked = 0;
node->next = NULL;
pv_init_node(node);
old = xchg_tail(lock, tail);
  1. 如果等待队列中有前一个节点(old & _Q_TAIL_MASK),线程会将自己的MCS节点链接到前一个节点的next字段中。这样就形成了一个链表结构,表示等待锁的线程顺序。
if (old & _Q_TAIL_MASK) {
	prev = decode_tail(old);
	WRITE_ONCE(prev->next, node);
}
  1. 线程等待前一个节点释放锁。当前一个节点释放锁时,它会将当前线程的MCS节点的locked字段设置为1。当前线程会使用arch_mcs_spin_lock_contended函数自旋等待,直到locked字段变为1。
if (old & _Q_TAIL_MASK) {
	pv_wait_node(node, prev);
	arch_mcs_spin_lock_contended(&node->locked);
}
  1. 当前线程获取锁,并更新锁的状态。如果当前线程是等待队列的头部,它会将锁的状态设置为_Q_LOCKED_VAL。否则,它会将锁的状态设置为_Q_LOCKED_MASK
if ((val & _Q_TAIL_MASK) == tail) {
	if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
		goto release;
}
set_locked(lock);
  1. 当前线程释放锁,并通知等待队列中的下一个节点。它会将下一个节点的locked字段设置为1,表示下一个节点可以获取锁。
next = smp_cond_load_relaxed(&node->next, (VAL));
arch_mcs_spin_unlock_contended(&next->locked);

通过这种基于队列的自旋锁实现,线程会按照先后顺序获取锁,确保公平性和可扩展性。当一个线程尝试获取已被其他线程持有的自旋锁时,它会在一个链表结构的等待队列中等待,而不是不断地尝试获取锁。这样可以减少锁争用时的CPU使用率,并提高多核系统上的性能。

3、读写锁

spin_lock同一时间内,只允许一个进程持有该锁,这个粒度比较大,因此内核中提供了另外一个机制读写锁。

  1. 无锁状态:当没有线程持有读写锁时,任何线程都可以获取读锁或写锁。在这种情况下,不会发生阻塞。
  2. 读锁持有状态:当一个或多个线程持有读锁时,其他线程可以继续获取读锁,但尝试获取写锁的线程将被阻塞。这是因为允许多个线程同时读取共享资源,但不允许在读取过程中进行写入。因此,当有线程持有读锁时,写锁的获取将被阻塞。
  3. 写锁持有状态:当一个线程持有写锁时,其他尝试获取读锁或写锁的线程都将被阻塞。

3.1-读写锁的使用

  1. 声明

    rwlock_t a

  2. 初始化

    rwlock_init(&a)

  3. 加/锁

    write_lock/read_lock->write_unlock/read_unlock

    另外,和自旋锁一样,也是有三个变型,并且含义也是相同的

    • xxx_lock_irqsave
    • xxx_lock_irq
    • xxx_lock_bg

3.2-读写锁的实现

3.2.1-加锁

首先是写锁:write_lock->_raw_write_lock->__raw_write_lock

static inline void __raw_write_lock(rwlock_t *lock)
{
	preempt_disable();
	rwlock_acquire(&lock->dep_map, 0, 0, _RET_IP_);
	LOCK_CONTENDED(lock, do_raw_write_trylock, do_raw_write_lock);
}

和spin_lock一样,其中do_raw_write_trylock是用来加锁的,而do_raw_write_lock用来自旋。

其中do_raw_write_trylock->arch_write_trylock->queued_write_trylock

static inline int queued_write_trylock(struct qrwlock *lock)
{
	int cnts;

	cnts = atomic_read(&lock->cnts);
	if (unlikely(cnts))
		return 0;

	return likely(atomic_try_cmpxchg_acquire(&lock->cnts, &cnts,
				_QW_LOCKED));
}
  1. int cnts;:定义一个整数变量cnts,用于存储读写锁的当前状态。
  2. cnts = atomic_read(&lock->cnts);:使用atomic_read函数读取lock结构体中的cnts原子变量的值,并将其赋值给cnts变量。cnts变量中的值表示当前持有读锁的线程数量以及写锁的状态。
  3. if (unlikely(cnts)) return 0;:这行代码判断cnts变量的值。如果cnts不为0,表示存在持有读锁的线程或者写锁已被获取,此时不能获取写锁,因此函数返回0。unlikely宏表示这种情况发生的概率较低,用于优化编译器的指令调度。
  4. return likely(atomic_try_cmpxchg_acquire(&lock->cnts, &cnts, _QW_LOCKED));:这行代码尝试使用atomic_try_cmpxchg_acquire函数原子地比较并交换lock->cnts的值。期望的旧值为cnts(即0),新值为_QW_LOCKED(表示写锁已被获取)。如果比较并交换成功,表示成功获取到写锁,函数返回非零值;否则返回0。`l

do_raw_write_lock->arch_write_lock->queued_write_lock->queued_write_lock_slowpath

void __lockfunc queued_write_lock_slowpath(struct qrwlock *lock)
{
	int cnts;

	trace_contention_begin(lock, LCB_F_SPIN | LCB_F_WRITE);

	/* Put the writer into the wait queue */
	arch_spin_lock(&lock->wait_lock);

	/* Try to acquire the lock directly if no reader is present */
	if (!(cnts = atomic_read(&lock->cnts)) &&
	    atomic_try_cmpxchg_acquire(&lock->cnts, &cnts, _QW_LOCKED))
		goto unlock;

	/* Set the waiting flag to notify readers that a writer is pending */
	atomic_or(_QW_WAITING, &lock->cnts);

	/* When no more readers or writers, set the locked flag */
	do {
		cnts = atomic_cond_read_relaxed(&lock->cnts, VAL == _QW_WAITING);
	} while (!atomic_try_cmpxchg_acquire(&lock->cnts, &cnts, _QW_LOCKED));
unlock:
	arch_spin_unlock(&lock->wait_lock);

	trace_contention_end(lock, 0);
}

先把状态或上一个写锁等待加锁的状态,再通过do_while进行自旋,如果没有读写锁的时候,并且将状态标记为_QW_LOCKED,标记为一个写锁状态。

可以看到在写锁的加锁中,也是调用了arch_spin_lock来实现的,是这里只是用来保护尝试加写锁的时候的同步性。

下边是读锁。

首先是快速路径

static inline int queued_read_trylock(struct qrwlock *lock)
{
	int cnts;

	cnts = atomic_read(&lock->cnts);
	if (likely(!(cnts & _QW_WMASK))) {
		cnts = (u32)atomic_add_return_acquire(_QR_BIAS, &lock->cnts);
		if (likely(!(cnts & _QW_WMASK)))
			return 1;
		atomic_sub(_QR_BIAS, &lock->cnts);
	}
	return 0;
}

先读状态,如果没有写锁(包括写等待以及写持锁),则增加一个读者,不然的则快速解锁失败,进入慢速路径,进行等待。

void __lockfunc queued_read_lock_slowpath(struct qrwlock *lock)
{
	/*
	 * Readers come here when they cannot get the lock without waiting
	 */
	if (unlikely(in_interrupt())) {
		/*
		 * Readers in interrupt context will get the lock immediately
		 * if the writer is just waiting (not holding the lock yet),
		 * so spin with ACQUIRE semantics until the lock is available
		 * without waiting in the queue.
		 */
		atomic_cond_read_acquire(&lock->cnts, !(VAL & _QW_LOCKED));
		return;
	}
	atomic_sub(_QR_BIAS, &lock->cnts);

	trace_contention_begin(lock, LCB_F_SPIN | LCB_F_READ);

	/*
	 * Put the reader into the wait queue
	 */
	arch_spin_lock(&lock->wait_lock);
	atomic_add(_QR_BIAS, &lock->cnts);

	/*
	 * The ACQUIRE semantics of the following spinning code ensure
	 * that accesses can't leak upwards out of our subsequent critical
	 * section in the case that the lock is currently held for write.
	 */
	atomic_cond_read_acquire(&lock->cnts, !(VAL & _QW_LOCKED));

	/*
	 * Signal the next one in queue to become queue head
	 */
	arch_spin_unlock(&lock->wait_lock);

	trace_contention_end(lock, 0);
}

其中使用atomic_cond_read_acquire自旋等待,直到写锁被释放。

3.2.2-解锁

读写锁的解锁很简单。写锁的解锁是清空wlocked位(可能是wlocked是cnt的某个偏移什么的吧),读锁则是给读者减1。

static inline void queued_write_unlock(struct qrwlock *lock)
{
	smp_store_release(&lock->wlocked, 0);
}

static inline void queued_read_unlock(struct qrwlock *lock)
{
	/*
	 * Atomically decrement the reader count
	 */
	(void)atomic_sub_return_release(_QR_BIAS, &lock->cnts);
}
typedef struct qrwlock {
	union {
		atomic_t cnts;
		struct {
#ifdef __LITTLE_ENDIAN
			u8 wlocked;	/* Locked for write? */
			u8 __lstate[3];
#else
			u8 __lstate[3];
			u8 wlocked;	/* Locked for write? */
#endif
		};
	};
	arch_spinlock_t		wait_lock;
} arch_rwlock_t;

queued_write_unlock函数通过将lock->wlocked设置为0来实现写锁的解锁。这是因为在这个特定的qrwlock结构中,使用了一个联合体(union),其中atomic_t cntsstruct共享相同的内存空间。因此,在这个实现中,lock->wlocked实际上是用于表示写锁状态的。

在这个qrwlock结构中,lock->cnts变量包含了读者数量、写者等待状态和写锁状态。lock->wlocked字段是一个8位无符号整数,表示写锁是否已经被获取(1表示已获取,0表示未获取)。而lock->__lstate则表示剩余的状态信息,包括读者数量和写者等待状态。

回到queued_write_unlock函数,通过将lock->wlocked设置为0,表示写锁已经被释放。这样,在写锁被解锁后,其他线程可以尝试获取读锁或写锁。