libsignal-net-grpc: add pbjson mode

Adds a mode where gRPC requests are encoded as protobuf JSON (using 
pbjson + serde_json) instead of binary protobuf (using prost). Because
tonic-build hardcodes that the codec initialization is done via
default(), the choice is done based on whether the current tokio
runtime's Id is in a "JSON mode" set; this is not perfect because
tokio will reuse Ids across runtimes, but we're only planning to use
this for testing anyway.
This commit is contained in:
Jordan Rose 2026-05-06 11:35:09 -07:00 committed by GitHub
parent 8b4eff395e
commit 9be982cbf3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 369 additions and 13 deletions

8
Cargo.lock generated
View File

@ -2880,6 +2880,7 @@ dependencies = [
"rand 0.9.4",
"rand_chacha",
"ref-cast",
"scopeguard",
"serde",
"serde_json",
"serde_with",
@ -2905,10 +2906,17 @@ name = "libsignal-net-grpc"
version = "0.1.0"
dependencies = [
"const-str",
"derive-where",
"libsignal-core",
"pbjson",
"pbjson-build",
"pbjson-types",
"prost",
"prost-types",
"serde",
"serde_json",
"strum",
"tokio",
"tonic",
"tonic-prost",
"tonic-prost-build",

View File

@ -9,6 +9,9 @@ license.workspace = true
[lints]
workspace = true
[features]
json-grpc-codec = ["libsignal-net-grpc/json-grpc-codec"]
[dependencies]
libsignal-core = { workspace = true }
libsignal-keytrans = { workspace = true }
@ -68,6 +71,7 @@ nonzero_ext = { workspace = true }
pretty_assertions = { workspace = true }
rand = { workspace = true }
rand_chacha = { workspace = true }
scopeguard = { workspace = true }
test-case = { workspace = true }
test-log = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "test-util"] }

View File

@ -314,7 +314,7 @@ fn request_error_from_server_side_error_info<E>(
}
"RESOURCE_EXHAUSTED" | "UNAVAILABLE" => {
// UNAVAILABLE is unlikely to have RetryInfo, but it doesn't really hurt to check.
if let Some(mut retry_delay) =
if let Some(retry_delay) =
matching_details::<google::rpc::RetryInfo>(&grpc_status.details)
.at_most_one()
.unwrap_or_else(|mut e| {
@ -325,10 +325,23 @@ fn request_error_from_server_side_error_info<E>(
})
.and_then(|info| info.retry_delay)
{
retry_delay.normalize();
// TODO: Use i32::div_ceil when that's stabilized.
// https://github.com/rust-lang/rust/issues/88581
fn nanos_to_secs_ceil(dividend: i32) -> i32 {
const DIVISOR: i32 = 1_000_000_000;
// Normal Div rounds towards 0.
let result = dividend / DIVISOR;
if dividend > 0 && dividend % DIVISOR != 0 {
result + 1
} else {
result
}
}
// Round up so that we're guaranteed to wait *at least* this long.
let retry_after_seconds =
retry_delay.seconds + i64::from(retry_delay.nanos.clamp(0, 1));
let retry_after_seconds = retry_delay
.seconds
.saturating_add(nanos_to_secs_ceil(retry_delay.nanos).into());
return RequestError::RetryLater(RetryLater {
retry_after_seconds: u32::try_from(
retry_after_seconds.clamp(0, u32::MAX.into()),
@ -424,10 +437,14 @@ pub(crate) mod testutil {
use crate::api::testutil::TEST_SELF_ACI;
use crate::ws::WsConnection;
pub(crate) fn req(uri: &str, body: impl prost::Message + 'static) -> http::Request<Vec<u8>> {
let body = tonic::codec::EncodeBody::new_client(
tonic_prost::ProstEncoder::new(Default::default()),
futures_util::stream::iter([Ok(body)]),
pub(crate) fn encode_for_grpc<C: tonic::codec::Encoder<Error = Status>>(
encoder: C,
item: C::Item,
) -> Vec<u8> {
// The difference between client and server only seems to matter when using compression.
tonic::codec::EncodeBody::new_client(
encoder,
futures_util::stream::iter([Ok(item)]),
None,
None,
)
@ -436,8 +453,11 @@ pub(crate) mod testutil {
.expect("non-blocking encoding")
.expect("can read entire message")
.to_bytes()
.into();
.into()
}
pub(crate) fn req(uri: &str, body: impl prost::Message + 'static) -> http::Request<Vec<u8>> {
let body = encode_for_grpc(tonic_prost::ProstEncoder::new(Default::default()), body);
req_typed(uri, body)
}
@ -887,13 +907,13 @@ mod test {
fn test_retry_later(reason: &str) {
let info = vec![
google::rpc::RetryInfo {
retry_delay: Some(prost_types::Duration {
retry_delay: Some(libsignal_net_grpc::Duration {
seconds: 10,
nanos: 2,
}),
},
google::rpc::RetryInfo {
retry_delay: Some(prost_types::Duration {
retry_delay: Some(libsignal_net_grpc::Duration {
seconds: 20,
nanos: 5,
}),
@ -1054,4 +1074,93 @@ mod test {
) -> Result<RateLimitChallenge, RequestError<Infallible>> {
RateLimitChallenge::try_from(input)
}
#[cfg(feature = "json-grpc-codec")]
#[tokio::test]
async fn test_json_mode() {
use uuid::{Uuid, uuid};
use crate::api::Unauth;
use crate::api::usernames::UnauthenticatedChatApi as _;
let rt = tokio::runtime::Handle::current();
libsignal_net_grpc::json::set_json_mode_for_tokio_runtime(&rt, true);
scopeguard::defer! {
libsignal_net_grpc::json::set_json_mode_for_tokio_runtime(
&rt,
false,
);
}
/// A tonic encoder and decoder that passes byte buffers through unchanged, letting tonic
/// add the gRPC framing and nothing else.
struct PassthroughCodec;
impl tonic::codec::Encoder for PassthroughCodec {
type Item = Vec<u8>;
type Error = tonic::Status;
fn encode(
&mut self,
item: Self::Item,
dst: &mut tonic::codec::EncodeBuf<'_>,
) -> Result<(), Self::Error> {
use bytes::BufMut;
dst.put(&item[..]);
Ok(())
}
}
impl tonic::codec::Decoder for PassthroughCodec {
type Item = Vec<u8>;
type Error = tonic::Status;
fn decode(
&mut self,
src: &mut tonic::codec::DecodeBuf<'_>,
) -> Result<Option<Self::Item>, Self::Error> {
use bytes::Buf;
Ok(Some(src.copy_to_bytes(src.remaining()).into()))
}
}
// Not realistic, but not likely to show up by accident.
let hash = &[0x00, 0xff, 0xff, 0xff];
const ACI_UUID: Uuid = uuid!("9d0652a3-dcc3-4d11-975f-74d61598733f");
let validator = testutil::RequestValidator {
expected: testutil::req_typed(
"/org.signal.chat.account.AccountsAnonymous/LookupUsernameHash",
testutil::encode_for_grpc(
PassthroughCodec,
serde_json::to_vec(
&libsignal_net_grpc::proto::chat::account::LookupUsernameHashRequest {
username_hash: hash.to_vec(),
},
)
.expect("can serialize expected request"),
),
),
response: http::Response::new(testutil::encode_for_grpc(
PassthroughCodec,
serde_json::to_vec(&libsignal_net_grpc::proto::chat::account::LookupUsernameHashResponse {
response: Some(
libsignal_net_grpc::proto::chat::account::lookup_username_hash_response::Response::ServiceIdentifier(
libsignal_net_grpc::proto::chat::common::ServiceIdentifier {
identity_type:
libsignal_net_grpc::proto::chat::common::IdentityType::Aci
.into(),
uuid: ACI_UUID.as_bytes().to_vec(),
},
),
),
})
.expect("can serialize response"),
)),
};
let result = Unauth(&validator)
.look_up_username_hash(hash)
.await
.expect("success");
assert_eq!(result, Some(libsignal_core::Aci::from(ACI_UUID)));
}
}

View File

@ -5,17 +5,34 @@ edition = "2021"
authors = ["Signal Messenger LLC"]
license = "AGPL-3.0-only"
[features]
json-grpc-codec = [
"dep:pbjson",
"dep:pbjson-build",
"dep:pbjson-types",
"dep:serde",
"dep:serde_json",
"dep:tokio",
]
[dependencies]
libsignal-core = { workspace = true }
const-str = { workspace = true }
derive-where = { workspace = true }
pbjson = { workspace = true, optional = true }
pbjson-types = { workspace = true, optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
strum = { workspace = true, features = ["derive"] }
tokio = { workspace = true, optional = true }
tonic = { workspace = true, default-features = false, features = ["codegen"] }
tonic-prost = { workspace = true }
[build-dependencies]
pbjson-build = { workspace = true, optional = true }
tonic-prost-build = { workspace = true }
[lib]

View File

@ -35,9 +35,33 @@ fn main() {
service_method_file.push("service_methods.rs");
std::fs::write(service_method_file, service_method_contents).expect("can write to OUT_DIR");
tonic_prost_build::configure()
#[cfg(feature = "json-grpc-codec")]
{
let mut json_build = pbjson_build::Builder::new();
for fd in &fds.file {
json_build.register_file_descriptor(fd.clone());
}
json_build
.build(&[".org.signal.chat"])
.expect("can compile with pbjson");
}
let mut tonic_build = tonic_prost_build::configure()
.build_server(false)
.build_transport(false)
.build_transport(false);
if cfg!(feature = "json-grpc-codec") {
tonic_build = tonic_build
.codec_path("crate::json::JsonOrProstCodec")
.compile_well_known_types(true)
.extern_path(".google.protobuf", "::pbjson_types")
// Note that this diverges from proper protobuf JSON in the interest of simplicity and
// prost_types compatibility. (Empty would normally be encoded as `{}`, but `()` is
// encoded as `null`.)
.extern_path(".google.protobuf.Empty", "()")
// These are only used for generic errors, not requests and responses.
.extern_path(".google.protobuf.Any", "::prost_types::Any");
}
tonic_build
.compile_fds_with_config(fds, config)
.expect("can generate code");
}

172
rust/net/grpc/src/json.rs Normal file
View File

@ -0,0 +1,172 @@
//
// Copyright 2026 Signal Messenger, LLC.
// SPDX-License-Identifier: AGPL-3.0-only
//
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::RwLock;
use derive_where::derive_where;
use prost::bytes::{Buf as _, BufMut as _};
#[derive_where(Default)]
pub struct Decoder<T>(PhantomData<fn() -> T>);
impl<T: serde::de::DeserializeOwned> tonic::codec::Decoder for Decoder<T> {
type Item = T;
type Error = tonic::Status;
fn decode(
&mut self,
src: &mut tonic::codec::DecodeBuf<'_>,
) -> Result<Option<Self::Item>, Self::Error> {
Ok(Some(
serde_json::from_reader(src.reader()).map_err(std::io::Error::from)?,
))
}
}
#[derive_where(Default)]
pub struct Encoder<T>(PhantomData<fn(T)>);
impl<T: serde::Serialize> tonic::codec::Encoder for Encoder<T> {
type Item = T;
type Error = tonic::Status;
fn encode(
&mut self,
item: Self::Item,
dst: &mut tonic::codec::EncodeBuf<'_>,
) -> Result<(), Self::Error> {
Ok(serde_json::to_writer(dst.writer(), &item).map_err(std::io::Error::from)?)
}
}
#[derive_where(Default)]
pub struct Codec<T, U>(PhantomData<(Encoder<T>, Decoder<U>)>);
impl<T, U> tonic::codec::Codec for Codec<T, U>
where
T: serde::Serialize + Send + 'static,
U: serde::de::DeserializeOwned + Send + 'static,
{
type Encode = T;
type Decode = U;
type Encoder = Encoder<T>;
type Decoder = Decoder<U>;
fn encoder(&mut self) -> Self::Encoder {
Encoder::default()
}
fn decoder(&mut self) -> Self::Decoder {
Decoder::default()
}
}
pub struct MaybeJson<T> {
json: bool,
fallback: T,
}
/// An alias compatible with tonic-build's `codec_path` option.
pub type JsonOrProstCodec<T, U> = MaybeJson<tonic_prost::ProstCodec<T, U>>;
// From https://doc.rust-lang.org/std/collections/struct.HashSet.html#usage-in-const-and-static
// A HashSet without a random seed, so it can be `const`.
static RUNTIMES_WITH_JSON_MODE: RwLock<
HashSet<tokio::runtime::Id, std::hash::BuildHasherDefault<std::hash::DefaultHasher>>,
> = RwLock::new(HashSet::with_hasher(std::hash::BuildHasherDefault::new()));
pub fn set_json_mode_for_tokio_runtime(runtime: &tokio::runtime::Handle, json_mode: bool) {
let mut state = RUNTIMES_WITH_JSON_MODE.write().expect("not poisoned");
let id = runtime.id();
if json_mode {
state.insert(id);
} else {
state.remove(&id);
}
}
impl<T: Default> Default for MaybeJson<T> {
fn default() -> Self {
let json_mode_active = tokio::runtime::Handle::try_current()
.ok()
.and_then(|rt| {
let id = rt.id();
let state = RUNTIMES_WITH_JSON_MODE.read().ok()?;
Some(state.contains(&id))
})
.unwrap_or_default();
Self {
json: json_mode_active,
fallback: Default::default(),
}
}
}
impl<T, U, C> tonic::codec::Codec for MaybeJson<C>
where
T: serde::Serialize + Send + 'static,
U: serde::de::DeserializeOwned + Send + 'static,
C: tonic::codec::Codec<Encode = T, Decode = U>,
{
type Encode = T;
type Decode = U;
type Encoder = MaybeJson<C::Encoder>;
type Decoder = MaybeJson<C::Decoder>;
fn encoder(&mut self) -> Self::Encoder {
MaybeJson {
json: self.json,
fallback: self.fallback.encoder(),
}
}
fn decoder(&mut self) -> Self::Decoder {
MaybeJson {
json: self.json,
fallback: self.fallback.decoder(),
}
}
}
impl<C> tonic::codec::Encoder for MaybeJson<C>
where
C: tonic::codec::Encoder<Item: serde::Serialize, Error = tonic::Status>,
{
type Item = C::Item;
type Error = tonic::Status;
fn encode(
&mut self,
item: Self::Item,
dst: &mut tonic::codec::EncodeBuf<'_>,
) -> Result<(), Self::Error> {
if self.json {
Encoder::default().encode(item, dst)
} else {
self.fallback.encode(item, dst)
}
}
}
impl<C> tonic::codec::Decoder for MaybeJson<C>
where
C: tonic::codec::Decoder<Item: serde::de::DeserializeOwned, Error = tonic::Status>,
{
type Item = C::Item;
type Error = tonic::Status;
fn decode(
&mut self,
src: &mut tonic::codec::DecodeBuf<'_>,
) -> Result<Option<Self::Item>, Self::Error> {
if self.json {
Decoder::default().decode(src)
} else {
self.fallback.decode(src)
}
}
}

View File

@ -5,28 +5,45 @@
#![warn(clippy::unwrap_used)]
#[cfg(feature = "json-grpc-codec")]
pub mod json;
pub mod proto {
pub mod chat {
pub mod common {
tonic::include_proto!("org.signal.chat.common");
#[cfg(feature = "json-grpc-codec")]
tonic::include_proto!("org.signal.chat.common.serde");
}
pub mod errors {
tonic::include_proto!("org.signal.chat.errors");
#[cfg(feature = "json-grpc-codec")]
tonic::include_proto!("org.signal.chat.errors.serde");
}
pub mod account {
tonic::include_proto!("org.signal.chat.account");
#[cfg(feature = "json-grpc-codec")]
tonic::include_proto!("org.signal.chat.account.serde");
}
pub mod attachments {
tonic::include_proto!("org.signal.chat.attachments");
#[cfg(feature = "json-grpc-codec")]
tonic::include_proto!("org.signal.chat.attachments.serde");
}
pub mod backup {
tonic::include_proto!("org.signal.chat.backup");
#[cfg(feature = "json-grpc-codec")]
tonic::include_proto!("org.signal.chat.backup.serde");
}
pub mod device {
tonic::include_proto!("org.signal.chat.device");
#[cfg(feature = "json-grpc-codec")]
tonic::include_proto!("org.signal.chat.device.serde");
}
pub mod messages {
tonic::include_proto!("org.signal.chat.messages");
#[cfg(feature = "json-grpc-codec")]
tonic::include_proto!("org.signal.chat.messages.serde");
}
// Not actually a proto, we just make sure to generate our helper file in the same place.
@ -49,6 +66,11 @@ pub mod proto {
}
}
#[cfg(not(feature = "json-grpc-codec"))]
pub type Duration = prost_types::Duration;
#[cfg(feature = "json-grpc-codec")]
pub type Duration = pbjson_types::Duration;
impl From<libsignal_core::ServiceId> for proto::chat::common::ServiceIdentifier {
fn from(value: libsignal_core::ServiceId) -> Self {
let kind = match value.kind() {