Skip to content

Commit

Permalink
Enable strict concurrency checking for NIOHTTP1 (#3115)
Browse files Browse the repository at this point in the history
### Motivation:

To ensure NIOHTTP1 concurrency safety.

### Modifications:

* Enable strict concurrency checking in the package manifest.
* Mark several objects `Sendable` with `@preconcurrency` annotations
where they are returned in futures which may execute in arbitrary
concurrency domains.
  * `NIOTypedHTTPClientProtocolUpgrader`
  * `NIOTypedHTTPClientUpgradeConfiguration`
  * `NIOUpgradableHTTPServerPipelineConfiguration`
  * `NIOUpgradableHTTPClientPipelineConfiguration`
  * `NIOTypedHTTPServerProtocolUpgrader`
  * `NIOTypedHTTPServerUpgradeConfiguration`
* Mark handlers as explicitly not sendable
  * `NIOTypedHTTPClientUpgradeHandler`
  * `NIOTypedHTTPServerUpgradeHandler`
* Added new Sendable type aliases:
  * `NIOHTTPClientUpgradeSendableConfiguration`
  * `NIOHTTPServerUpgradeSendableConfiguration`

### Result:

No more concurrency warnings. Builds will warn and CI will fail if
regressions are introduced.
  • Loading branch information
rnro authored Feb 19, 2025
1 parent f46998c commit 5f60cee
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 131 deletions.
3 changes: 2 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ let package = Package(
"NIOConcurrencyHelpers",
"CNIOLLHTTP",
swiftCollections,
]
],
swiftSettings: strictConcurrencySettings
),
.target(
name: "NIOWebSocket",
Expand Down
28 changes: 20 additions & 8 deletions Sources/NIOHTTP1/HTTPPipelineSetup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public typealias NIOHTTPClientUpgradeConfiguration = (
upgraders: [NIOHTTPClientProtocolUpgrader], completionHandler: @Sendable (ChannelHandlerContext) -> Void
)

public typealias NIOHTTPClientUpgradeSendableConfiguration = (
upgraders: [NIOHTTPClientProtocolUpgrader & Sendable], completionHandler: @Sendable (ChannelHandlerContext) -> Void
)

/// Configuration required to configure a HTTP server pipeline for upgrade.
///
/// See the documentation for `HTTPServerUpgradeHandler` for details on these
Expand All @@ -33,6 +37,10 @@ public typealias NIOHTTPServerUpgradeConfiguration = (
upgraders: [HTTPServerProtocolUpgrader], completionHandler: @Sendable (ChannelHandlerContext) -> Void
)

public typealias NIOHTTPServerUpgradeSendableConfiguration = (
upgraders: [HTTPServerProtocolUpgrader & Sendable], completionHandler: @Sendable (ChannelHandlerContext) -> Void
)

extension ChannelPipeline {
/// Configure a `ChannelPipeline` for use as a HTTP client.
///
Expand Down Expand Up @@ -67,7 +75,7 @@ extension ChannelPipeline {
public func addHTTPClientHandlers(
position: Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration?
withClientUpgrade upgrade: NIOHTTPClientUpgradeSendableConfiguration?
) -> EventLoopFuture<Void> {
self._addHTTPClientHandlers(
position: position,
Expand All @@ -79,7 +87,7 @@ extension ChannelPipeline {
private func _addHTTPClientHandlers(
position: Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration?
withClientUpgrade upgrade: NIOHTTPClientUpgradeSendableConfiguration?
) -> EventLoopFuture<Void> {
let future: EventLoopFuture<Void>

Expand Down Expand Up @@ -120,11 +128,12 @@ extension ChannelPipeline {
/// the upgrade completion handler. See the documentation on ``NIOHTTPClientUpgradeHandler``
/// for more details.
/// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured.
@preconcurrency
public func addHTTPClientHandlers(
position: Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
enableOutboundHeaderValidation: Bool = true,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil
withClientUpgrade upgrade: NIOHTTPClientUpgradeSendableConfiguration? = nil
) -> EventLoopFuture<Void> {
let future: EventLoopFuture<Void>

Expand Down Expand Up @@ -168,12 +177,13 @@ extension ChannelPipeline {
/// the upgrade completion handler. See the documentation on ``NIOHTTPClientUpgradeHandler``
/// for more details.
/// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured.
@preconcurrency
public func addHTTPClientHandlers(
position: Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
enableOutboundHeaderValidation: Bool = true,
encoderConfiguration: HTTPRequestEncoder.Configuration = .init(),
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil
withClientUpgrade upgrade: NIOHTTPClientUpgradeSendableConfiguration? = nil
) -> EventLoopFuture<Void> {
let future: EventLoopFuture<Void>

Expand Down Expand Up @@ -234,7 +244,7 @@ extension ChannelPipeline {
public func configureHTTPServerPipeline(
position: ChannelPipeline.Position = .last,
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withServerUpgrade upgrade: NIOHTTPServerUpgradeSendableConfiguration? = nil,
withErrorHandling errorHandling: Bool = true
) -> EventLoopFuture<Void> {
self._configureHTTPServerPipeline(
Expand Down Expand Up @@ -274,10 +284,11 @@ extension ChannelPipeline {
/// - headerValidation: Whether to validate outbound request headers to confirm that they meet
/// spec compliance. Defaults to `true`.
/// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured.
@preconcurrency
public func configureHTTPServerPipeline(
position: ChannelPipeline.Position = .last,
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withServerUpgrade upgrade: NIOHTTPServerUpgradeSendableConfiguration? = nil,
withErrorHandling errorHandling: Bool = true,
withOutboundHeaderValidation headerValidation: Bool = true
) -> EventLoopFuture<Void> {
Expand Down Expand Up @@ -320,10 +331,11 @@ extension ChannelPipeline {
/// spec compliance. Defaults to `true`.
/// - encoderConfiguration: The configuration for the ``HTTPResponseEncoder``.
/// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured.
@preconcurrency
public func configureHTTPServerPipeline(
position: ChannelPipeline.Position = .last,
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withServerUpgrade upgrade: NIOHTTPServerUpgradeSendableConfiguration? = nil,
withErrorHandling errorHandling: Bool = true,
withOutboundHeaderValidation headerValidation: Bool = true,
withEncoderConfiguration encoderConfiguration: HTTPResponseEncoder.Configuration = .init()
Expand All @@ -341,7 +353,7 @@ extension ChannelPipeline {
private func _configureHTTPServerPipeline(
position: ChannelPipeline.Position = .last,
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withServerUpgrade upgrade: NIOHTTPServerUpgradeSendableConfiguration? = nil,
withErrorHandling errorHandling: Bool = true,
withOutboundHeaderValidation headerValidation: Bool = true,
withEncoderConfiguration encoderConfiguration: HTTPResponseEncoder.Configuration = .init()
Expand Down
130 changes: 66 additions & 64 deletions Sources/NIOHTTP1/HTTPServerUpgradeHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -180,28 +180,26 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
// We'll attempt to upgrade. This may take a while, so while we're waiting more data can come in.
self.upgradeState = .awaitingUpgrader

let eventLoop = context.eventLoop
let loopBoundContext = context.loopBound
self.handleUpgrade(context: context, request: request, requestedProtocols: requestedProtocols)
.hop(to: eventLoop) // the user might return a future from another EventLoop.
.whenSuccess { callback in
eventLoop.assertInEventLoop()
if let callback = callback {
self.gotUpgrader(upgrader: callback)
} else {
self.notUpgrading(context: loopBoundContext.value, data: requestPart)
self.notUpgrading(context: context, data: requestPart)
}
}
}

/// The core of the upgrade handling logic.
///
/// - Returns: An `EventLoopFuture` that will contain a callback to invoke if upgrade is requested, or nil if upgrade has failed. Never returns a failed future.
/// - Returns: An isolated `EventLoopFuture` that will contain a callback to invoke if upgrade is requested,
/// or nil if upgrade has failed. Never returns a failed future.
private func handleUpgrade(
context: ChannelHandlerContext,
request: HTTPRequestHead,
requestedProtocols: [String]
) -> EventLoopFuture<(() -> Void)?> {
) -> EventLoopFuture<(() -> Void)?>.Isolated {

let connectionHeader = Set(request.headers[canonicalForm: "connection"].map { $0.lowercased() })
let allHeaderNames = Set(request.headers.map { $0.name.lowercased() })

Expand All @@ -219,18 +217,21 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
/// Attempt to upgrade a single protocol.
///
/// Will recurse through `protocolIterator` if upgrade fails.
///
/// - Returns: An isolated `EventLoopFuture` that will contain a callback to invoke if upgrade is requested,
/// or nil if upgrade has failed. Never returns a failed future.
private func handleUpgradeForProtocol(
context: ChannelHandlerContext,
protocolIterator: Array<String>.Iterator,
request: HTTPRequestHead,
allHeaderNames: Set<String>,
connectionHeader: Set<String>
) -> EventLoopFuture<(() -> Void)?> {
) -> EventLoopFuture<(() -> Void)?>.Isolated {
// We want a local copy of the protocol iterator. We'll pass it to the next invocation of the function.
var protocolIterator = protocolIterator
guard let proto = protocolIterator.next() else {
// We're done! No suitable protocol for upgrade.
return context.eventLoop.makeSucceededFuture(nil)
return context.eventLoop.makeSucceededIsolatedFuture(nil)
}

guard let upgrader = self.upgraders[proto.lowercased()] else {
Expand All @@ -256,66 +257,67 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha

let responseHeaders = self.buildUpgradeHeaders(protocol: proto)
let pipeline = context.pipeline
let loopBoundContext = context.loopBound

return upgrader.buildUpgradeResponse(
channel: context.channel,
upgradeRequest: request,
initialResponseHeaders: responseHeaders
).map { finalResponseHeaders in
{
// Ok, we're upgrading.
self.upgradeState = .upgrading

// Before we finish the upgrade we have to remove the HTTPDecoder and any other non-Encoder HTTP
// handlers from the pipeline, to prevent them parsing any more data. We'll buffer the data until
// that completes.
// While there are a lot of Futures involved here it's quite possible that all of this code will
// actually complete synchronously: we just want to program for the possibility that it won't.
// Once that's done, we send the upgrade response, then remove the HTTP encoder, then call the
// internal handler, then call the user code, and then finally when the user code is done we do
// our final cleanup steps, namely we replay the received data we buffered in the meantime and
// then remove ourselves from the pipeline.
self.removeExtraHandlers(pipeline: pipeline).flatMap {
self.sendUpgradeResponse(
context: loopBoundContext.value,
upgradeRequest: request,
responseHeaders: finalResponseHeaders
)
}.flatMap {
pipeline.syncOperations.removeHandler(self.httpEncoder)
}.flatMap { () -> EventLoopFuture<Void> in
let context = loopBoundContext.value
self.upgradeCompletionHandler(context)
return upgrader.upgrade(context: context, upgradeRequest: request)
}.whenComplete { result in
let context = loopBoundContext.value
switch result {
case .success:
context.fireUserInboundEventTriggered(
HTTPServerUpgradeEvents.upgradeComplete(toProtocol: proto, upgradeRequest: request)
)
self.upgradeState = .upgradeComplete
// When we remove ourselves we'll be delivering any buffered data.
context.pipeline.syncOperations.removeHandler(context: context, promise: nil)

case .failure(let error):
// Remain in the '.upgrading' state.
context.fireErrorCaught(error)
}
).hop(to: context.eventLoop)
.assumeIsolated()
.map { finalResponseHeaders in
{
// Ok, we're upgrading.
self.upgradeState = .upgrading

// Before we finish the upgrade we have to remove the HTTPDecoder and any other non-Encoder HTTP
// handlers from the pipeline, to prevent them parsing any more data. We'll buffer the data until
// that completes.
// While there are a lot of Futures involved here it's quite possible that all of this code will
// actually complete synchronously: we just want to program for the possibility that it won't.
// Once that's done, we send the upgrade response, then remove the HTTP encoder, then call the
// internal handler, then call the user code, and then finally when the user code is done we do
// our final cleanup steps, namely we replay the received data we buffered in the meantime and
// then remove ourselves from the pipeline.
self.removeExtraHandlers(pipeline: pipeline)
.assumeIsolated()
.flatMap {
self.sendUpgradeResponse(
context: context,
upgradeRequest: request,
responseHeaders: finalResponseHeaders
)
}.flatMap {
pipeline.syncOperations.removeHandler(self.httpEncoder)
}.flatMap { () -> EventLoopFuture<Void> in
self.upgradeCompletionHandler(context)
return upgrader.upgrade(context: context, upgradeRequest: request)
}.whenComplete { result in
switch result {
case .success:
context.fireUserInboundEventTriggered(
HTTPServerUpgradeEvents.upgradeComplete(toProtocol: proto, upgradeRequest: request)
)
self.upgradeState = .upgradeComplete
// When we remove ourselves we'll be delivering any buffered data.
context.pipeline.syncOperations.removeHandler(context: context, promise: nil)

case .failure(let error):
// Remain in the '.upgrading' state.
context.fireErrorCaught(error)
}
}
}
}.flatMapError { error in
// No upgrade here. We want to fire the error down the pipeline, and then try another loop iteration.
context.fireErrorCaught(error)
return self.handleUpgradeForProtocol(
context: context,
protocolIterator: protocolIterator,
request: request,
allHeaderNames: allHeaderNames,
connectionHeader: connectionHeader
)
}
}.flatMapError { error in
// No upgrade here. We want to fire the error down the pipeline, and then try another loop iteration.
let context = loopBoundContext.value
context.fireErrorCaught(error)
return self.handleUpgradeForProtocol(
context: context,
protocolIterator: protocolIterator,
request: request,
allHeaderNames: allHeaderNames,
connectionHeader: connectionHeader
)
}
}

private func gotUpgrader(upgrader: @escaping (() -> Void)) {
Expand Down Expand Up @@ -379,7 +381,7 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
}

return .andAllSucceed(
self.extraHTTPHandlers.map { pipeline.removeHandler($0) },
self.extraHTTPHandlers.map { pipeline.syncOperations.removeHandler($0) },
on: pipeline.eventLoop
)
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import NIOCore

/// Configuration for an upgradable HTTP pipeline.
@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *)
public struct NIOUpgradableHTTPServerPipelineConfiguration<UpgradeResult: Sendable> {
public struct NIOUpgradableHTTPServerPipelineConfiguration<UpgradeResult: Sendable>: Sendable {
/// Whether to provide assistance handling HTTP clients that pipeline
/// their requests. Defaults to `true`. If `false`, users will need to handle clients that pipeline themselves.
public var enablePipelining = true
Expand Down Expand Up @@ -146,7 +146,7 @@ extension ChannelPipeline.SynchronousOperations {

/// Configuration for an upgradable HTTP pipeline.
@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *)
public struct NIOUpgradableHTTPClientPipelineConfiguration<UpgradeResult: Sendable> {
public struct NIOUpgradableHTTPClientPipelineConfiguration<UpgradeResult: Sendable>: Sendable {
/// The strategy to use when dealing with leftover bytes after removing the ``HTTPDecoder`` from the pipeline.
public var leftOverBytesStrategy = RemoveAfterUpgradeStrategy.dropBytes

Expand Down
Loading

0 comments on commit 5f60cee

Please sign in to comment.