烤面筋 Day 02

烤面筋 Day 02


类型转换

1. 向上转型 vs 向下转型?

这是多态中两个方向完全相反的操作:

向上转型:

  • 定义:将子类的指针或引用转换成父类
  • 安全性:绝对安全。因为子类也是一种父类
  • 用法:这种转换是隐式,不需要显式调用 static_cast ,是实现多态的基础

向下转型:

  • 定义:将父类的指针或引用转换为子类
  • 安全性: 不安全。父类对象不一定是子类。
  • 用法:必须使用显式转换,出于安全保证使用 dynamic_cast

2. static_castdynamic_cast 的区别?

本质区别在于 “转换发生的时机” 和 “安全检查的机制”

static_cast

  • 在编译阶段完成,不进行运行时类型检查
  • 主要用于基本类型转换(如 intfloat),非多态层级结构内的指针/引用转换。

dynamic_cast

  • 在运行阶段利用 RTTI(运行时类型信息) 检查转换是否合法

  • 专门用于处理**多态(Polymorphism)**层级结构中的转换

3. static_cast 在什么场景有风险?

static_cast 的风险主要发生在 向下转型(Downcasting),即把基类指针转换为派生类指针时:

  • 风险点: 如果该基类指针实际上并没有指向那个派生类对象,static_cast 依然会强行转换成功,返回一个地址。

  • 后果: 当你通过这个转换后的指针访问派生类特有的成员变量或虚函数时,会发生未定义行为(Undefined Behavior),通常表现为内存越界访问或程序崩溃,且这种错误在编译期无法察觉。

4. dynamic_cast 的优势是什么?

它的核心优势是 “安全性”

  • 类型安全检查: 它会检查目标类型是否与对象的实际类型匹配。如果转换非法,对于指针会返回 nullptr,对于引用会抛出 std::bad_cast 异常。

  • 支持虚继承转换: 在复杂的深层或菱形继承中,dynamic_cast 能够正确处理指针偏移。


Reactor + 线程池

1. Reactor 模式是怎么实现的?

Reactor 模式本质上是 “I/O 复用 + 派发”,其核心组件包括:

  1. Event Demultiplexer:底层通常是 epoll或者 poll ,监听注册了的一堆文件描述符有哪些动静
  2. Reactor:核心循环。通过多路分离器等待事件发生,一旦有事件(如可读、可写),就将其分发(Dispatch)给对应的 Handler。
  3. Handlers:绑定在事件上的回调函数,负责非阻塞的读写操作。

2. 为什么采用主从 Reactor + 线程池?

主要是为了解决单线程 Reactor 的性能瓶颈:

  • 分工明确:Main Reactor 只负责监听连接(Accept),Sub Reactor 负责处理已连接套接字的 I/O 事件。这避免了因为某个请求的 I/O 耗时过长导致新连接无法进入。

  • 充分利用多核: 多个 Sub Reactor 可以运行在不同的 CPU 核心上,并行处理 I/O。

  • 解耦计算:线程池 将业务计算逻辑从 I/O 线程中剥离。如果业务逻辑耗时较长,它不会阻塞 I/O 事件的分发,从而极大提高了系统的吞吐量。

accept() 仅需内核拷贝 socket 描述符(轻量高频操作),单线程 BossGroup 足以应对万级连接请求。如果是多线程竞争锁,反而会降低效率。IO 事件是重量级操作,涉及数据读写,业务逻辑,可能阻塞(数据库查询)。WorkGroup 多线程并行处理,充分利用多核 CPU。

Ps. 这里的 I/O 线程处理的是 I/O 读写,指的是从网卡驱动缓存拷贝到用户缓存区,或者将用户态数据拷贝到内核发送缓存区。业务计算逻辑则是加工数据(数据读上来以后,程序需要处理它),例如协议解析(二进制流解包成 Protobuf 或 JSON),数据库查询(根据请求差 SQL)等等

3. 从 Reactor 读到的数据如何传递给线程池?能不能实现一下?

是通过任务队列来实现的。Sub Reactor 将 (数据,处理函数)封装成任务丢进队列,线程池里的 worker 线程通过竞争来获取任务。

线程安全的任务队列通常有使用条件变量的阻塞队列和无锁队列。

阻塞队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <mutex>
#include <condition_variable>
#include <queue>

template <typename T>
class BlockQueue {
public:
bool push(const T& val) {
std::lock_guard lock(mtx_);
if (stop_) return false; // 队列已停止,返回false
que_.push(val);
cv_.notify_one();
return true;
}
bool pop(T& val) {
std::unique_lock lock(mtx_);
cv_.wait(lock, [this](){
return stop_ || !que_.empty();
});
if(stop_ && que_.empty()) return false;
val = que_.front();
que_.pop();
return true;
}
void stop() {
{
std::lock_guard lock(mtx_);
stop_ = true;
}
cv_.notify_all();
}
private:
std::condition_variable cv_;
std::queue<T> que_;
std::mutex mtx_;
bool stop_ = false;
};

无锁队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <atomic>
#include <cstdio>

/*
* push: 先检查尾部的一致性, tail->next = new_node, tail = new_node(用 CAS 版本实现)
* pop:先检查头部的一致性,如果队列为空 tail = tail->next, 不为空 head = head->next
*/

template <typename T>
class LockFreeQueue {
public:
LockFreeQueue();
~LockFreeQueue();

void push(const T& value) {
Node* newNode = new Node(value);
while(true) {
Node* current_tail = tail.load();
Node* next = current_tail->next.load();

// 一致性检查
if(tail.load() == current_tail) {
// tail 是真正的尾部
if(next == nullptr) {
// 一致性检查
if(current_tail->next.compare_exchange_weak(next, newNode)) {
// 进行更新
tail.compare_exchange_weak(current_tail, newNode);
return;
}
} else {
tail.compare_exchange_weak(current_tail, next);
}
}
}
}

bool pop(T& result) {
while(true) {
Node* current_head = head.load();
Node* current_tail = tail.load();
Node* next = current_head->next.load();

if(current_head == head.load()) {
// 队列有可能为空
if(current_head == current_tail) {
if(next == nullptr) return false;
// 进行更新 tail = tail->next
tail.compare_exchange_weak(current_tail, next);
} else {
result = next->data;
if(head.compare_exchange_weak(current_head, next)) {
delete current_head;
return true;
}
}
}
}
}

private:
struct Node {
T data;
std::atomic<Node*> next;
Node(const T& val) : data(val), next(nullptr) {}
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
};

4. CPU 密集型和 I/O 密集型怎么判断?

主要是看瓶颈在哪里:$N_{cpu}$ 是 CPU 核心数的意思

CPU 密集型:

  • 特点:大部分时间在做复杂的运算(图像处理、科学计算)
  • 判断: 程序运行时,CPU 占用率极高,但几乎没有磁盘或网络 I/O。
  • 线程池策略: 线程数通常设置为 $N_{CPU} + 1$

I/O 密集型:

  • 特点: 大部分时间在等待磁盘读写、数据库查询、网络响应。
  • 判断: CPU 占用率较低,系统大量时间处于等待状态(Wait)。
  • 线程池策略: 线程数可以设置得大一些,如 $2N_{CPU}$ 甚至更多。

5. 线程池线程数如何确定?

首先针对于 I/O 密集型任务来说,一般设置为内核数 * 2。

当一个线程因为 I/O 阻塞或等待时,CPU 可以切换到另一个线程。避免过多的上下文切换(Context Switch)带来的损耗。

不过在实际生产环境下,我会优先通过 压力测试性能监控(如查看 top 中的 iowait 指标)来动态调整线程数,而不会死守

公式。

6. 手撕一下线程池?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <atomic>

class ThreadPool {
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;

std::mutex queue_mutex;
std::condition_variable condition;
std::atomic<bool> stop;

public:
ThreadPool(size_t num_threads = std::thread::hardware_concurrency())
: stop(false) {
for(size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;

{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] {
return this->stop || !this->tasks.empty();
});

if(this->stop && this->tasks.empty()) return;

task = std::move(this->tasks.front());
this->tasks.pop();
}

task();
}
});
}
}

~ThreadPool() {
stop = true;
condition.notify_all();

for(std::thread &worker : workers) {
if(worker.joinable()) worker.join();
}
}

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {

using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();

{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop) throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}

condition.notify_one();
return res;
}
};

烤面筋 Day 02
https://dxblacksmith.github.io/2026/01/14/烤面筋_Day02/
作者
DxBlackSmith
发布于
2026年1月14日
许可协议