"); //-->
本文分享自天翼云开发者社区《Rust多线程:Worker 结构体与线程池中任务的传递机制》,作者:l****n
Rust多线程:Worker 结构体与线程池中任务的传递机制**在实现一个多线程的 Web 服务器时,我们会遇到一个问题:如何在创建线程之后让它们在没有任务时保持等待状态,并且在任务到来时可以立即执行。这是一个典型的线程池设计问题。在 Rust 中,我们需要通过自定义设计来实现这个功能,因为标准库中的 **thread::spawn 并不直接支持这种用法。
问题描述**Rust 的 **thread::spawn 方法会立即执行传入的闭包。如果我们想要在线程池中创建线程并让它们等待任务(即在创建时不执行任何任务),我们就需要自己设计一种机制,能够在稍后将任务传递给这些已经创建好的线程。
解决方案:引入 Worker 结构体**为了解决这个问题,我们引入了一个 **Worker 结构体来管理线程池中的每个线程。Worker 的作用类似于一个工人,它等待任务的到来并在接收到任务时执行。
1. Worker 结构体的定义Worker 结构体包含两个字段:
id:用于标识每个 Worker。
thread:存放线程的 JoinHandle<()>,它是由 thread::spawn 返回的。
代码如下:
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}2. 创建 Worker 实例**为了让 **Worker 在没有任务时处于等待状态,我们可以在 Worker::new 函数中使用 thread::spawn 创建线程,并传入一个空的闭包:
impl Worker {
fn new(id: usize) -> Worker { let thread = thread::spawn(|| {});
Worker { id, thread }
}
}**在这里,我们创建了一个 **Worker 实例,每个 Worker 都会启动一个线程。但这个线程目前还什么都不做,因为我们传递给 spawn 的闭包是空的。
3. 将 Worker 集成到线程池中**接下来,我们修改 **ThreadPool 的实现,使其存储 Worker 的实例而不是直接存储线程的 JoinHandle<()>。在 ThreadPool::new 中,我们使用一个 for 循环创建多个 Worker 实例,并将它们存储在一个 Vec<Worker> 中:
pub struct ThreadPool {
workers: Vec<Worker>,
}impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
}**这样,我们就为线程池创建了一个由多个 **Worker 组成的集合。每个 Worker 都有一个唯一的 ID,并且都启动了一个线程,虽然这些线程目前还没有执行任何有用的任务。
向 Worker 发送任务现在,我们解决了创建线程并让它们等待任务的问题。接下来,我们需要设计一个机制,使得线程池能够在任务到来时将任务发送给等待中的线程。
1. 使用信道传递任务**在 Rust 中,信道(channel)是一种非常适合在线程之间传递数据的工具。我们可以使用一个信道来传递任务。线程池会创建一个信道的发送端,每个 **Worker 会拥有信道的接收端。任务通过信道从线程池传递到 Worker,再由 Worker 中的线程执行。
use std::{sync::mpsc, thread};pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}struct Job;impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
}2. Worker 处理任务**为了让 **Worker 能够处理任务,我们将信道的接收端传递给每个 Worker 的线程。线程会不断地从信道中接收任务,并执行这些任务。
impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(move || {
receiver;
});
Worker { id, thread }
}
}**不过在这段代码中,存在一个问题:信道的接收端 **receiver 被移交给了第一个 Worker,导致无法将其传递给其他 Worker。
3. 使用 Arc 和 Mutex 共享接收端**为了解决这个问题,我们需要使用 **Arc<Mutex<T>> 来共享信道的接收端,这样所有的 Worker 都可以安全地从同一个信道接收任务:
use std::{sync::{mpsc, Arc, Mutex}, thread};type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{ let job = Box::new(f); self.sender.send(job).unwrap();
}
}impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job();
});
Worker { id, thread }
}
}**在 **Worker::new 中,线程会不断地尝试获取锁来接收任务,并在收到任务后执行。这里我们使用了 Arc 来共享接收端,使用 Mutex 来确保一次只有一个 Worker 能够接收任务。
type Job = Box<dyn FnOnce() + Send + 'static>;**这行代码定义了一个类型别名 **Job。它代表了一个特定的任务类型:
Box<dyn FnOnce() + Send + 'static> 是一个动态分发的闭包(或函数),其具体实现类型在编译时不确定。Box 是一个堆分配的智能指针,用于将闭包存储在堆上。
dyn FnOnce() 表示这个闭包实现了 FnOnce trait,可以被调用一次。
Send 表示这个闭包可以在线程之间安全地传递。
'static 表示闭包的生命周期是整个程序的生命周期,确保闭包在多个线程中可以安全使用。
**这个方法的功能是将一个新的任务(闭包)添加到线程池的任务队列中,以供线程池中的工作线程执行。下面是对 **F: FnOnce() + Send + 'static 的解释:
F: FnOnce() + Send + 'static
** 是一个泛型约束,表示必须是一个实现了 FnOnce、Send和 'static的闭包类型。**
FnOnce() 确保闭包可以被调用一次。
Send 确保闭包可以安全地在线程之间传递。
'static 确保闭包的生命周期足够长,可以在整个程序运行期间有效。
**在 **execute 方法中,你将传入的闭包 f 转换成 Job 类型(即 Box<dyn FnOnce() + Send + 'static>),然后通过 self.sender 将其发送到任务队列中。这使得线程池的工作线程可以从队列中接收并执行这些任务。
总结**通过引入 **Worker 结构体并使用信道进行任务传递,我们成功地实现了一个可以延迟分配任务的线程池。每个 Worker 都是在创建时启动的,但它们会等待任务的到来,只有在接收到任务后才会开始执行。这种设计不仅提高了服务器的吞吐量,还确保了线程资源的高效利用。
专栏文章内容及配图由作者撰写发布,仅供工程师学习之用,如有侵权或者其他违规问题,请联系本站处理。 联系我们
相关推荐
由Memfault驱动的Nordic nRF Cloud荣获移动突破奖所颁发之年度云计算创新奖
台积电AI产能:英伟达的需求可能迫使实现翻倍
何为“云计算”
边缘计算与人工智能(Edge AI)如何引领新一轮技术革命
云计算掀起智能硬件变革的浪潮
云的数据安全与监控
云计算驱动联想持续创新
将AI工作负载推向边缘
Upwind筹集2.5亿美元以实现云安全规模化
今年五月份是汽车电子,测试测量和云计算三大热点!
全球云计算市场迎来重大价格调整
“英伟达亲儿子”CoreWeave提交IPO申请,去年收入暴涨8倍
传说中的云计算、云存储是不是和这东西有关?
仿人型机器人能同时流利地说15种语言
消息称谷歌首款 AR 眼镜年内上市售卖,将在本周 I/O 大会发布
软件安全成为嵌入式云计算的热点
Android云计算之移动点餐系统分析与设计
美图获阿里巴巴2.5亿美元战略投资,将在AI与电商领域深度合作
基于云计算的元器件查找软件,包含海量在线数据库
IT有明天:大转换中的云计算
F5基于云计算平台的虚拟桌面连接解决方案
云计算掀起智能硬件变革的浪潮
2016物联网大会宣传片
基于云计算技术的日志管理系统
消费电子云方案
应用于数据中心(DataCenter)的 Smarter Solution
打造可靠的云存储技术
云计算在IC设计中的应用