Minor performance optimization: process the change-sets outside the snapshotQueue. This decreases contention on the queue between connections.

This commit is contained in:
Robbie Hanson 2016-03-27 16:40:48 -07:00
parent c14327e238
commit 7e4e3a92d3

View File

@ -2112,7 +2112,7 @@ static int connectionBusyHandler(void *ptr, int count)
**/
- (void)preReadTransaction:(YapDatabaseReadTransaction *)transaction
{
// Pre-Read-Transaction: Step 1 of 4
// Pre-Read-Transaction: Step 1 of 5
//
// Execute "BEGIN TRANSACTION" on database connection.
// This is actually a deferred transaction, meaning the sqlite connection won't actually
@ -2123,10 +2123,14 @@ static int connectionBusyHandler(void *ptr, int count)
// hit our in-memory caches. Thus we avoid sqlite machinery when unneeded.
[transaction beginTransaction];
__block uint64_t dbSnapshot = 0;
__block BOOL expectsChangesets = NO;
__block NSArray *changesets = nil;
dispatch_sync(database->snapshotQueue, ^{ @autoreleasepool {
// Pre-Read-Transaction: Step 2 of 4
// Pre-Read-Transaction: Step 2 of 5
//
// Update our connection state within the state table.
//
@ -2163,13 +2167,13 @@ static int connectionBusyHandler(void *ptr, int count)
NSAssert(myState != nil, @"Missing state in database->connectionStates");
// Pre-Read-Transaction: Step 3 of 4
// Pre-Read-Transaction: Step 3 of 5
//
// Update our in-memory data (caches, etc) if needed.
// Compare our snapshot with the database's snapshot.
if (hasActiveWriteTransaction || longLivedReadTransaction || wal_file == NULL || enableMultiProcessSupport)
{
// If there is a write transaction in progress, (in case of multiple processes accessing the database, we can't know for sure so we make this assumption)
// If there is a write transaction in progress,
// then it's not safe to proceed until we acquire a "sql-level" snapshot.
//
// If this is for a longLivedReadTransaction,
@ -2180,6 +2184,9 @@ static int connectionBusyHandler(void *ptr, int count)
// We need the wal_file in order to properly receive notifications of
// when sqlite acquires an "sql-level" snapshot.
//
// In case of multiple processes accessing the database,
// we can't know for sure so we must make this assumption.
//
// During this process we ensure that our "yap-level" snapshot of the in-memory data (caches, etc)
// is in sync with our "sql-level" snapshot of the database.
//
@ -2189,36 +2196,16 @@ static int connectionBusyHandler(void *ptr, int count)
// If the two match then our snapshots are in sync.
// If they don't then we need to get caught up by processing changesets.
uint64_t yapSnapshot = snapshot;
uint64_t sqlSnapshot = [self readSnapshotFromDatabase];
dbSnapshot = [self readSnapshotFromDatabase];
if (yapSnapshot < sqlSnapshot)
if (snapshot < dbSnapshot)
{
// The transaction can see the sqlite commit from another transaction,
// and it hasn't processed the changeset(s) yet. We need to process them now.
// and it hasn't processed the changeset(s) yet.
// We need to fetch them now.
NSArray *changesets = [database pendingAndCommittedChangesetsSince:yapSnapshot until:sqlSnapshot];
if (!changesets) // we could not retrieve changeset due to a change from another process.
{
NSUInteger flags = YapDatabaseConnectionFlushMemoryFlags_Caches |
YapDatabaseConnectionFlushMemoryFlags_Extension_State;
[self _flushMemoryWithFlags:flags];
snapshot = sqlSnapshot;
}
else
{
for (NSDictionary *changeset in changesets)
{
[self noteCommittedChangeset:changeset];
}
}
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
NSAssert(snapshot == sqlSnapshot,
@"Invalid connection state in preReadTransaction: snapshot(%llu) != sqlSnapshot(%llu): %@",
snapshot, sqlSnapshot, changesets);
expectsChangesets = YES;
changesets = [database pendingAndCommittedChangesetsSince:snapshot until:dbSnapshot];
}
myState->longLivedReadTransaction = (longLivedReadTransaction != nil);
@ -2237,36 +2224,55 @@ static int connectionBusyHandler(void *ptr, int count)
// able to process a changeset from a sibling connection.
// If this is the case then we need to get caught up by processing the changeset(s).
uint64_t localSnapshot = snapshot;
uint64_t globalSnapshot = [database snapshot];
dbSnapshot = [database snapshot];
if (localSnapshot < globalSnapshot)
if (snapshot < dbSnapshot)
{
// The transaction hasn't processed recent changeset(s) yet. We need to process them now.
// The transaction hasn't processed recent changeset(s) yet.
// We need to fetch them now.
NSArray *changesets = [database pendingAndCommittedChangesetsSince:localSnapshot until:globalSnapshot];
// changeset cannot be nil because we are not supporting multiprocess changes
for (NSDictionary *changeset in changesets)
{
[self noteCommittedChangeset:changeset];
}
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
NSAssert(snapshot == globalSnapshot,
@"Invalid connection state in preReadTransaction: snapshot(%llu) != globalSnapshot(%llu): %@",
snapshot, globalSnapshot, changesets);
expectsChangesets = YES;
changesets = [database pendingAndCommittedChangesetsSince:snapshot until:dbSnapshot];
}
myState->sqlLevelSharedReadLock = NO;
needsMarkSqlLevelSharedReadLock = YES;
}
myState->lastTransactionSnapshot = snapshot;
myState->lastTransactionSnapshot = dbSnapshot;
myState->lastTransactionTime = mach_absolute_time();
}});
// Pre-Read-Transaction: Step 4 of 4
// Pre-Read-Transaction: Setp 4 of 5
//
// Update our in-memory data (caches, etc) if needed.
// Since this can be CPU intensive, we do this outside the snapshotQueue.
if (expectsChangesets)
{
if (!changesets) // we could not retrieve changeset due to a change from another process.
{
NSUInteger flags = YapDatabaseConnectionFlushMemoryFlags_Caches |
YapDatabaseConnectionFlushMemoryFlags_Extension_State;
[self _flushMemoryWithFlags:flags];
snapshot = dbSnapshot;
}
else
{
for (NSDictionary *changeset in changesets)
{
[self noteCommittedChangeset:changeset];
}
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
NSAssert(snapshot == dbSnapshot,
@"Invalid connection state in preReadTransaction: snapshot(%llu) != dbSnapshot(%llu): %@",
snapshot, dbSnapshot, changesets);
}
}
// Pre-Read-Transaction: Step 5 of 5
//
// Enable sqlite VFS shim listeners for read notifications (if needed).
//
@ -2440,7 +2446,7 @@ static int connectionBusyHandler(void *ptr, int count)
**/
- (void)preReadWriteTransaction:(YapDatabaseReadWriteTransaction *)transaction
{
// Pre-Write-Transaction: Step 1 of 5
// Pre-Write-Transaction: Step 1 of 6
//
// Execute "BEGIN TRANSACTION" on database connection.
// This is actually a deferred transaction, meaning the sqlite connection won't actually
@ -2462,9 +2468,13 @@ static int connectionBusyHandler(void *ptr, int count)
[transaction beginTransaction];
}
__block uint64_t dbSnapshot = 0;
__block BOOL expectsChangesets = NO;
__block NSArray *changesets = nil;
dispatch_sync(database->snapshotQueue, ^{ @autoreleasepool {
// Pre-Write-Transaction: Step 2 of 5
// Pre-Write-Transaction: Step 2 of 6
//
// Update our connection state within the state table.
//
@ -2484,59 +2494,68 @@ static int connectionBusyHandler(void *ptr, int count)
NSAssert(myState != nil, @"Missing state in database->connectionStates");
// Pre-Write-Transaction: Step 3 of 5
// Pre-Write-Transaction: Step 3 of 6
//
// Validate our caches based on snapshot numbers
uint64_t localSnapshot = snapshot;
uint64_t globalSnapshot = 0;
// Compare our snapshot with the database's snapshot.
// In multiprocess mode, the snapshot number might have been externally updated
if (enableMultiProcessSupport)
globalSnapshot = [self readSnapshotFromDatabase];
dbSnapshot = [self readSnapshotFromDatabase];
else
globalSnapshot = [database snapshot];
dbSnapshot = [database snapshot];
if (localSnapshot < globalSnapshot)
if (snapshot < dbSnapshot)
{
NSArray *changesets = [database pendingAndCommittedChangesetsSince:localSnapshot until:globalSnapshot];
// The transaction hasn't processed recent changeset(s) yet.
// We need to fetch them now.
externallyModified = (changesets == nil);
if (!changesets) // we could not retrieve changeset due to a change from another process.
{
NSUInteger flags = YapDatabaseConnectionFlushMemoryFlags_Caches |
YapDatabaseConnectionFlushMemoryFlags_Extension_State;
[self _flushMemoryWithFlags:flags];
snapshot = globalSnapshot;
}
else
{
for (NSDictionary *changeset in changesets)
{
[self noteCommittedChangeset:changeset];
}
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
NSAssert(snapshot == globalSnapshot,
@"Invalid connection state in preReadWriteTransaction: snapshot(%llu) != globalSnapshot(%llu)",
snapshot, globalSnapshot);
}
}
else
{
externallyModified = NO;
expectsChangesets = YES;
changesets = [database pendingAndCommittedChangesetsSince:snapshot until:dbSnapshot];
}
myState->lastTransactionSnapshot = snapshot;
myState->lastTransactionSnapshot = dbSnapshot;
myState->lastTransactionTime = mach_absolute_time();
needsMarkSqlLevelSharedReadLock = NO;
YDBLogVerbose(@"YapDatabaseConnection(%p) starting read-write transaction.", self);
}});
// Pre-Write-Transaction: Step 4 of 5
// Pre-Write-Transaction: Step 4 of 6
//
// Update our in-memory data (caches, etc) if needed.
// Since this can be CPU intensive, we do this outside the snapshotQueue.
if (expectsChangesets)
{
externallyModified = (changesets == nil);
if (!changesets) // we could not retrieve changeset due to a change from another process.
{
NSUInteger flags = YapDatabaseConnectionFlushMemoryFlags_Caches |
YapDatabaseConnectionFlushMemoryFlags_Extension_State;
[self _flushMemoryWithFlags:flags];
snapshot = dbSnapshot;
}
else
{
for (NSDictionary *changeset in changesets)
{
[self noteCommittedChangeset:changeset];
}
// The noteCommittedChangeset method (invoked above) updates our 'snapshot' variable.
NSAssert(snapshot == dbSnapshot,
@"Invalid connection state in preReadWriteTransaction: snapshot(%llu) != dbSnapshot(%llu)",
snapshot, dbSnapshot);
}
}
else
{
externallyModified = NO;
}
// Pre-Write-Transaction: Step 5 of 6
//
// Setup write state and changeset variables
@ -2562,7 +2581,7 @@ static int connectionBusyHandler(void *ptr, int count)
if (mutationStack == nil)
mutationStack = [[YapMutationStack_Bool alloc] init];
// Pre-Write-Transaction: Step 5 of 5
// Pre-Write-Transaction: Step 6 of 6
//
// Add IsOnConnectionQueueKey flag to writeQueue.
// This allows various methods that depend on the flag to operate correctly.