fix: flush watch stdout buffering (#43) (thanks @ccaum)

This commit is contained in:
Peter Steinberger 2026-02-16 03:05:26 +01:00
parent 2a0342f4e7
commit a4c0880f06
6 changed files with 212 additions and 2 deletions

View File

@ -4,6 +4,7 @@
- fix: prefer handle sends when chat identifier is a direct handle
- fix: apply history filters before limit (#20, thanks @tommybananas)
- fix: flush watch output immediately when stdout is buffered (#43, thanks @ccaum)
- feat: include `thread_originator_guid` in message output (#39, thanks @ruthmade)
## 0.4.0 - 2026-01-07

View File

@ -1,4 +1,5 @@
import Commander
import Darwin
import Foundation
import IMsgCore
@ -90,6 +91,7 @@ enum WatchCommand {
reactions: reactions
)
try JSONLines.print(payload)
fflush(stdout)
continue
}
let direction = message.isFromMe ? "sent" : "recv"
@ -110,6 +112,7 @@ enum WatchCommand {
)
}
}
fflush(stdout)
}
}
}

View File

@ -16,7 +16,6 @@ enum JSONLines {
let line = try encode(value)
if !line.isEmpty {
Swift.print(line)
fflush(stdout)
}
}
}

View File

@ -4,7 +4,8 @@ import Testing
@testable import IMsgCore
private func normalizeForTest(_ input: String) -> String {
let result = input
let result =
input
.replacingOccurrences(of: "imessage:", with: "")
.replacingOccurrences(of: "sms:", with: "")
.replacingOccurrences(of: "auto:", with: "")

View File

@ -327,3 +327,131 @@ func watchCommandRunsWithJsonOutput() async throws {
streamProvider: streamProvider
)
}
@Test
func watchCommandFlushesPlainOutput() async throws {
let values = ParsedValues(
positional: [],
options: ["db": ["/tmp/unused"], "debounce": ["1ms"]],
flags: []
)
let runtime = RuntimeOptions(parsedValues: values)
let db = try Connection(.inMemory)
let store = try MessageStore(
connection: db,
path: ":memory:",
hasAttributedBody: false,
hasReactionColumns: false
)
let message = Message(
rowID: 1,
chatID: 1,
sender: "+123",
text: "hello",
date: Date(),
isFromMe: false,
service: "iMessage",
handleID: nil,
attachmentsCount: 0
)
let streamProvider:
(
MessageWatcher,
Int64?,
Int64?,
MessageWatcherConfiguration
) -> AsyncThrowingStream<Message, Error> = { _, _, _, _ in
AsyncThrowingStream { continuation in
continuation.yield(message)
continuation.finish()
}
}
let (output, _) = try await StdoutCapture.capture {
try await WatchCommand.run(
values: values,
runtime: runtime,
storeFactory: { _ in store },
streamProvider: streamProvider
)
}
#expect(output.contains("hello"))
}
@Test
func watchCommandFlushesJsonOutput() async throws {
let values = ParsedValues(
positional: [],
options: ["db": ["/tmp/unused"], "debounce": ["1ms"]],
flags: ["jsonOutput"]
)
let runtime = RuntimeOptions(parsedValues: values)
let db = try Connection(.inMemory)
try db.execute(
"""
CREATE TABLE attachment (
ROWID INTEGER PRIMARY KEY,
filename TEXT,
transfer_name TEXT,
uti TEXT,
mime_type TEXT,
total_bytes INTEGER,
is_sticker INTEGER
);
"""
)
try db.execute(
"CREATE TABLE message_attachment_join (message_id INTEGER, attachment_id INTEGER);")
try db.execute(
"""
CREATE TABLE message (
ROWID INTEGER PRIMARY KEY,
handle_id INTEGER,
text TEXT,
date INTEGER,
is_from_me INTEGER,
service TEXT
);
"""
)
let store = try MessageStore(
connection: db,
path: ":memory:",
hasAttributedBody: false,
hasReactionColumns: false
)
let message = Message(
rowID: 1,
chatID: 1,
sender: "+123",
text: "hello",
date: Date(),
isFromMe: false,
service: "iMessage",
handleID: nil,
attachmentsCount: 0
)
let streamProvider:
(
MessageWatcher,
Int64?,
Int64?,
MessageWatcherConfiguration
) -> AsyncThrowingStream<Message, Error> = { _, _, _, _ in
AsyncThrowingStream { continuation in
continuation.yield(message)
continuation.finish()
}
}
let (output, _) = try await StdoutCapture.capture {
try await WatchCommand.run(
values: values,
runtime: runtime,
storeFactory: { _ in store },
streamProvider: streamProvider
)
}
#expect(output.contains("\"text\":\"hello\""))
}

View File

@ -0,0 +1,78 @@
import Darwin
import Foundation
private actor StdoutCaptureLock {
private var isLocked = false
private var waiters: [CheckedContinuation<Void, Never>] = []
func acquire() async {
if !isLocked {
isLocked = true
return
}
await withCheckedContinuation { continuation in
waiters.append(continuation)
}
}
func release() {
if waiters.isEmpty {
isLocked = false
return
}
let next = waiters.removeFirst()
next.resume()
}
}
enum StdoutCapture {
private static let lock = StdoutCaptureLock()
static func capture<T>(_ body: () async throws -> T) async rethrows -> (output: String, value: T)
{
await lock.acquire()
var fds: [Int32] = [0, 0]
guard pipe(&fds) == 0 else {
await lock.release()
fatalError("pipe() failed")
}
let readFD = fds[0]
let writeFD = fds[1]
let savedStdout = dup(STDOUT_FILENO)
guard savedStdout >= 0 else {
close(readFD)
close(writeFD)
await lock.release()
fatalError("dup(STDOUT_FILENO) failed")
}
guard dup2(writeFD, STDOUT_FILENO) >= 0 else {
close(readFD)
close(writeFD)
close(savedStdout)
await lock.release()
fatalError("dup2(writeFD, STDOUT_FILENO) failed")
}
close(writeFD)
do {
let value = try await body()
_ = dup2(savedStdout, STDOUT_FILENO)
close(savedStdout)
let handle = FileHandle(fileDescriptor: readFD, closeOnDealloc: true)
let data = handle.readDataToEndOfFile()
await lock.release()
return (String(data: data, encoding: .utf8) ?? "", value)
} catch {
_ = dup2(savedStdout, STDOUT_FILENO)
close(savedStdout)
close(readFD)
await lock.release()
throw error
}
}
}