mirror of https://github.com/sunface/rust-course
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
265 lines
8.4 KiB
265 lines
8.4 KiB
//! Minimal blocking Redis client implementation
|
|
//!
|
|
//! Provides a blocking connect and methods for issuing the supported commands.
|
|
|
|
use bytes::Bytes;
|
|
use std::time::Duration;
|
|
use tokio::net::ToSocketAddrs;
|
|
use tokio::runtime::Runtime;
|
|
|
|
pub use crate::client::Message;
|
|
|
|
/// Established connection with a Redis server.
|
|
///
|
|
/// Backed by a single `TcpStream`, `BlockingClient` provides basic network
|
|
/// client functionality (no pooling, retrying, ...). Connections are
|
|
/// established using the [`connect`](fn@connect) function.
|
|
///
|
|
/// Requests are issued using the various methods of `Client`.
|
|
pub struct BlockingClient {
|
|
/// The asynchronous `Client`.
|
|
inner: crate::client::Client,
|
|
|
|
/// A `current_thread` runtime for executing operations on the asynchronous
|
|
/// client in a blocking manner.
|
|
rt: Runtime,
|
|
}
|
|
|
|
/// A client that has entered pub/sub mode.
|
|
///
|
|
/// Once clients subscribe to a channel, they may only perform pub/sub related
|
|
/// commands. The `BlockingClient` type is transitioned to a
|
|
/// `BlockingSubscriber` type in order to prevent non-pub/sub methods from being
|
|
/// called.
|
|
pub struct BlockingSubscriber {
|
|
/// The asynchronous `Subscriber`.
|
|
inner: crate::client::Subscriber,
|
|
|
|
/// A `current_thread` runtime for executing operations on the asynchronous
|
|
/// `Subscriber` in a blocking manner.
|
|
rt: Runtime,
|
|
}
|
|
|
|
/// The iterator returned by `Subscriber::into_iter`.
|
|
struct SubscriberIterator {
|
|
/// The asynchronous `Subscriber`.
|
|
inner: crate::client::Subscriber,
|
|
|
|
/// A `current_thread` runtime for executing operations on the asynchronous
|
|
/// `Subscriber` in a blocking manner.
|
|
rt: Runtime,
|
|
}
|
|
|
|
/// Establish a connection with the Redis server located at `addr`.
|
|
///
|
|
/// `addr` may be any type that can be asynchronously converted to a
|
|
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
|
|
/// trait is the Tokio version and not the `std` version.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use mini_redis::blocking_client;
|
|
///
|
|
/// fn main() {
|
|
/// let client = match blocking_client::connect("localhost:6379") {
|
|
/// Ok(client) => client,
|
|
/// Err(_) => panic!("failed to establish connection"),
|
|
/// };
|
|
/// # drop(client);
|
|
/// }
|
|
/// ```
|
|
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
|
|
let rt = tokio::runtime::Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()?;
|
|
|
|
let inner = rt.block_on(crate::client::connect(addr))?;
|
|
|
|
Ok(BlockingClient { inner, rt })
|
|
}
|
|
|
|
impl BlockingClient {
|
|
/// Get the value of key.
|
|
///
|
|
/// If the key does not exist the special value `None` is returned.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Demonstrates basic usage.
|
|
///
|
|
/// ```no_run
|
|
/// use mini_redis::blocking_client;
|
|
///
|
|
/// fn main() {
|
|
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
|
|
///
|
|
/// let val = client.get("foo").unwrap();
|
|
/// println!("Got = {:?}", val);
|
|
/// }
|
|
/// ```
|
|
pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
|
|
self.rt.block_on(self.inner.get(key))
|
|
}
|
|
|
|
/// Set `key` to hold the given `value`.
|
|
///
|
|
/// The `value` is associated with `key` until it is overwritten by the next
|
|
/// call to `set` or it is removed.
|
|
///
|
|
/// If key already holds a value, it is overwritten. Any previous time to
|
|
/// live associated with the key is discarded on successful SET operation.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Demonstrates basic usage.
|
|
///
|
|
/// ```no_run
|
|
/// use mini_redis::blocking_client;
|
|
///
|
|
/// fn main() {
|
|
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
|
|
///
|
|
/// client.set("foo", "bar".into()).unwrap();
|
|
///
|
|
/// // Getting the value immediately works
|
|
/// let val = client.get("foo").unwrap().unwrap();
|
|
/// assert_eq!(val, "bar");
|
|
/// }
|
|
/// ```
|
|
pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
|
|
self.rt.block_on(self.inner.set(key, value))
|
|
}
|
|
|
|
/// Set `key` to hold the given `value`. The value expires after `expiration`
|
|
///
|
|
/// The `value` is associated with `key` until one of the following:
|
|
/// - it expires.
|
|
/// - it is overwritten by the next call to `set`.
|
|
/// - it is removed.
|
|
///
|
|
/// If key already holds a value, it is overwritten. Any previous time to
|
|
/// live associated with the key is discarded on a successful SET operation.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Demonstrates basic usage. This example is not **guaranteed** to always
|
|
/// work as it relies on time based logic and assumes the client and server
|
|
/// stay relatively synchronized in time. The real world tends to not be so
|
|
/// favorable.
|
|
///
|
|
/// ```no_run
|
|
/// use mini_redis::blocking_client;
|
|
/// use std::thread;
|
|
/// use std::time::Duration;
|
|
///
|
|
/// fn main() {
|
|
/// let ttl = Duration::from_millis(500);
|
|
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
|
|
///
|
|
/// client.set_expires("foo", "bar".into(), ttl).unwrap();
|
|
///
|
|
/// // Getting the value immediately works
|
|
/// let val = client.get("foo").unwrap().unwrap();
|
|
/// assert_eq!(val, "bar");
|
|
///
|
|
/// // Wait for the TTL to expire
|
|
/// thread::sleep(ttl);
|
|
///
|
|
/// let val = client.get("foo").unwrap();
|
|
/// assert!(val.is_some());
|
|
/// }
|
|
/// ```
|
|
pub fn set_expires(
|
|
&mut self,
|
|
key: &str,
|
|
value: Bytes,
|
|
expiration: Duration,
|
|
) -> crate::Result<()> {
|
|
self.rt
|
|
.block_on(self.inner.set_expires(key, value, expiration))
|
|
}
|
|
|
|
/// Posts `message` to the given `channel`.
|
|
///
|
|
/// Returns the number of subscribers currently listening on the channel.
|
|
/// There is no guarantee that these subscribers receive the message as they
|
|
/// may disconnect at any time.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Demonstrates basic usage.
|
|
///
|
|
/// ```no_run
|
|
/// use mini_redis::blocking_client;
|
|
///
|
|
/// fn main() {
|
|
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
|
|
///
|
|
/// let val = client.publish("foo", "bar".into()).unwrap();
|
|
/// println!("Got = {:?}", val);
|
|
/// }
|
|
/// ```
|
|
pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
|
|
self.rt.block_on(self.inner.publish(channel, message))
|
|
}
|
|
|
|
/// Subscribes the client to the specified channels.
|
|
///
|
|
/// Once a client issues a subscribe command, it may no longer issue any
|
|
/// non-pub/sub commands. The function consumes `self` and returns a
|
|
/// `BlockingSubscriber`.
|
|
///
|
|
/// The `BlockingSubscriber` value is used to receive messages as well as
|
|
/// manage the list of channels the client is subscribed to.
|
|
pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
|
|
let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
|
|
Ok(BlockingSubscriber {
|
|
inner: subscriber,
|
|
rt: self.rt,
|
|
})
|
|
}
|
|
}
|
|
|
|
impl BlockingSubscriber {
|
|
/// Returns the set of channels currently subscribed to.
|
|
pub fn get_subscribed(&self) -> &[String] {
|
|
self.inner.get_subscribed()
|
|
}
|
|
|
|
/// Receive the next message published on a subscribed channel, waiting if
|
|
/// necessary.
|
|
///
|
|
/// `None` indicates the subscription has been terminated.
|
|
pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
|
|
self.rt.block_on(self.inner.next_message())
|
|
}
|
|
|
|
/// Convert the subscriber into an `Iterator` yielding new messages published
|
|
/// on subscribed channels.
|
|
pub fn into_iter(self) -> impl Iterator<Item = crate::Result<Message>> {
|
|
SubscriberIterator {
|
|
inner: self.inner,
|
|
rt: self.rt,
|
|
}
|
|
}
|
|
|
|
/// Subscribe to a list of new channels
|
|
pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
|
|
self.rt.block_on(self.inner.subscribe(channels))
|
|
}
|
|
|
|
/// Unsubscribe to a list of new channels
|
|
pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
|
|
self.rt.block_on(self.inner.unsubscribe(channels))
|
|
}
|
|
}
|
|
|
|
impl Iterator for SubscriberIterator {
|
|
type Item = crate::Result<Message>;
|
|
|
|
fn next(&mut self) -> Option<crate::Result<Message>> {
|
|
self.rt.block_on(self.inner.next_message()).transpose()
|
|
}
|
|
}
|