烤面筋 Day 02
类型转换
1. 向上转型 vs 向下转型?
这是多态中两个方向完全相反的操作:
向上转型:
- 定义:将子类的指针或引用转换成父类
- 安全性:绝对安全。因为子类也是一种父类
- 用法:这种转换是隐式,不需要显式调用
static_cast ,是实现多态的基础
向下转型:
- 定义:将父类的指针或引用转换为子类。
- 安全性: 不安全。父类对象不一定是子类。
- 用法:必须使用显式转换,出于安全保证使用
dynamic_cast
2. static_cast 与 dynamic_cast 的区别?
本质区别在于 “转换发生的时机” 和 “安全检查的机制”
static_cast:
- 在编译阶段完成,不进行运行时类型检查
- 主要用于基本类型转换(如
int 转 float),非多态层级结构内的指针/引用转换。
dynamic_cast:
3. static_cast 在什么场景有风险?
static_cast 的风险主要发生在 向下转型(Downcasting),即把基类指针转换为派生类指针时:
4. dynamic_cast 的优势是什么?
它的核心优势是 “安全性”:
Reactor + 线程池
1. Reactor 模式是怎么实现的?
Reactor 模式本质上是 “I/O 复用 + 派发”,其核心组件包括:
- Event Demultiplexer:底层通常是
epoll或者 poll ,监听注册了的一堆文件描述符有哪些动静
- Reactor:核心循环。通过多路分离器等待事件发生,一旦有事件(如可读、可写),就将其分发(Dispatch)给对应的 Handler。
- Handlers:绑定在事件上的回调函数,负责非阻塞的读写操作。
2. 为什么采用主从 Reactor + 线程池?
主要是为了解决单线程 Reactor 的性能瓶颈:
分工明确:Main Reactor 只负责监听连接(Accept),Sub Reactor 负责处理已连接套接字的 I/O 事件。这避免了因为某个请求的 I/O 耗时过长导致新连接无法进入。
充分利用多核: 多个 Sub Reactor 可以运行在不同的 CPU 核心上,并行处理 I/O。
解耦计算:线程池 将业务计算逻辑从 I/O 线程中剥离。如果业务逻辑耗时较长,它不会阻塞 I/O 事件的分发,从而极大提高了系统的吞吐量。
accept() 仅需内核拷贝 socket 描述符(轻量高频操作),单线程 BossGroup 足以应对万级连接请求。如果是多线程竞争锁,反而会降低效率。IO 事件是重量级操作,涉及数据读写,业务逻辑,可能阻塞(数据库查询)。WorkGroup 多线程并行处理,充分利用多核 CPU。
Ps. 这里的 I/O 线程处理的是 I/O 读写,指的是从网卡驱动缓存拷贝到用户缓存区,或者将用户态数据拷贝到内核发送缓存区。业务计算逻辑则是加工数据(数据读上来以后,程序需要处理它),例如协议解析(二进制流解包成 Protobuf 或 JSON),数据库查询(根据请求差 SQL)等等
3. 从 Reactor 读到的数据如何传递给线程池?能不能实现一下?
是通过任务队列来实现的。Sub Reactor 将 (数据,处理函数)封装成任务丢进队列,线程池里的 worker 线程通过竞争来获取任务。
线程安全的任务队列通常有使用条件变量的阻塞队列和无锁队列。
阻塞队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #include <mutex> #include <condition_variable> #include <queue>
template <typename T> class BlockQueue { public: bool push(const T& val) { std::lock_guard lock(mtx_); if (stop_) return false; que_.push(val); cv_.notify_one(); return true; } bool pop(T& val) { std::unique_lock lock(mtx_); cv_.wait(lock, [this](){ return stop_ || !que_.empty(); }); if(stop_ && que_.empty()) return false; val = que_.front(); que_.pop(); return true; } void stop() { { std::lock_guard lock(mtx_); stop_ = true; } cv_.notify_all(); } private: std::condition_variable cv_; std::queue<T> que_; std::mutex mtx_; bool stop_ = false; };
|
无锁队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| #include <atomic> #include <cstdio>
template <typename T> class LockFreeQueue { public: LockFreeQueue(); ~LockFreeQueue();
void push(const T& value) { Node* newNode = new Node(value); while(true) { Node* current_tail = tail.load(); Node* next = current_tail->next.load(); if(tail.load() == current_tail) { if(next == nullptr) { if(current_tail->next.compare_exchange_weak(next, newNode)) { tail.compare_exchange_weak(current_tail, newNode); return; } } else { tail.compare_exchange_weak(current_tail, next); } } } }
bool pop(T& result) { while(true) { Node* current_head = head.load(); Node* current_tail = tail.load(); Node* next = current_head->next.load();
if(current_head == head.load()) { if(current_head == current_tail) { if(next == nullptr) return false; tail.compare_exchange_weak(current_tail, next); } else { result = next->data; if(head.compare_exchange_weak(current_head, next)) { delete current_head; return true; } } } } }
private: struct Node { T data; std::atomic<Node*> next; Node(const T& val) : data(val), next(nullptr) {} }; std::atomic<Node*> head; std::atomic<Node*> tail; };
|
4. CPU 密集型和 I/O 密集型怎么判断?
主要是看瓶颈在哪里:$N_{cpu}$ 是 CPU 核心数的意思
CPU 密集型:
- 特点:大部分时间在做复杂的运算(图像处理、科学计算)
- 判断: 程序运行时,CPU 占用率极高,但几乎没有磁盘或网络 I/O。
- 线程池策略: 线程数通常设置为 $N_{CPU} + 1$。
I/O 密集型:
- 特点: 大部分时间在等待磁盘读写、数据库查询、网络响应。
- 判断: CPU 占用率较低,系统大量时间处于等待状态(Wait)。
- 线程池策略: 线程数可以设置得大一些,如 $2N_{CPU}$ 甚至更多。
5. 线程池线程数如何确定?
首先针对于 I/O 密集型任务来说,一般设置为内核数 * 2。
当一个线程因为 I/O 阻塞或等待时,CPU 可以切换到另一个线程。避免过多的上下文切换(Context Switch)带来的损耗。
不过在实际生产环境下,我会优先通过 压力测试 和 性能监控(如查看 top 中的 iowait 指标)来动态调整线程数,而不会死守
公式。
6. 手撕一下线程池?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <atomic>
class ThreadPool { private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; std::atomic<bool> stop; public: ThreadPool(size_t num_threads = std::thread::hardware_concurrency()) : stop(false) { for(size_t i = 0; i < num_threads; ++i) { workers.emplace_back([this] { while(true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } }); } } ~ThreadPool() { stop = true; condition.notify_all(); for(std::thread &worker : workers) { if(worker.joinable()) worker.join(); } } template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); }); } condition.notify_one(); return res; } };
|