provisioningServlet = new WebSocketResourceProviderFactory<>(
- provisioningEnvironment, AuthenticatedDevice.class, config.getWebSocketConfiguration(),
- RemoteAddressFilter.REMOTE_ADDRESS_ATTRIBUTE_NAME);
+ provisioningEnvironment, AuthenticatedDevice.class, RemoteAddressFilter.REMOTE_ADDRESS_ATTRIBUTE_NAME);
- ServletRegistration.Dynamic websocket = environment.servlets().addServlet("WebSocket", webSocketServlet);
- ServletRegistration.Dynamic provisioning = environment.servlets().addServlet("Provisioning", provisioningServlet);
+ JettyWebSocketServletContainerInitializer.configure(environment.getApplicationContext(),
+ (servletContext, container) -> {
+ container.addMapping(websocketServletPath, webSocketServlet);
+ container.addMapping(provisioningWebsocketServletPath, provisioningServlet);
- websocket.addMapping(websocketServletPath);
- websocket.setAsyncSupported(true);
+ PriorityFilter.ensureFilter(servletContext, new StripContentLengthOnConnectFilter());
+ PriorityFilter.ensureFilter(servletContext, new TimestampResponseFilter());
+ PriorityFilter.ensureFilter(servletContext, new RemoteAddressFilter());
+ PriorityFilter.ensureFilter(servletContext, remoteDeprecationFilter);
- provisioning.addMapping(provisioningWebsocketServletPath);
- provisioning.setAsyncSupported(true);
+ container.setMaxBinaryMessageSize(config.getWebSocketConfiguration().getMaxBinaryMessageSize());
+ container.setMaxTextMessageSize(config.getWebSocketConfiguration().getMaxTextMessageSize());
+
+ final WebSocketExtensionRegistry extensionRegistry = WebSocketServerComponents
+ .getWebSocketComponents(environment.getApplicationContext())
+ .getExtensionRegistry();
+ if (config.getWebSocketConfiguration().isDisablePerMessageDeflate()) {
+ extensionRegistry.unregister("permessage-deflate");
+ } else if (config.getWebSocketConfiguration().isDisableCrossMessageOutgoingCompression()) {
+ extensionRegistry.unregister("permessage-deflate");
+ extensionRegistry.register("permessage-deflate", NoContextTakeoverPerMessageDeflateExtension.class);
+ }
+ });
environment.admin().addTask(new SetRequestLoggingEnabledTask());
-
}
private void registerExceptionMappers(Environment environment,
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/auth/IdlePrimaryDeviceAuthenticatedWebSocketUpgradeFilter.java b/service/src/main/java/org/whispersystems/textsecuregcm/auth/IdlePrimaryDeviceAuthenticatedWebSocketUpgradeFilter.java
index 38ff7d8ef..e2853497c 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/auth/IdlePrimaryDeviceAuthenticatedWebSocketUpgradeFilter.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/auth/IdlePrimaryDeviceAuthenticatedWebSocketUpgradeFilter.java
@@ -10,10 +10,9 @@ import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.time.Clock;
import java.time.Duration;
-import java.time.Instant;
import java.util.Optional;
-import org.eclipse.jetty.websocket.server.JettyServerUpgradeRequest;
-import org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse;
+import org.eclipse.jetty.ee10.websocket.server.JettyServerUpgradeRequest;
+import org.eclipse.jetty.ee10.websocket.server.JettyServerUpgradeResponse;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.websocket.auth.AuthenticatedWebSocketUpgradeFilter;
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java
index fb3c08742..295879b8f 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java
@@ -5,11 +5,33 @@
package org.whispersystems.textsecuregcm.configuration;
import jakarta.validation.constraints.NotNull;
+import java.time.Duration;
+
+/// Configuration for the gRPC Server
+///
+/// @param bindAddress The host to bind the omnibus server to
+/// @param port The port to bind the omnibus server to
+/// @param websocketAddress The address of a listening websocket server for handling legacy requests
+/// @param websocketPort The port of a listening websocket server for handling legacy requests
+/// @param idleTimeout The duration after which an idle connection may be disconnected
+/// @param h2c If true, listen for plaintext h2c with prior-knowledge
+public record GrpcConfiguration(
+ @NotNull String bindAddress,
+ @NotNull Integer port,
+ @NotNull String websocketAddress,
+ @NotNull Integer websocketPort,
+ @NotNull Duration idleTimeout,
+ boolean h2c) {
-public record GrpcConfiguration(@NotNull String bindAddress, @NotNull Integer port) {
public GrpcConfiguration {
if (bindAddress == null || bindAddress.isEmpty()) {
bindAddress = "localhost";
}
+ if (websocketAddress == null || websocketAddress.isEmpty()) {
+ websocketAddress = "localhost";
+ }
+ if (idleTimeout == null) {
+ idleTimeout = Duration.ofMinutes(5);
+ }
}
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java
index 1bd2c1d68..c38912de8 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java
@@ -7,6 +7,9 @@ package org.whispersystems.textsecuregcm.configuration;
import jakarta.validation.constraints.NotNull;
import org.whispersystems.textsecuregcm.configuration.secrets.SecretString;
+import javax.annotation.Nullable;
-public record TlsKeyStoreConfiguration(@NotNull SecretString password) {
+public record TlsKeyStoreConfiguration(
+ @Nullable String path,
+ @NotNull SecretString password) {
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/filters/PriorityFilter.java b/service/src/main/java/org/whispersystems/textsecuregcm/filters/PriorityFilter.java
new file mode 100644
index 000000000..03070cf00
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/filters/PriorityFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2025 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+package org.whispersystems.textsecuregcm.filters;
+
+import jakarta.servlet.DispatcherType;
+import jakarta.servlet.Filter;
+import jakarta.servlet.ServletContext;
+import java.util.EnumSet;
+import java.util.Objects;
+import org.eclipse.jetty.ee10.servlet.FilterHolder;
+import org.eclipse.jetty.ee10.servlet.FilterMapping;
+import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
+import org.eclipse.jetty.ee10.servlet.ServletHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.util.component.LifeCycle;
+
+public class PriorityFilter {
+
+ private PriorityFilter() {}
+
+ private static FilterHolder getFilter(ServletContext servletContext, final Class extends Filter> filterClass) {
+ final ContextHandler contextHandler = Objects.requireNonNull(ServletContextHandler.getServletContextHandler(servletContext));
+ final ServletHandler servletHandler = contextHandler.getDescendant(ServletHandler.class);
+ return servletHandler.getFilter(filterClass.getName());
+ }
+
+ /**
+ * Ensure a filter is available on the provided ServletContext, a new filter will added if one does not already
+ * exist.
+ *
+ * If a new filter is added, it will be added before all other filters.
+ *
+ * Modeled after {@link org.eclipse.jetty.ee10.websocket.servlet.WebSocketUpgradeFilter#ensureFilter(ServletContext)},
+ * since its use of {@link org.eclipse.jetty.ee10.servlet.ServletHandler#prependFilter(FilterHolder)} is what makes
+ * this necessary.
+ */
+ public static void ensureFilter(final ServletContext servletContext, final Filter filter) {
+ FilterHolder existingFilter = getFilter(servletContext, filter.getClass());
+ if (existingFilter != null) {
+ return;
+ }
+
+ final ContextHandler contextHandler = ServletContextHandler.getServletContextHandler(servletContext);
+ final ServletHandler servletHandler = contextHandler.getDescendant(ServletHandler.class);
+
+ final String pathSpec = "/*";
+ final FilterHolder holder = new FilterHolder(filter);
+ holder.setName(filter.getClass().getName());
+ holder.setAsyncSupported(true);
+
+ final FilterMapping mapping = new FilterMapping();
+ mapping.setFilterName(holder.getName());
+ mapping.setPathSpec(pathSpec);
+ mapping.setDispatcherTypes(EnumSet.of(DispatcherType.REQUEST));
+
+ // Add as the first filter in the list.
+ servletHandler.prependFilter(holder);
+ servletHandler.prependFilterMapping(mapping);
+
+ // If we create the filter we must also make sure it is removed if the context is stopped.
+ contextHandler.addEventListener(new LifeCycle.Listener()
+ {
+ @Override
+ public void lifeCycleStopping(LifeCycle event)
+ {
+ servletHandler.removeFilterHolder(holder);
+ servletHandler.removeFilterMapping(mapping);
+ contextHandler.removeEventListener(this);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%sCleanupListener", filter.getClass().getSimpleName());
+ }
+ });
+
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/filters/RemoteAddressFilter.java b/service/src/main/java/org/whispersystems/textsecuregcm/filters/RemoteAddressFilter.java
index c1ea020b6..3b8090e66 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/filters/RemoteAddressFilter.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/filters/RemoteAddressFilter.java
@@ -26,10 +26,6 @@ public class RemoteAddressFilter implements Filter {
public static final String REMOTE_ADDRESS_ATTRIBUTE_NAME = RemoteAddressFilter.class.getName() + ".remoteAddress";
private static final Logger logger = LoggerFactory.getLogger(RemoteAddressFilter.class);
-
- public RemoteAddressFilter() {
- }
-
@Override
public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain)
throws ServletException, IOException {
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/filters/StripContentLengthOnConnectFilter.java b/service/src/main/java/org/whispersystems/textsecuregcm/filters/StripContentLengthOnConnectFilter.java
new file mode 100644
index 000000000..9265f8c46
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/filters/StripContentLengthOnConnectFilter.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.filters;
+
+import jakarta.servlet.Filter;
+import jakarta.servlet.FilterChain;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.ServletRequest;
+import jakarta.servlet.ServletResponse;
+import jakarta.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.eclipse.jetty.ee10.servlet.ServletContextRequest;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.server.HttpStream;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.util.Callback;
+
+/// Our current version of jetty (12.1.5) has a bug where it includes content-length:0 on
+/// CONNECT websocket upgrade requests. Providing an HTTP/2 header frame with a
+/// content-length that does not match the sum of the lengths of the data frames is technically
+/// a malformed HTTP/2 stream and our netty-based reverse proxy implementation rejects it. This
+/// filter strips out the superfluous content-length at stream-send time. It can be removed once
+/// we update to a jetty version that fixes [jetty/jetty.project#15074](https://github.com/jetty/jetty.project/issues/15074)
+public class StripContentLengthOnConnectFilter implements Filter {
+
+ @Override
+ public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain)
+ throws IOException, ServletException {
+ if (request instanceof HttpServletRequest hsr &&
+ HttpVersion.HTTP_2.is(hsr.getProtocol()) &&
+ HttpMethod.CONNECT.is(hsr.getMethod())) {
+ final Request coreRequest = ServletContextRequest.getServletContextRequest(hsr);
+ if (coreRequest != null) {
+ coreRequest.addHttpStreamWrapper(StripContentLengthStream::new);
+ }
+ }
+ chain.doFilter(request, response);
+ }
+
+ private static class StripContentLengthStream extends HttpStream.Wrapper {
+
+ StripContentLengthStream(final HttpStream wrapped) {
+ super(wrapped);
+ }
+
+ @Override
+ public void send(MetaData.Request request, MetaData.Response response, boolean last, ByteBuffer content,
+ Callback callback) {
+ if (response != null && response.getStatus() == 200 && response.getHttpFields()
+ .contains(HttpHeader.CONTENT_LENGTH)) {
+ final HttpFields fieldsWithoutContentLengthHeader =
+ HttpFields.build(response.getHttpFields()).remove(HttpHeader.CONTENT_LENGTH);
+ response = new MetaData.Response(
+ response.getStatus(),
+ response.getReason(),
+ response.getHttpVersion(),
+ fieldsWithoutContentLengthHeader,
+ -1,
+ response.getTrailersSupplier());
+ }
+ super.send(request, response, last, content, callback);
+ }
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java
index eeeab6684..45fd52943 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java
@@ -48,6 +48,8 @@ public class RequestAttributesInterceptor implements ServerInterceptor {
final String acceptLanguageHeader = headers.get(ACCEPT_LANG_KEY);
final String xForwardedForHeader = headers.get(X_FORWARDED_FOR_KEY);
+ // This assumes that X-Forwarded-For has been set by a trusted intermediate proxy. For example, this may be set by
+ // OmnibusH2Server which itself sets X-Forwarded-For using a PPv2 header that comes from a trusted load-balancer.
final Optional remoteAddress = getMostRecentProxy(xForwardedForHeader)
.flatMap(mostRecentProxy -> {
try {
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java
new file mode 100644
index 000000000..e4167a4d4
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.micrometer.core.instrument.Metrics;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http2.Http2StreamFrame;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
+
+/// Writes all inbound H2 frames to [this#peerStream], renumbering the inbound H2 stream-id for peer H2 stream
+public class H2FrameProxyHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(H2FrameProxyHandler.class);
+ private static final String WRITABILITY_CHANGED_COUNTER_NAME = MetricsUtil.name(H2FrameProxyHandler.class, "writabilityChanged");
+
+ private final Channel peerStream;
+ private final String proxyNameTag;
+
+ // If we fail to write to the peerStream, we want to close the inbound channel. Rather than allocate a new listener
+ // that captures the inbound ChannelHandlerContext on every message, we capture the ChannelHandlerContext in
+ // handlerAdded and use it on all forwarded writes. This would not work if we attached this handler to more than
+ // one channel, but we already have a designated peerStream so this handler is fundamentally single-channel.
+ private ChannelFutureListener closeInboundOnPeerFailure = null;
+
+ public H2FrameProxyHandler(final Channel peerStream, final String proxyNameTag) {
+ this.peerStream = peerStream;
+ this.proxyNameTag = proxyNameTag;
+ }
+
+ @Override
+ public void handlerAdded(final ChannelHandlerContext ctx) {
+ closeInboundOnPeerFailure = f -> {
+ if (!f.isSuccess()) {
+ ctx.close();
+ }
+ };
+
+ // When the peer stream we are forwarding to becomes unwritable/writable, stop/start reading from the source stream.
+ // This prevents us from reading from the source stream as fast as we can just to buffer requests for the peer
+ // stream.
+ peerStream.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelWritabilityChanged(final ChannelHandlerContext peerCtx) throws Exception {
+ Metrics.counter(WRITABILITY_CHANGED_COUNTER_NAME,
+ "isWritable", Boolean.toString(peerCtx.channel().isWritable()),
+ "proxy", proxyNameTag)
+ .increment();
+ ctx.channel().config().setAutoRead(peerStream.isWritable());
+ super.channelWritabilityChanged(peerCtx);
+ }
+ });
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ logger.trace("Received frame {}", msg);
+ if (!(msg instanceof Http2StreamFrame streamFrame)) {
+ logger.error("Received unexpected frame {}", msg);
+ ReferenceCountUtil.release(msg);
+ ctx.close();
+ return;
+ }
+
+ // Clear the stream-id on this frame, so netty will associate it with the peerStream's stream-id. The inbound
+ // frame has a stream-id associated with the inbound connection. This will not match the stream-id of the peer
+ // stream we are forwarding the frames to. If the stream-id on a frame is not set, netty handles sending the
+ // stream-id on the frame to the target stream's stream-id.
+ streamFrame.stream(null);
+
+ peerStream.writeAndFlush(streamFrame).addListener(closeInboundOnPeerFailure);
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) {
+ peerStream.close();
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ logger.warn("Exception proxying frames", cause);
+ ctx.close();
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java
new file mode 100644
index 000000000..5c77b5df3
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.dropwizard.lifecycle.Managed;
+import io.netty.channel.EventLoopGroup;
+
+/**
+ * A wrapper for a Netty {@link EventLoopGroup} that implements Dropwizard's {@link Managed} interface, allowing
+ * Dropwizard to manage the lifecycle of the event loop group.
+ */
+public class ManagedEventLoopGroup implements Managed {
+
+ private final T eventLoopGroup;
+
+ public ManagedEventLoopGroup(final T eventLoopGroup) {
+ this.eventLoopGroup = eventLoopGroup;
+ }
+
+ @Override
+ public void stop() throws Exception {
+ this.eventLoopGroup.shutdownGracefully().await();
+ }
+
+ public T getEventLoopGroup() {
+ return eventLoopGroup;
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java
deleted file mode 100644
index 06d3e97db..000000000
--- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.whispersystems.textsecuregcm.grpc.net;
-
-import io.dropwizard.lifecycle.Managed;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-/**
- * A wrapper for a Netty {@link NioEventLoopGroup} that implements Dropwizard's {@link Managed} interface, allowing
- * Dropwizard to manage the lifecycle of the event loop group.
- */
-public class ManagedNioEventLoopGroup extends NioEventLoopGroup implements Managed {
-
- @Override
- public void stop() throws Exception {
- this.shutdownGracefully().await();
- }
-}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java
new file mode 100644
index 000000000..ef4199a93
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
+import java.util.concurrent.atomic.AtomicLong;
+
+@ChannelHandler.Sharable
+public class OmnibusConnectionCounterHandler extends ChannelInboundHandlerAdapter {
+ private final AtomicLong openConnections;
+ private final Counter acceptedConnectionsCounter =
+ Metrics.counter(MetricsUtil.name(OmnibusConnectionCounterHandler.class, "connectionsAccepted"));
+
+ public OmnibusConnectionCounterHandler() {
+ openConnections =
+ Metrics.gauge(MetricsUtil.name(OmnibusConnectionCounterHandler.class, "openConnections"), new AtomicLong());
+ }
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ acceptedConnectionsCounter.increment();
+ openConnections.incrementAndGet();
+ super.channelRegistered(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ openConnections.decrementAndGet();
+ super.channelInactive(ctx);
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java
new file mode 100644
index 000000000..325b52c9b
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.micrometer.core.instrument.Metrics;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
+
+/// A handler that closes the channel on an exception and records errors in a counter. This should be placed at the tail
+/// of pipelines to catch uncaught exceptions gracefully
+@ChannelHandler.Sharable
+public class OmnibusExceptionHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(OmnibusExceptionHandler.class);
+
+ private static final String UNCAUGHT_EXCEPTION_COUNTER_NAME = MetricsUtil.name(OmnibusExceptionHandler.class,
+ "uncaughtException");
+
+ private final String channelName;
+ private final List> expectedExceptions;
+
+ public OmnibusExceptionHandler(final String channelName, final List> expectedExceptions) {
+ this.channelName = channelName;
+ this.expectedExceptions = expectedExceptions;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ Metrics.counter(UNCAUGHT_EXCEPTION_COUNTER_NAME,
+ "channelName", channelName,
+ "exceptionClass", cause.getClass().getSimpleName())
+ .increment();
+
+ // There are 'expected' ways to get exceptions on a channel (e.g. client disconnects) so we only log them at debug.
+ if (expectedException(cause)) {
+ logger.debug("uncaught exception on channel {}", channelName, cause);
+ } else {
+ logger.warn("unexpected uncaught exception on channel {}", channelName, cause);
+ }
+ ctx.close();
+ }
+
+ private boolean expectedException(final Throwable exception) {
+ return expectedExceptions
+ .stream()
+ .anyMatch(expectedException -> expectedException.isInstance(exception));
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java
new file mode 100644
index 000000000..7bfa7ef95
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.dropwizard.lifecycle.Managed;
+import io.micrometer.core.instrument.Metrics;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufAllocatorMetricProvider;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.SimpleUserEventChannelHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DecoderException;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
+import io.netty.handler.ssl.SniHandler;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.Mapping;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
+
+/// An HTTP/2 server that proxies H2 streams to configurable backends via path-based routing
+public class OmnibusH2Server implements Managed {
+
+ private static final Logger logger = LoggerFactory.getLogger(OmnibusH2Server.class);
+
+ private static final OmnibusConnectionCounterHandler CONNECT_COUNTER = new OmnibusConnectionCounterHandler();
+ private static final OmnibusExceptionHandler HANDSHAKE_EXCEPTION_HANDLER =
+ new OmnibusExceptionHandler("omnibus-handshake", List.of(SocketException.class, DecoderException.class, IOException.class));
+ private static final OmnibusExceptionHandler SESSION_EXCEPTION_HANDLER =
+ new OmnibusExceptionHandler("omnibus-session", List.of(Http2Exception.class));
+ private static final String IDLE_DISCONNECT_COUNTER_NAME = MetricsUtil.name(OmnibusH2Server.class, "idleDisconnect");
+
+ private final @Nullable Mapping sslContextBySni;
+ private final OmnibusRouter router;
+ private final Duration idleTimeout;
+ private final DefaultEventLoopGroup localEventLoopGroup;
+ private final NioEventLoopGroup nioEventLoopGroup;
+ private final SocketAddress bindAddress;
+
+ private Channel serverChannel;
+
+ /// Create an omnibus server
+ ///
+ /// @param sslContextBySni If not null, a mapping between domain (SNI) and the appropriate SslContext to use for
+ /// that SNI. If null, the server will not include TLS (h2c with prior-knowledge)
+ /// @param nioEventLoopGroup Event loop to use for all NIO channel pipelines
+ /// @param localEventLoopGroup Event loop to use for all local channel pipelines
+ /// @param bindAddress The address the server should listen on
+ /// @param router How the server should select backends based on request paths
+ public OmnibusH2Server(
+ final @Nullable Mapping sslContextBySni,
+ final NioEventLoopGroup nioEventLoopGroup,
+ final DefaultEventLoopGroup localEventLoopGroup,
+ final SocketAddress bindAddress,
+ final OmnibusRouter router,
+ final Duration idleTimeout) {
+ this.sslContextBySni = sslContextBySni;
+ this.nioEventLoopGroup = nioEventLoopGroup;
+ this.localEventLoopGroup = localEventLoopGroup;
+ this.bindAddress = bindAddress;
+ this.router = router;
+ this.idleTimeout = idleTimeout;
+ }
+
+ @Override
+ public void start() throws Exception {
+ if (this.sslContextBySni == null) {
+ logger.warn("No SSL configuration provided for OmnibusH2Server, serving h2c");
+ }
+
+ if (ByteBufAllocator.DEFAULT instanceof ByteBufAllocatorMetricProvider alloc) {
+ Metrics.gauge(MetricsUtil.name(OmnibusH2Server.class, "nettyUsedDirectMemory"),
+ alloc,
+ allocator -> allocator.metric().usedDirectMemory());
+ Metrics.gauge(MetricsUtil.name(OmnibusH2Server.class, "nettyUsedHeapMemory"),
+ alloc,
+ allocator -> allocator.metric().usedHeapMemory());
+ }
+
+ final ServerBootstrap bootstrap = new ServerBootstrap()
+ .group(nioEventLoopGroup)
+ .channel(NioServerSocketChannel.class)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(final SocketChannel ch) {
+ ch.pipeline().addLast(new IdleStateHandler(0, 0, idleTimeout.toMillis(), TimeUnit.MILLISECONDS));
+ ch.pipeline().addLast(new SimpleUserEventChannelHandler() {
+ @Override
+ protected void eventReceived(final ChannelHandlerContext ctx, final IdleStateEvent evt) {
+ Metrics.counter(IDLE_DISCONNECT_COUNTER_NAME, "type", evt.state().name()).increment();
+ ctx.close();
+ }
+ });
+ ch.pipeline().addLast(CONNECT_COUNTER);
+ ch.pipeline().addLast(new ProxyProtocolHandler());
+ ch.pipeline().addLast(new ProxyMessageAttributeSetterHandler());
+ if (sslContextBySni == null) {
+ configureH2Pipeline(ch.pipeline());
+ } else {
+ ch.pipeline().addLast(new SniHandler(sslContextBySni));
+ ch.pipeline().addLast(new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) {
+ @Override
+ protected void configurePipeline(final ChannelHandlerContext ctx, final String protocol) {
+ if (!ApplicationProtocolNames.HTTP_2.equals(protocol)) {
+ // HTTP/2 should be enforced by our ALPN settings
+ logger.error("Unsupported protocol negotiated: {}, closing connection", protocol);
+ ctx.close();
+ return;
+ }
+ configureH2Pipeline(ctx.pipeline());
+ }
+ });
+ ch.pipeline().addLast(HANDSHAKE_EXCEPTION_HANDLER);
+ }
+ }
+ });
+
+ serverChannel = bootstrap.bind(bindAddress).sync().channel();
+ logger.info("Omnibus server listening on {}", getLocalAddress());
+ }
+
+ @VisibleForTesting
+ InetSocketAddress getLocalAddress() {
+ return (InetSocketAddress) serverChannel.localAddress();
+ }
+
+ @Override
+ public void stop() {
+ if (serverChannel != null) {
+ logger.info("Stopping omnibus server");
+ serverChannel.close().syncUninterruptibly();
+ logger.info("Omnibus server stopped");
+ }
+ }
+
+ private void configureH2Pipeline(final ChannelPipeline pipeline) {
+ // Advertise support for RFC-8441 extended connect
+ final Http2Settings settings = Http2Settings.defaultSettings().connectProtocolEnabled(true);
+ pipeline.addLast(Http2FrameCodecBuilder.forServer().initialSettings(settings).build());
+ pipeline.addLast(new Http2MultiplexHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(final Http2StreamChannel ch) {
+ ch.pipeline().addLast(new OmnibusH2StreamHandler(nioEventLoopGroup, localEventLoopGroup, router));
+ }
+ }));
+ pipeline.addLast(SESSION_EXCEPTION_HANDLER);
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java
new file mode 100644
index 000000000..81c4cc53e
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// Handler added on each newly created [Http2StreamChannel] on an H2 connection. Inspects the [Http2HeadersFrame] and
+/// determines which backend to forward the stream to, and then proxies frames to and from the backend.
+///
+/// When this handler receives an H2 header for a new stream-id=X on our parent H2 connection it will
+/// - Receive the stream-id=X header and check the path to determine the correct backend
+/// - Make a new H2 connection to the backend
+/// - Forward the header with stream-id=Y to the backend
+/// - Install a [H2FrameProxyHandler] on the backend stream pipeline that forwards the received stream-id=Y frames from
+/// the backend back to the client on stream-id=X
+/// - Install a [H2FrameProxyHandler] on the client stream pipeline forwards the received stream-id=X frames from the
+/// client to the backend on stream-id=Y
+public class OmnibusH2StreamHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(OmnibusH2StreamHandler.class);
+
+ private static final OmnibusExceptionHandler BACKEND_CONNECTION_EXCEPTION_HANDLER =
+ new OmnibusExceptionHandler("backend-connection", List.of());
+ private static final String BACKEND_STREAM_COUNTER_NAME = name(OmnibusH2StreamHandler.class, "backendStream");
+ private static final String BACKEND_CONNECT_DURATION_NAME = name(OmnibusH2StreamHandler.class,
+ "backendConnectDuration");
+ private static final String BACKEND_TAG = "backend";
+
+ private final OmnibusRouter router;
+
+ private final DefaultEventLoopGroup localEventLoopGroup;
+ private final NioEventLoopGroup nioEventLoopGroup;
+
+ public OmnibusH2StreamHandler(
+ final NioEventLoopGroup nioEventLoopGroup,
+ final DefaultEventLoopGroup localEventLoopGroup,
+ final OmnibusRouter router) {
+ this.router = router;
+ this.localEventLoopGroup = localEventLoopGroup;
+ this.nioEventLoopGroup = nioEventLoopGroup;
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ if (!(msg instanceof Http2HeadersFrame headersFrame)) {
+ logger.warn("Expected initial HEADERS frame but got {}", msg.getClass().getSimpleName());
+ ReferenceCountUtil.release(msg);
+ ctx.close();
+ return;
+ }
+
+ // We don't expect headers frames to come with manually managed memory attached. Assert this in case this changes
+ // in the future, but for now we don't have to worry about freeing the headers frame
+ assert !(headersFrame instanceof ReferenceCounted);
+
+ // Disable reading from the client because we want to wait until we make the backend connection and install the
+ // forwarding handler before processing any more frames.
+ ctx.channel().config().setAutoRead(false);
+
+ // Select the target backend based on the path
+ final String path = Optional.ofNullable(headersFrame.headers().path()).map(CharSequence::toString).orElse("");
+ final SocketAddress target = router.match(path);
+ final String backendTag = target.toString();
+
+ Metrics.counter(BACKEND_STREAM_COUNTER_NAME, BACKEND_TAG, backendTag).increment();
+
+ // Set X-Forwarded-For from the PROXY protocol header if present, otherwise via the remote address
+ final InetAddress proxyRemoteAddress = ctx.channel().parent()
+ .attr(ProxyMessageAttributeSetterHandler.PROXY_REMOTE_ADDRESS)
+ .get();
+ headersFrame.headers().set("x-forwarded-for", proxyRemoteAddress != null
+ ? proxyRemoteAddress.getHostAddress()
+ : ((InetSocketAddress) ctx.channel().remoteAddress()).getHostString());
+
+ // Make a new H2 connection to the target backend
+ final Timer.Sample connectSample = Timer.start();
+ new Bootstrap()
+ .group(selectEventLoop(ctx, target))
+ .channel(target instanceof LocalAddress ? LocalChannel.class : NioSocketChannel.class)
+ .handler(new ChannelInitializer<>() {
+ @Override
+ protected void initChannel(final Channel ch) {
+ ch.pipeline()
+ .addLast(Http2FrameCodecBuilder.forClient().initialSettings(Http2Settings.defaultSettings()).build());
+ // Http2MultiplexHandler takes handler that is added to new inbound streams. A client Http2MultiplexHandler
+ // like we're defining here should never receive an inbound H2 stream so we can just pass a noop handler
+ ch.pipeline().addLast(new Http2MultiplexHandler(new NoopInboundStreamHandler()));
+ ch.pipeline().addLast(BACKEND_CONNECTION_EXCEPTION_HANDLER);
+ }
+ })
+ .connect(target)
+ .addListener((ChannelFuture connectFuture) -> {
+ connectSample.stop(Timer.builder(BACKEND_CONNECT_DURATION_NAME)
+ .tag(BACKEND_TAG, backendTag)
+ .tag("outcome", connectFuture.isSuccess() ? "success" : "failure")
+ .register(Metrics.globalRegistry));
+
+ if (!connectFuture.isSuccess()) {
+ // Close the client stream with a 502: Bad Gateway if the backend wasn't available
+ logger.warn("Failed to connect to backend {}", target, connectFuture.cause());
+ ctx.channel()
+ .writeAndFlush(new DefaultHttp2HeadersFrame(
+ new DefaultHttp2Headers().status("502"), true))
+ .addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+
+ // Connected, open a new H2 stream to the backend so we can proxy the client's frames
+ logger.trace("Opening a HTTP/2 stream to the backend {}", target);
+ final Channel backendConnection = connectFuture.channel();
+ createBackendProxyStream(ctx, backendConnection, headersFrame);
+ });
+ }
+
+ /// Create a proxy stream on the provided `backendConnection` that forwards H2 frames to/from the client H2 stream.
+ ///
+ /// @param clientStreamCtx The context for a client H2 stream that targets the backend
+ /// @param backendConnection An established H2 connection [Channel], on which a new h2 stream will be opened
+ /// @param headersFrame The first `headersFrame` from the client h2 stream that should be forwarded to the new
+ /// backend stream
+ private void createBackendProxyStream(
+ final ChannelHandlerContext clientStreamCtx,
+ final Channel backendConnection,
+ final Http2HeadersFrame headersFrame) {
+ new Http2StreamChannelBootstrap(backendConnection)
+ // Forwards response frames from the backend back to the client stream
+ .handler(new H2FrameProxyHandler(clientStreamCtx.channel(), "responseStream"))
+ .open()
+ .addListener((io.netty.util.concurrent.Future streamFuture) -> {
+ if (!streamFuture.isSuccess()) {
+ logger.warn("Failed to open backend stream", streamFuture.cause());
+ clientStreamCtx.channel()
+ .writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR))
+ .addListener(ChannelFutureListener.CLOSE);
+ backendConnection.close();
+ return;
+ }
+
+ final Http2StreamChannel backendStream = streamFuture.getNow();
+
+ // Close the entire H2 connection whenever the stream we just opened closes. We only plan on using
+ // a single stream on this connection.
+ backendStream.closeFuture().addListener(_ -> backendConnection.close());
+
+ // We're going to modify the inbound H2 stream channel, which runs on a different eventloop than the
+ // outbound channel we've made to the backend. We have to submit our updates back to the inbound
+ // channel's event loop for thread safety
+ clientStreamCtx.channel().eventLoop().execute(() -> {
+
+ if (!clientStreamCtx.channel().isActive()) {
+ // The client disconnected already and the client pipeline is already torn down.
+ backendConnection.close();
+ return;
+ }
+
+ // Install proxy on client stream, remove this handler, then fire the buffered headers through the proxy
+ clientStreamCtx.pipeline().replace(
+ OmnibusH2StreamHandler.this,
+ "backend-to-client-proxy",
+ new H2FrameProxyHandler(backendStream, "requestStream"));
+ clientStreamCtx.channel().pipeline().fireChannelRead(headersFrame);
+
+ // Resume inbound reads, which should now be forwarded
+ clientStreamCtx.channel().config().setAutoRead(true);
+ });
+ });
+ }
+
+ private EventLoopGroup selectEventLoop(final ChannelHandlerContext inboundCtx, SocketAddress target) {
+ final boolean localInbound = inboundCtx.channel() instanceof LocalChannel;
+ final boolean localTarget = target instanceof LocalAddress;
+
+ // If the inbound eventloop matches the target type, we can just reuse the inbound's event loop
+ if (localInbound == localTarget) {
+ return inboundCtx.channel().eventLoop();
+ }
+
+ return localTarget ? this.localEventLoopGroup : this.nioEventLoopGroup;
+ }
+
+ private static class NoopInboundStreamHandler extends ChannelInboundHandlerAdapter {
+ @Override
+ public void channelRegistered(final ChannelHandlerContext ctx) {
+ logger.error("Inbound stream handler was registered when no inbound streams expected");
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ logger.error("Received unexpected message: {} on inbound stream from backend", msg);
+ super.channelRead(ctx, msg);
+ }
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java
new file mode 100644
index 000000000..1365a0c2b
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+public class OmnibusRouter {
+
+ public record OmnibusRoute(String prefix, SocketAddress backend) {}
+
+ private final List prefixRoutes;
+ private final SocketAddress defaultBackend;
+
+ public OmnibusRouter(final List prefixRoutes, final SocketAddress defaultBackend) {
+ this.prefixRoutes = prefixRoutes;
+ this.defaultBackend = defaultBackend;
+ }
+
+ SocketAddress match(final String path) {
+ for (final OmnibusRoute route : prefixRoutes) {
+ if (path.startsWith(route.prefix)) {
+ return route.backend;
+ }
+ }
+ return defaultBackend;
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java
new file mode 100644
index 000000000..97d13e692
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.util.AttributeKey;
+import java.net.InetAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// Reads the decoded [HAProxyMessage], stores the source address as a channel attribute, and removes itself.
+class ProxyMessageAttributeSetterHandler extends ChannelInboundHandlerAdapter {
+ private static final Logger logger = LoggerFactory.getLogger(ProxyMessageAttributeSetterHandler.class);
+
+ /// Attribute for the remote address extracted from a proxy protocol header
+ static final AttributeKey PROXY_REMOTE_ADDRESS = AttributeKey.newInstance("proxyRemoteAddress");
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+ if (!(msg instanceof HAProxyMessage proxyMessage)) {
+ ctx.pipeline().remove(this);
+ ctx.fireChannelRead(msg);
+ return;
+ }
+
+ try {
+ final String sourceAddress = proxyMessage.sourceAddress();
+ if (sourceAddress != null) {
+ ctx.channel().attr(PROXY_REMOTE_ADDRESS).set(InetAddress.getByName(sourceAddress));
+ } else {
+ logger.warn("PROXY protocol message has no source address");
+ }
+ } finally {
+ proxyMessage.release();
+ ctx.pipeline().remove(this);
+ }
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java
new file mode 100644
index 000000000..7b4b3b3c3
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
+
+import io.micrometer.core.instrument.Metrics;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.ProtocolDetectionResult;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import java.util.List;
+
+class ProxyProtocolHandler extends ByteToMessageDecoder {
+
+ private static final String PROXY_PROTOCOL_DETECTED_NAME =
+ name(ProxyProtocolHandler.class, "proxyProtocol");
+
+ @Override
+ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List