mirror of https://github.com/sunface/rust-course
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
474 lines
17 KiB
474 lines
17 KiB
3 years ago
|
//! Minimal Redis client implementation
|
||
|
//!
|
||
|
//! Provides an async connect and methods for issuing the supported commands.
|
||
|
|
||
|
use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe};
|
||
|
use crate::{Connection, Frame};
|
||
|
|
||
|
use async_stream::try_stream;
|
||
|
use bytes::Bytes;
|
||
|
use std::io::{Error, ErrorKind};
|
||
|
use std::time::Duration;
|
||
|
use tokio::net::{TcpStream, ToSocketAddrs};
|
||
|
use tokio_stream::Stream;
|
||
|
use tracing::{debug, instrument};
|
||
|
|
||
|
/// Established connection with a Redis server.
|
||
|
///
|
||
|
/// Backed by a single `TcpStream`, `Client` provides basic network client
|
||
|
/// functionality (no pooling, retrying, ...). Connections are established using
|
||
|
/// the [`connect`](fn@connect) function.
|
||
|
///
|
||
|
/// Requests are issued using the various methods of `Client`.
|
||
|
pub struct Client {
|
||
|
/// The TCP connection decorated with the redis protocol encoder / decoder
|
||
|
/// implemented using a buffered `TcpStream`.
|
||
|
///
|
||
|
/// When `Listener` receives an inbound connection, the `TcpStream` is
|
||
|
/// passed to `Connection::new`, which initializes the associated buffers.
|
||
|
/// `Connection` allows the handler to operate at the "frame" level and keep
|
||
|
/// the byte level protocol parsing details encapsulated in `Connection`.
|
||
|
connection: Connection,
|
||
|
}
|
||
|
|
||
|
/// A client that has entered pub/sub mode.
|
||
|
///
|
||
|
/// Once clients subscribe to a channel, they may only perform pub/sub related
|
||
|
/// commands. The `Client` type is transitioned to a `Subscriber` type in order
|
||
|
/// to prevent non-pub/sub methods from being called.
|
||
|
pub struct Subscriber {
|
||
|
/// The subscribed client.
|
||
|
client: Client,
|
||
|
|
||
|
/// The set of channels to which the `Subscriber` is currently subscribed.
|
||
|
subscribed_channels: Vec<String>,
|
||
|
}
|
||
|
|
||
|
/// A message received on a subscribed channel.
|
||
|
#[derive(Debug, Clone)]
|
||
|
pub struct Message {
|
||
|
pub channel: String,
|
||
|
pub content: Bytes,
|
||
|
}
|
||
|
|
||
|
/// Establish a connection with the Redis server located at `addr`.
|
||
|
///
|
||
|
/// `addr` may be any type that can be asynchronously converted to a
|
||
|
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
|
||
|
/// trait is the Tokio version and not the `std` version.
|
||
|
///
|
||
|
/// # Examples
|
||
|
///
|
||
|
/// ```no_run
|
||
|
/// use mini_redis::client;
|
||
|
///
|
||
|
/// #[tokio::main]
|
||
|
/// async fn main() {
|
||
|
/// let client = match client::connect("localhost:6379").await {
|
||
|
/// Ok(client) => client,
|
||
|
/// Err(_) => panic!("failed to establish connection"),
|
||
|
/// };
|
||
|
/// # drop(client);
|
||
|
/// }
|
||
|
/// ```
|
||
|
///
|
||
|
pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client> {
|
||
|
// The `addr` argument is passed directly to `TcpStream::connect`. This
|
||
|
// performs any asynchronous DNS lookup and attempts to establish the TCP
|
||
|
// connection. An error at either step returns an error, which is then
|
||
|
// bubbled up to the caller of `mini_redis` connect.
|
||
|
let socket = TcpStream::connect(addr).await?;
|
||
|
|
||
|
// Initialize the connection state. This allocates read/write buffers to
|
||
|
// perform redis protocol frame parsing.
|
||
|
let connection = Connection::new(socket);
|
||
|
|
||
|
Ok(Client { connection })
|
||
|
}
|
||
|
|
||
|
impl Client {
|
||
|
/// Get the value of key.
|
||
|
///
|
||
|
/// If the key does not exist the special value `None` is returned.
|
||
|
///
|
||
|
/// # Examples
|
||
|
///
|
||
|
/// Demonstrates basic usage.
|
||
|
///
|
||
|
/// ```no_run
|
||
|
/// use mini_redis::client;
|
||
|
///
|
||
|
/// #[tokio::main]
|
||
|
/// async fn main() {
|
||
|
/// let mut client = client::connect("localhost:6379").await.unwrap();
|
||
|
///
|
||
|
/// let val = client.get("foo").await.unwrap();
|
||
|
/// println!("Got = {:?}", val);
|
||
|
/// }
|
||
|
/// ```
|
||
|
#[instrument(skip(self))]
|
||
|
pub async fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
|
||
|
// Create a `Get` command for the `key` and convert it to a frame.
|
||
|
let frame = Get::new(key).into_frame();
|
||
|
|
||
|
debug!(request = ?frame);
|
||
|
|
||
|
// Write the frame to the socket. This writes the full frame to the
|
||
|
// socket, waiting if necessary.
|
||
|
self.connection.write_frame(&frame).await?;
|
||
|
|
||
|
// Wait for the response from the server
|
||
|
//
|
||
|
// Both `Simple` and `Bulk` frames are accepted. `Null` represents the
|
||
|
// key not being present and `None` is returned.
|
||
|
match self.read_response().await? {
|
||
|
Frame::Simple(value) => Ok(Some(value.into())),
|
||
|
Frame::Bulk(value) => Ok(Some(value)),
|
||
|
Frame::Null => Ok(None),
|
||
|
frame => Err(frame.to_error()),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Set `key` to hold the given `value`.
|
||
|
///
|
||
|
/// The `value` is associated with `key` until it is overwritten by the next
|
||
|
/// call to `set` or it is removed.
|
||
|
///
|
||
|
/// If key already holds a value, it is overwritten. Any previous time to
|
||
|
/// live associated with the key is discarded on successful SET operation.
|
||
|
///
|
||
|
/// # Examples
|
||
|
///
|
||
|
/// Demonstrates basic usage.
|
||
|
///
|
||
|
/// ```no_run
|
||
|
/// use mini_redis::client;
|
||
|
///
|
||
|
/// #[tokio::main]
|
||
|
/// async fn main() {
|
||
|
/// let mut client = client::connect("localhost:6379").await.unwrap();
|
||
|
///
|
||
|
/// client.set("foo", "bar".into()).await.unwrap();
|
||
|
///
|
||
|
/// // Getting the value immediately works
|
||
|
/// let val = client.get("foo").await.unwrap().unwrap();
|
||
|
/// assert_eq!(val, "bar");
|
||
|
/// }
|
||
|
/// ```
|
||
|
#[instrument(skip(self))]
|
||
|
pub async fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
|
||
|
// Create a `Set` command and pass it to `set_cmd`. A separate method is
|
||
|
// used to set a value with an expiration. The common parts of both
|
||
|
// functions are implemented by `set_cmd`.
|
||
|
self.set_cmd(Set::new(key, value, None)).await
|
||
|
}
|
||
|
|
||
|
/// Set `key` to hold the given `value`. The value expires after `expiration`
|
||
|
///
|
||
|
/// The `value` is associated with `key` until one of the following:
|
||
|
/// - it expires.
|
||
|
/// - it is overwritten by the next call to `set`.
|
||
|
/// - it is removed.
|
||
|
///
|
||
|
/// If key already holds a value, it is overwritten. Any previous time to
|
||
|
/// live associated with the key is discarded on a successful SET operation.
|
||
|
///
|
||
|
/// # Examples
|
||
|
///
|
||
|
/// Demonstrates basic usage. This example is not **guaranteed** to always
|
||
|
/// work as it relies on time based logic and assumes the client and server
|
||
|
/// stay relatively synchronized in time. The real world tends to not be so
|
||
|
/// favorable.
|
||
|
///
|
||
|
/// ```no_run
|
||
|
/// use mini_redis::client;
|
||
|
/// use tokio::time;
|
||
|
/// use std::time::Duration;
|
||
|
///
|
||
|
/// #[tokio::main]
|
||
|
/// async fn main() {
|
||
|
/// let ttl = Duration::from_millis(500);
|
||
|
/// let mut client = client::connect("localhost:6379").await.unwrap();
|
||
|
///
|
||
|
/// client.set_expires("foo", "bar".into(), ttl).await.unwrap();
|
||
|
///
|
||
|
/// // Getting the value immediately works
|
||
|
/// let val = client.get("foo").await.unwrap().unwrap();
|
||
|
/// assert_eq!(val, "bar");
|
||
|
///
|
||
|
/// // Wait for the TTL to expire
|
||
|
/// time::sleep(ttl).await;
|
||
|
///
|
||
|
/// let val = client.get("foo").await.unwrap();
|
||
|
/// assert!(val.is_some());
|
||
|
/// }
|
||
|
/// ```
|
||
|
#[instrument(skip(self))]
|
||
|
pub async fn set_expires(
|
||
|
&mut self,
|
||
|
key: &str,
|
||
|
value: Bytes,
|
||
|
expiration: Duration,
|
||
|
) -> crate::Result<()> {
|
||
|
// Create a `Set` command and pass it to `set_cmd`. A separate method is
|
||
|
// used to set a value with an expiration. The common parts of both
|
||
|
// functions are implemented by `set_cmd`.
|
||
|
self.set_cmd(Set::new(key, value, Some(expiration))).await
|
||
|
}
|
||
|
|
||
|
/// The core `SET` logic, used by both `set` and `set_expires.
|
||
|
async fn set_cmd(&mut self, cmd: Set) -> crate::Result<()> {
|
||
|
// Convert the `Set` command into a frame
|
||
|
let frame = cmd.into_frame();
|
||
|
|
||
|
debug!(request = ?frame);
|
||
|
|
||
|
// Write the frame to the socket. This writes the full frame to the
|
||
|
// socket, waiting if necessary.
|
||
|
self.connection.write_frame(&frame).await?;
|
||
|
|
||
|
// Wait for the response from the server. On success, the server
|
||
|
// responds simply with `OK`. Any other response indicates an error.
|
||
|
match self.read_response().await? {
|
||
|
Frame::Simple(response) if response == "OK" => Ok(()),
|
||
|
frame => Err(frame.to_error()),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Posts `message` to the given `channel`.
|
||
|
///
|
||
|
/// Returns the number of subscribers currently listening on the channel.
|
||
|
/// There is no guarantee that these subscribers receive the message as they
|
||
|
/// may disconnect at any time.
|
||
|
///
|
||
|
/// # Examples
|
||
|
///
|
||
|
/// Demonstrates basic usage.
|
||
|
///
|
||
|
/// ```no_run
|
||
|
/// use mini_redis::client;
|
||
|
///
|
||
|
/// #[tokio::main]
|
||
|
/// async fn main() {
|
||
|
/// let mut client = client::connect("localhost:6379").await.unwrap();
|
||
|
///
|
||
|
/// let val = client.publish("foo", "bar".into()).await.unwrap();
|
||
|
/// println!("Got = {:?}", val);
|
||
|
/// }
|
||
|
/// ```
|
||
|
#[instrument(skip(self))]
|
||
|
pub async fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
|
||
|
// Convert the `Publish` command into a frame
|
||
|
let frame = Publish::new(channel, message).into_frame();
|
||
|
|
||
|
debug!(request = ?frame);
|
||
|
|
||
|
// Write the frame to the socket
|
||
|
self.connection.write_frame(&frame).await?;
|
||
|
|
||
|
// Read the response
|
||
|
match self.read_response().await? {
|
||
|
Frame::Integer(response) => Ok(response),
|
||
|
frame => Err(frame.to_error()),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Subscribes the client to the specified channels.
|
||
|
///
|
||
|
/// Once a client issues a subscribe command, it may no longer issue any
|
||
|
/// non-pub/sub commands. The function consumes `self` and returns a `Subscriber`.
|
||
|
///
|
||
|
/// The `Subscriber` value is used to receive messages as well as manage the
|
||
|
/// list of channels the client is subscribed to.
|
||
|
#[instrument(skip(self))]
|
||
|
pub async fn subscribe(mut self, channels: Vec<String>) -> crate::Result<Subscriber> {
|
||
|
// Issue the subscribe command to the server and wait for confirmation.
|
||
|
// The client will then have been transitioned into the "subscriber"
|
||
|
// state and may only issue pub/sub commands from that point on.
|
||
|
self.subscribe_cmd(&channels).await?;
|
||
|
|
||
|
// Return the `Subscriber` type
|
||
|
Ok(Subscriber {
|
||
|
client: self,
|
||
|
subscribed_channels: channels,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
/// The core `SUBSCRIBE` logic, used by misc subscribe fns
|
||
|
async fn subscribe_cmd(&mut self, channels: &[String]) -> crate::Result<()> {
|
||
|
// Convert the `Subscribe` command into a frame
|
||
|
let frame = Subscribe::new(&channels).into_frame();
|
||
|
|
||
|
debug!(request = ?frame);
|
||
|
|
||
|
// Write the frame to the socket
|
||
|
self.connection.write_frame(&frame).await?;
|
||
|
|
||
|
// For each channel being subscribed to, the server responds with a
|
||
|
// message confirming subscription to that channel.
|
||
|
for channel in channels {
|
||
|
// Read the response
|
||
|
let response = self.read_response().await?;
|
||
|
|
||
|
// Verify it is confirmation of subscription.
|
||
|
match response {
|
||
|
Frame::Array(ref frame) => match frame.as_slice() {
|
||
|
// The server responds with an array frame in the form of:
|
||
|
//
|
||
|
// ```
|
||
|
// [ "subscribe", channel, num-subscribed ]
|
||
|
// ```
|
||
|
//
|
||
|
// where channel is the name of the channel and
|
||
|
// num-subscribed is the number of channels that the client
|
||
|
// is currently subscribed to.
|
||
|
[subscribe, schannel, ..]
|
||
|
if *subscribe == "subscribe" && *schannel == channel => {}
|
||
|
_ => return Err(response.to_error()),
|
||
|
},
|
||
|
frame => return Err(frame.to_error()),
|
||
|
};
|
||
|
}
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
/// Reads a response frame from the socket.
|
||
|
///
|
||
|
/// If an `Error` frame is received, it is converted to `Err`.
|
||
|
async fn read_response(&mut self) -> crate::Result<Frame> {
|
||
|
let response = self.connection.read_frame().await?;
|
||
|
|
||
|
debug!(?response);
|
||
|
|
||
|
match response {
|
||
|
// Error frames are converted to `Err`
|
||
|
Some(Frame::Error(msg)) => Err(msg.into()),
|
||
|
Some(frame) => Ok(frame),
|
||
|
None => {
|
||
|
// Receiving `None` here indicates the server has closed the
|
||
|
// connection without sending a frame. This is unexpected and is
|
||
|
// represented as a "connection reset by peer" error.
|
||
|
let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server");
|
||
|
|
||
|
Err(err.into())
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
impl Subscriber {
|
||
|
/// Returns the set of channels currently subscribed to.
|
||
|
pub fn get_subscribed(&self) -> &[String] {
|
||
|
&self.subscribed_channels
|
||
|
}
|
||
|
|
||
|
/// Receive the next message published on a subscribed channel, waiting if
|
||
|
/// necessary.
|
||
|
///
|
||
|
/// `None` indicates the subscription has been terminated.
|
||
|
pub async fn next_message(&mut self) -> crate::Result<Option<Message>> {
|
||
|
match self.client.connection.read_frame().await? {
|
||
|
Some(mframe) => {
|
||
|
debug!(?mframe);
|
||
|
|
||
|
match mframe {
|
||
|
Frame::Array(ref frame) => match frame.as_slice() {
|
||
|
[message, channel, content] if *message == "message" => Ok(Some(Message {
|
||
|
channel: channel.to_string(),
|
||
|
content: Bytes::from(content.to_string()),
|
||
|
})),
|
||
|
_ => Err(mframe.to_error()),
|
||
|
},
|
||
|
frame => Err(frame.to_error()),
|
||
|
}
|
||
|
}
|
||
|
None => Ok(None),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Convert the subscriber into a `Stream` yielding new messages published
|
||
|
/// on subscribed channels.
|
||
|
///
|
||
|
/// `Subscriber` does not implement stream itself as doing so with safe code
|
||
|
/// is non trivial. The usage of async/await would require a manual Stream
|
||
|
/// implementation to use `unsafe` code. Instead, a conversion function is
|
||
|
/// provided and the returned stream is implemented with the help of the
|
||
|
/// `async-stream` crate.
|
||
|
pub fn into_stream(mut self) -> impl Stream<Item = crate::Result<Message>> {
|
||
|
// Uses the `try_stream` macro from the `async-stream` crate. Generators
|
||
|
// are not stable in Rust. The crate uses a macro to simulate generators
|
||
|
// on top of async/await. There are limitations, so read the
|
||
|
// documentation there.
|
||
|
try_stream! {
|
||
|
while let Some(message) = self.next_message().await? {
|
||
|
yield message;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Subscribe to a list of new channels
|
||
|
#[instrument(skip(self))]
|
||
|
pub async fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
|
||
|
// Issue the subscribe command
|
||
|
self.client.subscribe_cmd(channels).await?;
|
||
|
|
||
|
// Update the set of subscribed channels.
|
||
|
self.subscribed_channels
|
||
|
.extend(channels.iter().map(Clone::clone));
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
/// Unsubscribe to a list of new channels
|
||
|
#[instrument(skip(self))]
|
||
|
pub async fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
|
||
|
let frame = Unsubscribe::new(&channels).into_frame();
|
||
|
|
||
|
debug!(request = ?frame);
|
||
|
|
||
|
// Write the frame to the socket
|
||
|
self.client.connection.write_frame(&frame).await?;
|
||
|
|
||
|
// if the input channel list is empty, server acknowledges as unsubscribing
|
||
|
// from all subscribed channels, so we assert that the unsubscribe list received
|
||
|
// matches the client subscribed one
|
||
|
let num = if channels.is_empty() {
|
||
|
self.subscribed_channels.len()
|
||
|
} else {
|
||
|
channels.len()
|
||
|
};
|
||
|
|
||
|
// Read the response
|
||
|
for _ in 0..num {
|
||
|
let response = self.client.read_response().await?;
|
||
|
|
||
|
match response {
|
||
|
Frame::Array(ref frame) => match frame.as_slice() {
|
||
|
[unsubscribe, channel, ..] if *unsubscribe == "unsubscribe" => {
|
||
|
let len = self.subscribed_channels.len();
|
||
|
|
||
|
if len == 0 {
|
||
|
// There must be at least one channel
|
||
|
return Err(response.to_error());
|
||
|
}
|
||
|
|
||
|
// unsubscribed channel should exist in the subscribed list at this point
|
||
|
self.subscribed_channels.retain(|c| *channel != &c[..]);
|
||
|
|
||
|
// Only a single channel should be removed from the
|
||
|
// list of subscribed channels.
|
||
|
if self.subscribed_channels.len() != len - 1 {
|
||
|
return Err(response.to_error());
|
||
|
}
|
||
|
}
|
||
|
_ => return Err(response.to_error()),
|
||
|
},
|
||
|
frame => return Err(frame.to_error()),
|
||
|
};
|
||
|
}
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
}
|