Minor performance optimization: process the change-sets outside the snapshotQueue. This decreases contention on the queue between connections.
This commit is contained in:
parent
c14327e238
commit
7e4e3a92d3
@ -2112,7 +2112,7 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
**/
|
||||
- (void)preReadTransaction:(YapDatabaseReadTransaction *)transaction
|
||||
{
|
||||
// Pre-Read-Transaction: Step 1 of 4
|
||||
// Pre-Read-Transaction: Step 1 of 5
|
||||
//
|
||||
// Execute "BEGIN TRANSACTION" on database connection.
|
||||
// This is actually a deferred transaction, meaning the sqlite connection won't actually
|
||||
@ -2123,10 +2123,14 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
// hit our in-memory caches. Thus we avoid sqlite machinery when unneeded.
|
||||
|
||||
[transaction beginTransaction];
|
||||
|
||||
|
||||
__block uint64_t dbSnapshot = 0;
|
||||
__block BOOL expectsChangesets = NO;
|
||||
__block NSArray *changesets = nil;
|
||||
|
||||
dispatch_sync(database->snapshotQueue, ^{ @autoreleasepool {
|
||||
|
||||
// Pre-Read-Transaction: Step 2 of 4
|
||||
// Pre-Read-Transaction: Step 2 of 5
|
||||
//
|
||||
// Update our connection state within the state table.
|
||||
//
|
||||
@ -2163,13 +2167,13 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
|
||||
NSAssert(myState != nil, @"Missing state in database->connectionStates");
|
||||
|
||||
// Pre-Read-Transaction: Step 3 of 4
|
||||
// Pre-Read-Transaction: Step 3 of 5
|
||||
//
|
||||
// Update our in-memory data (caches, etc) if needed.
|
||||
// Compare our snapshot with the database's snapshot.
|
||||
|
||||
if (hasActiveWriteTransaction || longLivedReadTransaction || wal_file == NULL || enableMultiProcessSupport)
|
||||
{
|
||||
// If there is a write transaction in progress, (in case of multiple processes accessing the database, we can't know for sure so we make this assumption)
|
||||
// If there is a write transaction in progress,
|
||||
// then it's not safe to proceed until we acquire a "sql-level" snapshot.
|
||||
//
|
||||
// If this is for a longLivedReadTransaction,
|
||||
@ -2180,6 +2184,9 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
// We need the wal_file in order to properly receive notifications of
|
||||
// when sqlite acquires an "sql-level" snapshot.
|
||||
//
|
||||
// In case of multiple processes accessing the database,
|
||||
// we can't know for sure so we must make this assumption.
|
||||
//
|
||||
// During this process we ensure that our "yap-level" snapshot of the in-memory data (caches, etc)
|
||||
// is in sync with our "sql-level" snapshot of the database.
|
||||
//
|
||||
@ -2189,36 +2196,16 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
// If the two match then our snapshots are in sync.
|
||||
// If they don't then we need to get caught up by processing changesets.
|
||||
|
||||
uint64_t yapSnapshot = snapshot;
|
||||
uint64_t sqlSnapshot = [self readSnapshotFromDatabase];
|
||||
dbSnapshot = [self readSnapshotFromDatabase];
|
||||
|
||||
if (yapSnapshot < sqlSnapshot)
|
||||
if (snapshot < dbSnapshot)
|
||||
{
|
||||
// The transaction can see the sqlite commit from another transaction,
|
||||
// and it hasn't processed the changeset(s) yet. We need to process them now.
|
||||
// and it hasn't processed the changeset(s) yet.
|
||||
// We need to fetch them now.
|
||||
|
||||
NSArray *changesets = [database pendingAndCommittedChangesetsSince:yapSnapshot until:sqlSnapshot];
|
||||
|
||||
if (!changesets) // we could not retrieve changeset due to a change from another process.
|
||||
{
|
||||
NSUInteger flags = YapDatabaseConnectionFlushMemoryFlags_Caches |
|
||||
YapDatabaseConnectionFlushMemoryFlags_Extension_State;
|
||||
|
||||
[self _flushMemoryWithFlags:flags];
|
||||
snapshot = sqlSnapshot;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (NSDictionary *changeset in changesets)
|
||||
{
|
||||
[self noteCommittedChangeset:changeset];
|
||||
}
|
||||
}
|
||||
|
||||
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
|
||||
NSAssert(snapshot == sqlSnapshot,
|
||||
@"Invalid connection state in preReadTransaction: snapshot(%llu) != sqlSnapshot(%llu): %@",
|
||||
snapshot, sqlSnapshot, changesets);
|
||||
expectsChangesets = YES;
|
||||
changesets = [database pendingAndCommittedChangesetsSince:snapshot until:dbSnapshot];
|
||||
}
|
||||
|
||||
myState->longLivedReadTransaction = (longLivedReadTransaction != nil);
|
||||
@ -2237,36 +2224,55 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
// able to process a changeset from a sibling connection.
|
||||
// If this is the case then we need to get caught up by processing the changeset(s).
|
||||
|
||||
uint64_t localSnapshot = snapshot;
|
||||
uint64_t globalSnapshot = [database snapshot];
|
||||
dbSnapshot = [database snapshot];
|
||||
|
||||
if (localSnapshot < globalSnapshot)
|
||||
if (snapshot < dbSnapshot)
|
||||
{
|
||||
// The transaction hasn't processed recent changeset(s) yet. We need to process them now.
|
||||
// The transaction hasn't processed recent changeset(s) yet.
|
||||
// We need to fetch them now.
|
||||
|
||||
NSArray *changesets = [database pendingAndCommittedChangesetsSince:localSnapshot until:globalSnapshot];
|
||||
|
||||
// changeset cannot be nil because we are not supporting multiprocess changes
|
||||
for (NSDictionary *changeset in changesets)
|
||||
{
|
||||
[self noteCommittedChangeset:changeset];
|
||||
}
|
||||
|
||||
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
|
||||
NSAssert(snapshot == globalSnapshot,
|
||||
@"Invalid connection state in preReadTransaction: snapshot(%llu) != globalSnapshot(%llu): %@",
|
||||
snapshot, globalSnapshot, changesets);
|
||||
expectsChangesets = YES;
|
||||
changesets = [database pendingAndCommittedChangesetsSince:snapshot until:dbSnapshot];
|
||||
}
|
||||
|
||||
myState->sqlLevelSharedReadLock = NO;
|
||||
needsMarkSqlLevelSharedReadLock = YES;
|
||||
}
|
||||
|
||||
myState->lastTransactionSnapshot = snapshot;
|
||||
myState->lastTransactionSnapshot = dbSnapshot;
|
||||
myState->lastTransactionTime = mach_absolute_time();
|
||||
}});
|
||||
|
||||
// Pre-Read-Transaction: Step 4 of 4
|
||||
// Pre-Read-Transaction: Setp 4 of 5
|
||||
//
|
||||
// Update our in-memory data (caches, etc) if needed.
|
||||
// Since this can be CPU intensive, we do this outside the snapshotQueue.
|
||||
|
||||
if (expectsChangesets)
|
||||
{
|
||||
if (!changesets) // we could not retrieve changeset due to a change from another process.
|
||||
{
|
||||
NSUInteger flags = YapDatabaseConnectionFlushMemoryFlags_Caches |
|
||||
YapDatabaseConnectionFlushMemoryFlags_Extension_State;
|
||||
|
||||
[self _flushMemoryWithFlags:flags];
|
||||
snapshot = dbSnapshot;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (NSDictionary *changeset in changesets)
|
||||
{
|
||||
[self noteCommittedChangeset:changeset];
|
||||
}
|
||||
|
||||
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
|
||||
NSAssert(snapshot == dbSnapshot,
|
||||
@"Invalid connection state in preReadTransaction: snapshot(%llu) != dbSnapshot(%llu): %@",
|
||||
snapshot, dbSnapshot, changesets);
|
||||
}
|
||||
}
|
||||
|
||||
// Pre-Read-Transaction: Step 5 of 5
|
||||
//
|
||||
// Enable sqlite VFS shim listeners for read notifications (if needed).
|
||||
//
|
||||
@ -2440,7 +2446,7 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
**/
|
||||
- (void)preReadWriteTransaction:(YapDatabaseReadWriteTransaction *)transaction
|
||||
{
|
||||
// Pre-Write-Transaction: Step 1 of 5
|
||||
// Pre-Write-Transaction: Step 1 of 6
|
||||
//
|
||||
// Execute "BEGIN TRANSACTION" on database connection.
|
||||
// This is actually a deferred transaction, meaning the sqlite connection won't actually
|
||||
@ -2462,9 +2468,13 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
[transaction beginTransaction];
|
||||
}
|
||||
|
||||
__block uint64_t dbSnapshot = 0;
|
||||
__block BOOL expectsChangesets = NO;
|
||||
__block NSArray *changesets = nil;
|
||||
|
||||
dispatch_sync(database->snapshotQueue, ^{ @autoreleasepool {
|
||||
|
||||
// Pre-Write-Transaction: Step 2 of 5
|
||||
// Pre-Write-Transaction: Step 2 of 6
|
||||
//
|
||||
// Update our connection state within the state table.
|
||||
//
|
||||
@ -2484,59 +2494,68 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
|
||||
NSAssert(myState != nil, @"Missing state in database->connectionStates");
|
||||
|
||||
// Pre-Write-Transaction: Step 3 of 5
|
||||
// Pre-Write-Transaction: Step 3 of 6
|
||||
//
|
||||
// Validate our caches based on snapshot numbers
|
||||
|
||||
uint64_t localSnapshot = snapshot;
|
||||
uint64_t globalSnapshot = 0;
|
||||
// Compare our snapshot with the database's snapshot.
|
||||
|
||||
// In multiprocess mode, the snapshot number might have been externally updated
|
||||
if (enableMultiProcessSupport)
|
||||
globalSnapshot = [self readSnapshotFromDatabase];
|
||||
dbSnapshot = [self readSnapshotFromDatabase];
|
||||
else
|
||||
globalSnapshot = [database snapshot];
|
||||
dbSnapshot = [database snapshot];
|
||||
|
||||
if (localSnapshot < globalSnapshot)
|
||||
if (snapshot < dbSnapshot)
|
||||
{
|
||||
NSArray *changesets = [database pendingAndCommittedChangesetsSince:localSnapshot until:globalSnapshot];
|
||||
// The transaction hasn't processed recent changeset(s) yet.
|
||||
// We need to fetch them now.
|
||||
|
||||
externallyModified = (changesets == nil);
|
||||
|
||||
if (!changesets) // we could not retrieve changeset due to a change from another process.
|
||||
{
|
||||
NSUInteger flags = YapDatabaseConnectionFlushMemoryFlags_Caches |
|
||||
YapDatabaseConnectionFlushMemoryFlags_Extension_State;
|
||||
|
||||
[self _flushMemoryWithFlags:flags];
|
||||
snapshot = globalSnapshot;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (NSDictionary *changeset in changesets)
|
||||
{
|
||||
[self noteCommittedChangeset:changeset];
|
||||
}
|
||||
|
||||
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
|
||||
NSAssert(snapshot == globalSnapshot,
|
||||
@"Invalid connection state in preReadWriteTransaction: snapshot(%llu) != globalSnapshot(%llu)",
|
||||
snapshot, globalSnapshot);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
externallyModified = NO;
|
||||
expectsChangesets = YES;
|
||||
changesets = [database pendingAndCommittedChangesetsSince:snapshot until:dbSnapshot];
|
||||
}
|
||||
|
||||
myState->lastTransactionSnapshot = snapshot;
|
||||
myState->lastTransactionSnapshot = dbSnapshot;
|
||||
myState->lastTransactionTime = mach_absolute_time();
|
||||
needsMarkSqlLevelSharedReadLock = NO;
|
||||
|
||||
YDBLogVerbose(@"YapDatabaseConnection(%p) starting read-write transaction.", self);
|
||||
}});
|
||||
|
||||
// Pre-Write-Transaction: Step 4 of 5
|
||||
// Pre-Write-Transaction: Step 4 of 6
|
||||
//
|
||||
// Update our in-memory data (caches, etc) if needed.
|
||||
// Since this can be CPU intensive, we do this outside the snapshotQueue.
|
||||
|
||||
if (expectsChangesets)
|
||||
{
|
||||
externallyModified = (changesets == nil);
|
||||
|
||||
if (!changesets) // we could not retrieve changeset due to a change from another process.
|
||||
{
|
||||
NSUInteger flags = YapDatabaseConnectionFlushMemoryFlags_Caches |
|
||||
YapDatabaseConnectionFlushMemoryFlags_Extension_State;
|
||||
|
||||
[self _flushMemoryWithFlags:flags];
|
||||
snapshot = dbSnapshot;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (NSDictionary *changeset in changesets)
|
||||
{
|
||||
[self noteCommittedChangeset:changeset];
|
||||
}
|
||||
|
||||
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
|
||||
NSAssert(snapshot == dbSnapshot,
|
||||
@"Invalid connection state in preReadWriteTransaction: snapshot(%llu) != dbSnapshot(%llu)",
|
||||
snapshot, dbSnapshot);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
externallyModified = NO;
|
||||
}
|
||||
|
||||
// Pre-Write-Transaction: Step 5 of 6
|
||||
//
|
||||
// Setup write state and changeset variables
|
||||
|
||||
@ -2562,7 +2581,7 @@ static int connectionBusyHandler(void *ptr, int count)
|
||||
if (mutationStack == nil)
|
||||
mutationStack = [[YapMutationStack_Bool alloc] init];
|
||||
|
||||
// Pre-Write-Transaction: Step 5 of 5
|
||||
// Pre-Write-Transaction: Step 6 of 6
|
||||
//
|
||||
// Add IsOnConnectionQueueKey flag to writeQueue.
|
||||
// This allows various methods that depend on the flag to operate correctly.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user