userspace rcu

urcu

Posted by icecube on July 31, 2025

用户态rcu锁

点我看清晰大图

读者角度

写者角度-同步

写者角度-异步

urcu_qsbr_gp.ctr

读者状态(ctr)和全局状态(gp.ctr)之间的交互关系图

urcu_qsbr_gp.futex

第一个写者同步等待gp结束,在urcu_qsbr_gp.futex上休眠,
其它写者在局部链表waiters上休眠,等待被第一个写者唤醒。

sychronize_rcu
    // 只有第一个进入gp_waiter队列的写者(也可能是call rcu thread线程)才会wait_gp
    if (urcu_wait_add(&shmctx->gp_waiters, wait) != 0) {
            /* Not first in queue: will be awakened by another thread. */
            urcu_adaptative_busy_wait(wait);  // 其它writers就忙等,后续由第一个writer负责唤醒
            goto gp_end;
    }
    wait_for_readers
       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)  { // 仍有读者处于临界区未经过QS,准备休眠
         uatomic_store(&urcu_qsbr_gp.futex, -1);    //
         cds_list_for_each_entry(index, input_readers, node)
          uatomic_store(&index->waiting, 1);
      }

      if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)  // 写者进程休眠,等待宽限期结束
        wait_gp();   
          while (uatomic_load(&shmctx->urcu_qsbr_gp.futex) == -1) {
            if (!futex_noasync(&shmctx->urcu_qsbr_gp.futex, FUTEX_WAIT, -1, NULL, NULL, 0))

    // gp结束,第一个写者唤醒其它写者
    urcu_wake_all_waiters(&waiters);

读者宣告QS状态,更新本地ctr;通过urcu_qsbr_gp.futex唤醒写者

rcu_quiescent_state
	_urcu_qsbr_quiescent_state
		_urcu_qsbr_quiescent_state_update_and_wakeup
			urcu_qsbr_wake_up_gp
				if (uatomic_load(&urcu_qsbr_gp.futex) != -1)  
					return 0;    // 写者已经被其它读者唤醒,返回即可
				uatomic_store(&urcu_qsbr_gp.futex, 0);   // 清零等待标记
				futex_noasync(&urcu_qsbr_gp.futex, FUTEX_WAKE, 1 ,NULL, NULL, 0);  // 唤醒写者

crdp->futex

写者调用call_rcu,注册自己的释放内存的回调函数,并通过crdp->futex唤醒 call rcu thread

call_rcu
	_call_rcu
		cds_wfcq_enqueue(&crdp->cbs_head, &crdp->cbs_tail, &head->next); // 回调加入队列
		wake_call_rcu_thread  // 唤醒回调线程 call rcu thread
			call_rcu_wake_up
				futex_async(&crdp->futex, FUTEX_WAKE, 1, NULL, NULL, 0) < 0)  // crdp.futex

回收线程处理完在crdp->futex上休眠

call_rcu_thread
	if (splice_ret != CDS_WFCQ_RET_SRC_EMPTY) { // 遍历链表并执行回调
		__cds_wfcq_for_each_blocking_safe(&cbs_tmp_head, &cbs_tmp_tail, cbs, cbs_tmp_n) {
			rhp = caa_container_of(cbs, struct rcu_head, next);  
			rhp->func(rhp);
		}
	}

	if (cds_wfcq_empty())
		call_rcu_wait(crdp);  // 回调链表空,负责call rcu thread 线程进入等待
			while (uatomic_load(&crdp->futex) == -1)
                  if (!futex_async(&crdp->futex, FUTEX_WAIT, -1, NULL, NULL, 0))

参考

liburcu