"); //-->
本文分享自天翼云开发者社区《Rust 中的 Tokio 线程同步机制》,作者:l****n
Rust 中的 Tokio 线程同步机制在并发编程中,线程同步是一个重要的概念,用于确保多个线程在访问共享资源时能够正确地协调。Tokio 是一个强大的异步运行时库,为 Rust 提供了多种线程同步机制。以下是一些常见的同步机制:
Mutex
RwLock
Barrier
Semaphore
Notify
oneshot 和 mpsc 通道
watch 通道
Mutex(互斥锁)是最常见的同步原语之一,用于保护共享数据。它确保同一时间只有一个线程能够访问数据,从而避免竞争条件。
use tokio::sync::Mutex;use std::sync::Arc;#[tokio::main]async fn main() { let data = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let data = data.clone(); let handle = tokio::spawn(async move { let mut lock = data.lock().await;
*lock += 1;
});
handles.push(handle);
} for handle in handles {
handle.await.unwrap();
} println!("Result: {}", *data.lock().await);
}2. RwLockRwLock(读写锁)允许多线程同时读取数据,但只允许一个线程写入数据。它比 Mutex 更加灵活,因为在读取多于写入的场景下,它能提高性能。功能上,他是读写互斥、写写互斥、读读兼容。
use tokio::sync::RwLock;use std::sync::Arc;#[tokio::main]async fn main() { let data = Arc::new(RwLock::new(0)); let read_data = data.clone(); let read_handle = tokio::spawn(async move { let lock = read_data.read().await; println!("Read: {}", *lock);
}); let write_data = data.clone(); let write_handle = tokio::spawn(async move { let mut lock = write_data.write().await;
*lock += 1; println!("Write: {}", *lock);
});
read_handle.await.unwrap();
write_handle.await.unwrap();
}3. BarrierBarrier 是一种同步机制,允许多个线程在某个点上进行同步。当线程到达屏障时,它们会等待直到所有线程都到达,然后一起继续执行。
use tokio::sync::Barrier;use std::sync::Arc;#[tokio::main]async fn main() { let barrier = Arc::new(Barrier::new(3)); let mut handles = vec![]; for i in 0..3 { let barrier = barrier.clone(); let handle = tokio::spawn(async move { println!("Before wait: {}", i);
barrier.wait().await; println!("After wait: {}", i);
});
handles.push(handle);
} for handle in handles {
handle.await.unwrap();
}
}4. SemaphoreSemaphore(信号量)是一种用于控制对资源访问的同步原语。它允许多个线程访问资源,但有一个最大并发数限制。
#[tokio::test]async fn test_sem() { let semaphore = Arc::new(Semaphore::new(3)); let mut handles = vec![]; for i in 0..5 { let semaphore = semaphore.clone(); let handle = tokio::spawn(async move { let permit = semaphore.acquire().await.unwrap(); let now = Local::now(); println!("Got permit: {} at {:?}", i, now); println!( "Semaphore available permits before sleep: {}",
semaphore.available_permits()
); sleep(Duration::from_secs(5)).await; drop(permit); println!( "Semaphore available permits after sleep: {}",
semaphore.available_permits()
);
});
handles.push(handle);
} for handle in handles {
handle.await.unwrap();
}
}最终的结果如下
Got permit: 0 at 2024-08-08T21:03:04.374666+08:00Semaphore available permits before sleep: 2Got permit: 1 at 2024-08-08T21:03:04.375527800+08:00Semaphore available permits before sleep: 1Got permit: 2 at 2024-08-08T21:03:04.375563+08:00Semaphore available permits before sleep: 0Semaphore available permits after sleep: 0Semaphore available permits after sleep: 0Semaphore available permits after sleep: 1Got permit: 3 at 2024-08-08T21:03:09.376722800+08:00Semaphore available permits before sleep: 1Got permit: 4 at 2024-08-08T21:03:09.376779200+08:00Semaphore available permits before sleep: 1Semaphore available permits after sleep: 2Semaphore available permits after sleep: 35. Notify
Notify 是一种用于线程间通知的简单机制。它允许一个线程通知其他线程某些事件的发生。
use tokio::sync::Notify;use std::sync::Arc;#[tokio::main]async fn main() { let notify = Arc::new(Notify::new()); let notify_clone = notify.clone(); let handle = tokio::spawn(async move {
notify_clone.notified().await; println!("Received notification");
});
notify.notify_one();
handle.await.unwrap();
}6. oneshot 和 mpsc 通道oneshot 通道用于一次性发送消息,而 mpsc 通道则允许多个生产者发送消息到一个消费者。一般地onshot用于异常通知、启动分析等功能。mpsc用于实现异步消息同步
oneshotuse tokio::sync::oneshot;#[tokio::main]async fn main() { let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("Hello, world!").unwrap();
}); let message = rx.await.unwrap(); println!("Received: {}", message);
}mpscuse tokio::sync::mpsc;#[tokio::main]async fn main() { let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
tx.send("Hello, world!").await.unwrap();
}); while let Some(message) = rx.recv().await { println!("Received: {}", message);
}
}7. watch 通道watch 通道用于发送和接收共享状态的更新。它允许多个消费者监听状态的变化。
use tokio::sync::watch;#[tokio::main]async fn main() { let (tx, mut rx) = watch::channel("initial");
tokio::spawn(async move {
tx.send("updated").unwrap();
}); while rx.changed().await.is_ok() { println!("Received: {}", *rx.borrow());
}
}watch通道:
用于广播状态更新,一个生产者更新状态,多个消费者获取最新状态。
适合配置变更、状态同步等场景。
mpsc通道:
用于传递消息队列,多个生产者发送消息,一个消费者逐条处理。
适合任务队列、事件驱动等场景。
Rust 中的 Tokio 提供了丰富的线程同步机制,可以根据具体需求选择合适的同步原语。常用的同步机制包括:
Mutex:互斥锁,保护共享数据。
RwLock:读写锁,允许并发读,写时独占。
Barrier:屏障,同步多个线程在某一点。
Semaphore:信号量,控制并发访问资源。
Notify:通知机制,用于线程间通知。
oneshot 和 mpsc 通道:消息传递机制。
watch 通道:状态更新机制。
通过这些同步机制,可以在 Rust 中编写高效、安全的并发程序。
专栏文章内容及配图由作者撰写发布,仅供工程师学习之用,如有侵权或者其他违规问题,请联系本站处理。 联系我们
相关推荐
基于云计算的元器件查找软件,包含海量在线数据库
“英伟达亲儿子”CoreWeave提交IPO申请,去年收入暴涨8倍
何为“云计算”
全球云计算市场迎来重大价格调整
Upwind筹集2.5亿美元以实现云安全规模化
应用于数据中心(DataCenter)的 Smarter Solution
2016物联网大会宣传片
今年五月份是汽车电子,测试测量和云计算三大热点!
基于云计算技术的日志管理系统
仿人型机器人能同时流利地说15种语言
由Memfault驱动的Nordic nRF Cloud荣获移动突破奖所颁发之年度云计算创新奖
边缘计算与人工智能(Edge AI)如何引领新一轮技术革命
台积电AI产能:英伟达的需求可能迫使实现翻倍
云计算掀起智能硬件变革的浪潮
将AI工作负载推向边缘
美图获阿里巴巴2.5亿美元战略投资,将在AI与电商领域深度合作
传说中的云计算、云存储是不是和这东西有关?
软件安全成为嵌入式云计算的热点
消息称谷歌首款 AR 眼镜年内上市售卖,将在本周 I/O 大会发布
F5基于云计算平台的虚拟桌面连接解决方案
云计算驱动联想持续创新
云计算在IC设计中的应用
云计算掀起智能硬件变革的浪潮
IT有明天:大转换中的云计算
消费电子云方案
打造可靠的云存储技术
云的数据安全与监控
Android云计算之移动点餐系统分析与设计