Fix watch command stdout buffering (#43)
* Fix watch command stdout buffering The watch command was not producing any output because stdout was not being flushed after printing JSON lines. This caused the watch functionality to appear broken even though message detection was working correctly. Added fflush(stdout) call after Swift.print() to ensure immediate output delivery, fixing both CLI watch mode and RPC watch notifications. Fixes: Messages detected but not displayed Testing: Verified with 'imsg watch --chat-id 1 --json' - messages now appear immediately * fix: flush watch stdout buffering (#43) (thanks @ccaum) --------- Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
parent
5b5c8bcc50
commit
ee3c085070
@ -4,6 +4,7 @@
|
||||
|
||||
- fix: prefer handle sends when chat identifier is a direct handle
|
||||
- fix: apply history filters before limit (#20, thanks @tommybananas)
|
||||
- fix: flush watch output immediately when stdout is buffered (#43, thanks @ccaum)
|
||||
- feat: include `thread_originator_guid` in message output (#39, thanks @ruthmade)
|
||||
|
||||
## 0.4.0 - 2026-01-07
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import Commander
|
||||
import Darwin
|
||||
import Foundation
|
||||
import IMsgCore
|
||||
|
||||
@ -90,6 +91,7 @@ enum WatchCommand {
|
||||
reactions: reactions
|
||||
)
|
||||
try JSONLines.print(payload)
|
||||
fflush(stdout)
|
||||
continue
|
||||
}
|
||||
let direction = message.isFromMe ? "sent" : "recv"
|
||||
@ -110,6 +112,7 @@ enum WatchCommand {
|
||||
)
|
||||
}
|
||||
}
|
||||
fflush(stdout)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -4,7 +4,8 @@ import Testing
|
||||
@testable import IMsgCore
|
||||
|
||||
private func normalizeForTest(_ input: String) -> String {
|
||||
let result = input
|
||||
let result =
|
||||
input
|
||||
.replacingOccurrences(of: "imessage:", with: "")
|
||||
.replacingOccurrences(of: "sms:", with: "")
|
||||
.replacingOccurrences(of: "auto:", with: "")
|
||||
|
||||
@ -327,3 +327,131 @@ func watchCommandRunsWithJsonOutput() async throws {
|
||||
streamProvider: streamProvider
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
func watchCommandFlushesPlainOutput() async throws {
|
||||
let values = ParsedValues(
|
||||
positional: [],
|
||||
options: ["db": ["/tmp/unused"], "debounce": ["1ms"]],
|
||||
flags: []
|
||||
)
|
||||
let runtime = RuntimeOptions(parsedValues: values)
|
||||
let db = try Connection(.inMemory)
|
||||
let store = try MessageStore(
|
||||
connection: db,
|
||||
path: ":memory:",
|
||||
hasAttributedBody: false,
|
||||
hasReactionColumns: false
|
||||
)
|
||||
let message = Message(
|
||||
rowID: 1,
|
||||
chatID: 1,
|
||||
sender: "+123",
|
||||
text: "hello",
|
||||
date: Date(),
|
||||
isFromMe: false,
|
||||
service: "iMessage",
|
||||
handleID: nil,
|
||||
attachmentsCount: 0
|
||||
)
|
||||
let streamProvider:
|
||||
(
|
||||
MessageWatcher,
|
||||
Int64?,
|
||||
Int64?,
|
||||
MessageWatcherConfiguration
|
||||
) -> AsyncThrowingStream<Message, Error> = { _, _, _, _ in
|
||||
AsyncThrowingStream { continuation in
|
||||
continuation.yield(message)
|
||||
continuation.finish()
|
||||
}
|
||||
}
|
||||
|
||||
let (output, _) = try await StdoutCapture.capture {
|
||||
try await WatchCommand.run(
|
||||
values: values,
|
||||
runtime: runtime,
|
||||
storeFactory: { _ in store },
|
||||
streamProvider: streamProvider
|
||||
)
|
||||
}
|
||||
#expect(output.contains("hello"))
|
||||
}
|
||||
|
||||
@Test
|
||||
func watchCommandFlushesJsonOutput() async throws {
|
||||
let values = ParsedValues(
|
||||
positional: [],
|
||||
options: ["db": ["/tmp/unused"], "debounce": ["1ms"]],
|
||||
flags: ["jsonOutput"]
|
||||
)
|
||||
let runtime = RuntimeOptions(parsedValues: values)
|
||||
let db = try Connection(.inMemory)
|
||||
try db.execute(
|
||||
"""
|
||||
CREATE TABLE attachment (
|
||||
ROWID INTEGER PRIMARY KEY,
|
||||
filename TEXT,
|
||||
transfer_name TEXT,
|
||||
uti TEXT,
|
||||
mime_type TEXT,
|
||||
total_bytes INTEGER,
|
||||
is_sticker INTEGER
|
||||
);
|
||||
"""
|
||||
)
|
||||
try db.execute(
|
||||
"CREATE TABLE message_attachment_join (message_id INTEGER, attachment_id INTEGER);")
|
||||
try db.execute(
|
||||
"""
|
||||
CREATE TABLE message (
|
||||
ROWID INTEGER PRIMARY KEY,
|
||||
handle_id INTEGER,
|
||||
text TEXT,
|
||||
date INTEGER,
|
||||
is_from_me INTEGER,
|
||||
service TEXT
|
||||
);
|
||||
"""
|
||||
)
|
||||
|
||||
let store = try MessageStore(
|
||||
connection: db,
|
||||
path: ":memory:",
|
||||
hasAttributedBody: false,
|
||||
hasReactionColumns: false
|
||||
)
|
||||
let message = Message(
|
||||
rowID: 1,
|
||||
chatID: 1,
|
||||
sender: "+123",
|
||||
text: "hello",
|
||||
date: Date(),
|
||||
isFromMe: false,
|
||||
service: "iMessage",
|
||||
handleID: nil,
|
||||
attachmentsCount: 0
|
||||
)
|
||||
let streamProvider:
|
||||
(
|
||||
MessageWatcher,
|
||||
Int64?,
|
||||
Int64?,
|
||||
MessageWatcherConfiguration
|
||||
) -> AsyncThrowingStream<Message, Error> = { _, _, _, _ in
|
||||
AsyncThrowingStream { continuation in
|
||||
continuation.yield(message)
|
||||
continuation.finish()
|
||||
}
|
||||
}
|
||||
|
||||
let (output, _) = try await StdoutCapture.capture {
|
||||
try await WatchCommand.run(
|
||||
values: values,
|
||||
runtime: runtime,
|
||||
storeFactory: { _ in store },
|
||||
streamProvider: streamProvider
|
||||
)
|
||||
}
|
||||
#expect(output.contains("\"text\":\"hello\""))
|
||||
}
|
||||
|
||||
78
Tests/imsgTests/StdoutCapture.swift
Normal file
78
Tests/imsgTests/StdoutCapture.swift
Normal file
@ -0,0 +1,78 @@
|
||||
import Darwin
|
||||
import Foundation
|
||||
|
||||
private actor StdoutCaptureLock {
|
||||
private var isLocked = false
|
||||
private var waiters: [CheckedContinuation<Void, Never>] = []
|
||||
|
||||
func acquire() async {
|
||||
if !isLocked {
|
||||
isLocked = true
|
||||
return
|
||||
}
|
||||
await withCheckedContinuation { continuation in
|
||||
waiters.append(continuation)
|
||||
}
|
||||
}
|
||||
|
||||
func release() {
|
||||
if waiters.isEmpty {
|
||||
isLocked = false
|
||||
return
|
||||
}
|
||||
let next = waiters.removeFirst()
|
||||
next.resume()
|
||||
}
|
||||
}
|
||||
|
||||
enum StdoutCapture {
|
||||
private static let lock = StdoutCaptureLock()
|
||||
|
||||
static func capture<T>(_ body: () async throws -> T) async rethrows -> (output: String, value: T)
|
||||
{
|
||||
await lock.acquire()
|
||||
|
||||
var fds: [Int32] = [0, 0]
|
||||
guard pipe(&fds) == 0 else {
|
||||
await lock.release()
|
||||
fatalError("pipe() failed")
|
||||
}
|
||||
let readFD = fds[0]
|
||||
let writeFD = fds[1]
|
||||
|
||||
let savedStdout = dup(STDOUT_FILENO)
|
||||
guard savedStdout >= 0 else {
|
||||
close(readFD)
|
||||
close(writeFD)
|
||||
await lock.release()
|
||||
fatalError("dup(STDOUT_FILENO) failed")
|
||||
}
|
||||
|
||||
guard dup2(writeFD, STDOUT_FILENO) >= 0 else {
|
||||
close(readFD)
|
||||
close(writeFD)
|
||||
close(savedStdout)
|
||||
await lock.release()
|
||||
fatalError("dup2(writeFD, STDOUT_FILENO) failed")
|
||||
}
|
||||
close(writeFD)
|
||||
|
||||
do {
|
||||
let value = try await body()
|
||||
|
||||
_ = dup2(savedStdout, STDOUT_FILENO)
|
||||
close(savedStdout)
|
||||
|
||||
let handle = FileHandle(fileDescriptor: readFD, closeOnDealloc: true)
|
||||
let data = handle.readDataToEndOfFile()
|
||||
await lock.release()
|
||||
return (String(data: data, encoding: .utf8) ?? "", value)
|
||||
} catch {
|
||||
_ = dup2(savedStdout, STDOUT_FILENO)
|
||||
close(savedStdout)
|
||||
close(readFD)
|
||||
await lock.release()
|
||||
throw error
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user