[#6689] updated to automatically routes raw write SQL statements to the nonconcurrent db pool

This commit is contained in:
Gani Georgiev 2025-05-02 10:27:41 +03:00
parent 3ef752c232
commit 836fc77ddc
27 changed files with 4601 additions and 3917 deletions

View File

@ -2,6 +2,9 @@
- Write the default response body of `*Request` hooks that are wrapped in a transaction after the related transaction completes to allow propagating errors ([#6462](https://github.com/pocketbase/pocketbase/discussions/6462#discussioncomment-12207818)).
- Updated `app.DB()` to automatically routes raw write SQL statements to the nonconcurrent db pool ([#6689](https://github.com/pocketbase/pocketbase/discussions/6689)).
_For the rare cases when it is needed users still have the option to explicitly target the specific pool they want using `app.ConcurrentDB()`/`app.NonconcurrentDB()`._
## v0.27.2

View File

@ -758,7 +758,7 @@ func realtimeCanAccessRecord(
var exists int
q := app.DB().Select("(1)").
q := app.ConcurrentDB().Select("(1)").
From(record.Collection().Name).
AndWhere(dbx.HashExp{record.Collection().Name + ".id": record.Id})

View File

@ -300,7 +300,7 @@ func recordCreate(responseWriteAfterTx bool, optFinalizer func(data any) error)
// check non-empty create rule
if *dummyCollection.CreateRule != "" {
ruleQuery := e.App.DB().Select("(1)").PreFragment(withFrom).From(dummyCollection.Name).AndBind(dummyParams)
ruleQuery := e.App.ConcurrentDB().Select("(1)").PreFragment(withFrom).From(dummyCollection.Name).AndBind(dummyParams)
resolver := core.NewRecordFieldResolver(e.App, &dummyCollection, requestInfo, true)
@ -320,7 +320,7 @@ func recordCreate(responseWriteAfterTx bool, optFinalizer func(data any) error)
}
// check for manage rule access
manageRuleQuery := e.App.DB().Select("(1)").PreFragment(withFrom).From(dummyCollection.Name).AndBind(dummyParams)
manageRuleQuery := e.App.ConcurrentDB().Select("(1)").PreFragment(withFrom).From(dummyCollection.Name).AndBind(dummyParams)
if !form.HasManageAccess() &&
hasAuthManageAccess(e.App, requestInfo, &dummyCollection, manageRuleQuery) {
form.GrantManagerAccess()
@ -452,7 +452,7 @@ func recordUpdate(responseWriteAfterTx bool, optFinalizer func(data any) error)
}
form.Load(data)
manageRuleQuery := e.App.DB().Select("(1)").From(collection.Name).AndWhere(dbx.HashExp{
manageRuleQuery := e.App.ConcurrentDB().Select("(1)").From(collection.Name).AndWhere(dbx.HashExp{
collection.Name + ".id": record.Id,
})
if !form.HasManageAccess() &&

View File

@ -457,8 +457,8 @@ func autoResolveRecordsFlags(app core.App, records []*core.Record, requestInfo *
managedIds := []string{}
query := app.RecordQuery(collection).
Select(app.DB().QuoteSimpleColumnName(collection.Name) + ".id").
AndWhere(dbx.In(app.DB().QuoteSimpleColumnName(collection.Name)+".id", recordIds...))
Select(app.ConcurrentDB().QuoteSimpleColumnName(collection.Name) + ".id").
AndWhere(dbx.In(app.ConcurrentDB().QuoteSimpleColumnName(collection.Name)+".id", recordIds...))
resolver := core.NewRecordFieldResolver(app, collection, requestInfo, true)
expr, err := search.FilterData(*collection.ManageRule).BuildExpr(resolver)

View File

@ -146,46 +146,82 @@ type App interface {
// DB methods
// ---------------------------------------------------------------
// DB returns the default app data db instance (pb_data/data.db).
// DB returns the default app data.db builder instance.
//
// To minimize SQLITE_BUSY errors, it automatically routes the
// SELECT queries to the underlying concurrent db pool and everything else
// to the nonconcurrent one.
//
// For more finer control over the used connections pools you can
// call directly ConcurrentDB() or NonconcurrentDB().
DB() dbx.Builder
// NonconcurrentDB returns the nonconcurrent app data db instance (pb_data/data.db).
// ConcurrentDB returns the concurrent app data.db builder instance.
//
// This method is used mainly internally for executing db read
// operations in a concurrent/non-blocking manner.
//
// Most users should use simply DB() as it will automatically
// route the query execution to ConcurrentDB() or NonconcurrentDB().
//
// In a transaction the ConcurrentDB() and NonconcurrentDB() refer to the same *dbx.TX instance.
ConcurrentDB() dbx.Builder
// NonconcurrentDB returns the nonconcurrent app data.db builder instance.
//
// The returned db instance is limited only to a single open connection,
// meaning that it can process only 1 db operation at a time (other operations will be queued up).
// meaning that it can process only 1 db operation at a time (other queries queue up).
//
// This method is used mainly internally and in the tests to execute write
// (save/delete) db operations as it helps with minimizing the SQLITE_BUSY errors.
//
// For the majority of cases you would want to use the regular DB() method
// since it allows concurrent db read operations.
// Most users should use simply DB() as it will automatically
// route the query execution to ConcurrentDB() or NonconcurrentDB().
//
// In a transaction the ConcurrentDB() and NonconcurrentDB() refer to the same *dbx.TX instance.
NonconcurrentDB() dbx.Builder
// AuxDB returns the default app auxiliary db instance (pb_data/auxiliary.db).
// AuxDB returns the app auxiliary.db builder instance.
//
// To minimize SQLITE_BUSY errors, it automatically routes the
// SELECT queries to the underlying concurrent db pool and everything else
// to the nonconcurrent one.
//
// For more finer control over the used connections pools you can
// call directly AuxConcurrentDB() or AuxNonconcurrentDB().
AuxDB() dbx.Builder
// AuxNonconcurrentDB returns the nonconcurrent app auxiliary db instance (pb_data/auxiliary.db)..
// AuxConcurrentDB returns the concurrent app auxiliary.db builder instance.
//
// This method is used mainly internally for executing db read
// operations in a concurrent/non-blocking manner.
//
// Most users should use simply AuxDB() as it will automatically
// route the query execution to AuxConcurrentDB() or AuxNonconcurrentDB().
//
// In a transaction the AuxConcurrentDB() and AuxNonconcurrentDB() refer to the same *dbx.TX instance.
AuxConcurrentDB() dbx.Builder
// AuxNonconcurrentDB returns the nonconcurrent app auxiliary.db builder instance.
//
// The returned db instance is limited only to a single open connection,
// meaning that it can process only 1 db operation at a time (other operations will be queued up).
// meaning that it can process only 1 db operation at a time (other queries queue up).
//
// This method is used mainly internally and in the tests to execute write
// (save/delete) db operations as it helps with minimizing the SQLITE_BUSY errors.
//
// For the majority of cases you would want to use the regular DB() method
// since it allows concurrent db read operations.
// Most users should use simply AuxDB() as it will automatically
// route the query execution to AuxConcurrentDB() or AuxNonconcurrentDB().
//
// In a transaction the AuxNonconcurrentDB() and AuxNonconcurrentDB() refer to the same *dbx.TX instance.
// In a transaction the AuxConcurrentDB() and AuxNonconcurrentDB() refer to the same *dbx.TX instance.
AuxNonconcurrentDB() dbx.Builder
// HasTable checks if a table (or view) with the provided name exists (case insensitive).
// in the current app.DB() instance.
// in the data.db.
HasTable(tableName string) bool
// AuxHasTable checks if a table (or view) with the provided name exists (case insensitive)
// in the current app.AuxDB() instance.
// in the auxiliary.db.
AuxHasTable(tableName string) bool
// TableColumns returns all column names of a single table by its name.
@ -231,21 +267,19 @@ type App interface {
// FindRecordByViewFile returns the original Record of the provided view collection file.
FindRecordByViewFile(viewCollectionModelOrIdentifier any, fileFieldName string, filename string) (*Record, error)
// Vacuum executes VACUUM on the current app.DB() instance
// in order to reclaim unused data db disk space.
// Vacuum executes VACUUM on the data.db in order to reclaim unused data db disk space.
Vacuum() error
// AuxVacuum executes VACUUM on the current app.AuxDB() instance
// in order to reclaim unused auxiliary db disk space.
// AuxVacuum executes VACUUM on the auxiliary.db in order to reclaim unused auxiliary db disk space.
AuxVacuum() error
// ---------------------------------------------------------------
// ModelQuery creates a new preconfigured select app.DB() query with preset
// ModelQuery creates a new preconfigured select data.db query with preset
// SELECT, FROM and other common fields based on the provided model.
ModelQuery(model Model) *dbx.SelectQuery
// AuxModelQuery creates a new preconfigured select app.AuxDB() query with preset
// AuxModelQuery creates a new preconfigured select auxiliary.db query with preset
// SELECT, FROM and other common fields based on the provided model.
AuxModelQuery(model Model) *dbx.SelectQuery

View File

@ -476,44 +476,100 @@ func (app *BaseApp) ResetBootstrapState() error {
return nil
}
// DB returns the default app data db instance (pb_data/data.db).
// DB returns the default app data.db builder instance.
//
// To minimize SQLITE_BUSY errors, it automatically routes the
// SELECT queries to the underlying concurrent db pool and everything
// else to the nonconcurrent one.
//
// For more finer control over the used connections pools you can
// call directly ConcurrentDB() or NonconcurrentDB().
func (app *BaseApp) DB() dbx.Builder {
// transactional or both are nil
if app.concurrentDB == app.nonconcurrentDB {
return app.concurrentDB
}
// NonconcurrentDB returns the nonconcurrent app data db instance (pb_data/data.db).
return &dualDBBuilder{
concurrentDB: app.concurrentDB,
nonconcurrentDB: app.nonconcurrentDB,
}
}
// ConcurrentDB returns the concurrent app data.db builder instance.
//
// This method is used mainly internally for executing db read
// operations in a concurrent/non-blocking manner.
//
// Most users should use simply DB() as it will automatically
// route the query execution to ConcurrentDB() or NonconcurrentDB().
//
// In a transaction the ConcurrentDB() and NonconcurrentDB() refer to the same *dbx.TX instance.
func (app *BaseApp) ConcurrentDB() dbx.Builder {
return app.concurrentDB
}
// NonconcurrentDB returns the nonconcurrent app data.db builder instance.
//
// The returned db instance is limited only to a single open connection,
// meaning that it can process only 1 db operation at a time (other operations will be queued up).
// meaning that it can process only 1 db operation at a time (other queries queue up).
//
// This method is used mainly internally and in the tests to execute write
// (save/delete) db operations as it helps with minimizing the SQLITE_BUSY errors.
//
// For the majority of cases you would want to use the regular DB() method
// since it allows concurrent db read operations.
// Most users should use simply DB() as it will automatically
// route the query execution to ConcurrentDB() or NonconcurrentDB().
//
// In a transaction the ConcurrentDB() and NonconcurrentDB() refer to the same *dbx.TX instance.
func (app *BaseApp) NonconcurrentDB() dbx.Builder {
return app.nonconcurrentDB
}
// AuxDB returns the default app auxiliary db instance (pb_data/auxiliary.db).
// AuxDB returns the app auxiliary.db builder instance.
//
// To minimize SQLITE_BUSY errors, it automatically routes the
// SELECT queries to the underlying concurrent db pool and everything
// else to the nonconcurrent one.
//
// For more finer control over the used connections pools you can
// call directly AuxConcurrentDB() or AuxNonconcurrentDB().
func (app *BaseApp) AuxDB() dbx.Builder {
// transactional or both are nil
if app.auxConcurrentDB == app.auxNonconcurrentDB {
return app.auxConcurrentDB
}
// AuxNonconcurrentDB returns the nonconcurrent app auxiliary db instance (pb_data/auxiliary.db).
return &dualDBBuilder{
concurrentDB: app.auxConcurrentDB,
nonconcurrentDB: app.auxNonconcurrentDB,
}
}
// AuxConcurrentDB returns the concurrent app auxiliary.db builder instance.
//
// This method is used mainly internally for executing db read
// operations in a concurrent/non-blocking manner.
//
// Most users should use simply AuxDB() as it will automatically
// route the query execution to AuxConcurrentDB() or AuxNonconcurrentDB().
//
// In a transaction the AuxConcurrentDB() and AuxNonconcurrentDB() refer to the same *dbx.TX instance.
func (app *BaseApp) AuxConcurrentDB() dbx.Builder {
return app.auxConcurrentDB
}
// AuxNonconcurrentDB returns the nonconcurrent app auxiliary.db builder instance.
//
// The returned db instance is limited only to a single open connection,
// meaning that it can process only 1 db operation at a time (other operations will be queued up).
// meaning that it can process only 1 db operation at a time (other queries queue up).
//
// This method is used mainly internally and in the tests to execute write
// (save/delete) db operations as it helps with minimizing the SQLITE_BUSY errors.
//
// For the majority of cases you would want to use the regular DB() method
// since it allows concurrent db read operations.
// Most users should use simply AuxDB() as it will automatically
// route the query execution to AuxConcurrentDB() or AuxNonconcurrentDB().
//
// In a transaction the AuxNonconcurrentDB() and AuxNonconcurrentDB() refer to the same *dbx.TX instance.
// In a transaction the AuxConcurrentDB() and AuxNonconcurrentDB() refer to the same *dbx.TX instance.
func (app *BaseApp) AuxNonconcurrentDB() dbx.Builder {
return app.auxNonconcurrentDB
}
@ -1309,7 +1365,7 @@ func (app *BaseApp) registerBaseHooks() {
app.Logger().Warn("Failed to run periodic PRAGMA wal_checkpoint for the auxiliary DB", slog.String("error", execErr.Error()))
}
_, execErr = app.DB().NewQuery("PRAGMA optimize").Execute()
_, execErr = app.ConcurrentDB().NewQuery("PRAGMA optimize").Execute()
if execErr != nil {
app.Logger().Warn("Failed to run periodic PRAGMA optimize", slog.String("error", execErr.Error()))
}

View File

@ -5,6 +5,7 @@ import (
"database/sql"
"log/slog"
"os"
"slices"
"testing"
"time"
@ -99,9 +100,11 @@ func TestBaseAppBootstrap(t *testing.T) {
}
nilChecksBeforeReset := []nilCheck{
{"[before] concurrentDB", app.DB(), false},
{"[before] db", app.DB(), false},
{"[before] concurrentDB", app.ConcurrentDB(), false},
{"[before] nonconcurrentDB", app.NonconcurrentDB(), false},
{"[before] auxConcurrentDB", app.AuxDB(), false},
{"[before] auxDB", app.AuxDB(), false},
{"[before] auxConcurrentDB", app.AuxConcurrentDB(), false},
{"[before] auxNonconcurrentDB", app.AuxNonconcurrentDB(), false},
{"[before] settings", app.Settings(), false},
{"[before] logger", app.Logger(), false},
@ -116,9 +119,11 @@ func TestBaseAppBootstrap(t *testing.T) {
}
nilChecksAfterReset := []nilCheck{
{"[after] concurrentDB", app.DB(), true},
{"[after] db", app.DB(), true},
{"[after] concurrentDB", app.ConcurrentDB(), true},
{"[after] nonconcurrentDB", app.NonconcurrentDB(), true},
{"[after] auxConcurrentDB", app.AuxDB(), true},
{"[after] auxDB", app.AuxDB(), true},
{"[after] auxConcurrentDB", app.AuxConcurrentDB(), true},
{"[after] auxNonconcurrentDB", app.AuxNonconcurrentDB(), true},
{"[after] settings", app.Settings(), false},
{"[after] logger", app.Logger(), false},
@ -371,8 +376,8 @@ func TestBaseAppRefreshSettingsLoggerMinLevelEnabled(t *testing.T) {
}
// silence query logs
app.DB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {}
app.DB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {}
app.ConcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {}
app.ConcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {}
app.NonconcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {}
app.NonconcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {}
@ -395,3 +400,155 @@ func TestBaseAppRefreshSettingsLoggerMinLevelEnabled(t *testing.T) {
})
}
}
func TestBaseAppDBDualBuilder(t *testing.T) {
t.Parallel()
app, _ := tests.NewTestApp()
defer app.Cleanup()
concurrentQueries := []string{}
nonconcurrentQueries := []string{}
app.ConcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
concurrentQueries = append(concurrentQueries, sql)
}
app.ConcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
concurrentQueries = append(concurrentQueries, sql)
}
app.NonconcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
nonconcurrentQueries = append(nonconcurrentQueries, sql)
}
app.NonconcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
nonconcurrentQueries = append(nonconcurrentQueries, sql)
}
type testQuery struct {
query string
isConcurrent bool
}
regularTests := []testQuery{
{" \n sEleCt 1", true},
{"With abc(x) AS (select 2) SELECT x FROM abc", true},
{"create table t1(x int)", false},
{"insert into t1(x) values(1)", false},
{"update t1 set x = 2", false},
{"delete from t1", false},
}
txTests := []testQuery{
{"select 3", false},
{" \n WITH abc(x) AS (select 4) SELECT x FROM abc", false},
{"create table t2(x int)", false},
{"insert into t2(x) values(1)", false},
{"update t2 set x = 2", false},
{"delete from t2", false},
}
for _, item := range regularTests {
_, err := app.DB().NewQuery(item.query).Execute()
if err != nil {
t.Fatalf("Failed to execute query %q error: %v", item.query, err)
}
}
app.RunInTransaction(func(txApp core.App) error {
for _, item := range txTests {
_, err := txApp.DB().NewQuery(item.query).Execute()
if err != nil {
t.Fatalf("Failed to execute query %q error: %v", item.query, err)
}
}
return nil
})
allTests := append(regularTests, txTests...)
for _, item := range allTests {
if item.isConcurrent {
if !slices.Contains(concurrentQueries, item.query) {
t.Fatalf("Expected concurrent query\n%q\ngot\nconcurrent:%v\nnonconcurrent:%v", item.query, concurrentQueries, nonconcurrentQueries)
}
} else {
if !slices.Contains(nonconcurrentQueries, item.query) {
t.Fatalf("Expected nonconcurrent query\n%q\ngot\nconcurrent:%v\nnonconcurrent:%v", item.query, concurrentQueries, nonconcurrentQueries)
}
}
}
}
func TestBaseAppAuxDBDualBuilder(t *testing.T) {
t.Parallel()
app, _ := tests.NewTestApp()
defer app.Cleanup()
concurrentQueries := []string{}
nonconcurrentQueries := []string{}
app.AuxConcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
concurrentQueries = append(concurrentQueries, sql)
}
app.AuxConcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
concurrentQueries = append(concurrentQueries, sql)
}
app.AuxNonconcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
nonconcurrentQueries = append(nonconcurrentQueries, sql)
}
app.AuxNonconcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
nonconcurrentQueries = append(nonconcurrentQueries, sql)
}
type testQuery struct {
query string
isConcurrent bool
}
regularTests := []testQuery{
{" \n sEleCt 1", true},
{"With abc(x) AS (select 2) SELECT x FROM abc", true},
{"create table t1(x int)", false},
{"insert into t1(x) values(1)", false},
{"update t1 set x = 2", false},
{"delete from t1", false},
}
txTests := []testQuery{
{"select 3", false},
{" \n WITH abc(x) AS (select 4) SELECT x FROM abc", false},
{"create table t2(x int)", false},
{"insert into t2(x) values(1)", false},
{"update t2 set x = 2", false},
{"delete from t2", false},
}
for _, item := range regularTests {
_, err := app.AuxDB().NewQuery(item.query).Execute()
if err != nil {
t.Fatalf("Failed to execute query %q error: %v", item.query, err)
}
}
app.AuxRunInTransaction(func(txApp core.App) error {
for _, item := range txTests {
_, err := txApp.AuxDB().NewQuery(item.query).Execute()
if err != nil {
t.Fatalf("Failed to execute query %q error: %v", item.query, err)
}
}
return nil
})
allTests := append(regularTests, txTests...)
for _, item := range allTests {
if item.isConcurrent {
if !slices.Contains(concurrentQueries, item.query) {
t.Fatalf("Expected concurrent query\n%q\ngot\nconcurrent:%v\nnonconcurrent:%v", item.query, concurrentQueries, nonconcurrentQueries)
}
} else {
if !slices.Contains(nonconcurrentQueries, item.query) {
t.Fatalf("Expected nonconcurrent query\n%q\ngot\nconcurrent:%v\nnonconcurrent:%v", item.query, concurrentQueries, nonconcurrentQueries)
}
}
}
}

View File

@ -1623,7 +1623,7 @@ func TestCollectionSaveViewWrapping(t *testing.T) {
var sql string
rowErr := app.DB().NewQuery("SELECT sql FROM sqlite_master WHERE type='view' AND name={:name}").
rowErr := app.ConcurrentDB().NewQuery("SELECT sql FROM sqlite_master WHERE type='view' AND name={:name}").
Bind(dbx.Params{"name": viewName}).
Row(&sql)
if rowErr != nil {

View File

@ -153,7 +153,7 @@ func TestFindCachedCollectionByNameOrId(t *testing.T) {
defer app.Cleanup()
totalQueries := 0
app.DB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
app.ConcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
totalQueries++
}
@ -272,7 +272,7 @@ func TestFindCachedCollectionReferences(t *testing.T) {
}
totalQueries := 0
app.DB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
app.ConcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
totalQueries++
}

View File

@ -144,7 +144,7 @@ func (app *BaseApp) SyncRecordTableSchema(newCollection *Collection, oldCollecti
// run optimize per the SQLite recommendations
// (https://www.sqlite.org/pragma.html#pragma_optimize)
_, optimizeErr := app.DB().NewQuery("PRAGMA optimize").Execute()
_, optimizeErr := app.ConcurrentDB().NewQuery("PRAGMA optimize").Execute()
if optimizeErr != nil {
app.Logger().Warn("Failed to run PRAGMA optimize after record table sync", slog.String("error", optimizeErr.Error()))
}
@ -310,7 +310,8 @@ func dropCollectionIndexes(app App, collection *Collection) error {
continue
}
if _, err := app.DB().NewQuery(fmt.Sprintf("DROP INDEX IF EXISTS [[%s]]", parsed.IndexName)).Execute(); err != nil {
_, err := txApp.DB().NewQuery(fmt.Sprintf("DROP INDEX IF EXISTS [[%s]]", parsed.IndexName)).Execute()
if err != nil {
return err
}
}

View File

@ -87,7 +87,7 @@ func (validator *collectionValidator) run() error {
validator.original.IsNew(),
validation.Length(1, 100),
validation.Match(DefaultIdRegex),
validation.By(validators.UniqueId(validator.app.DB(), validator.new.TableName())),
validation.By(validators.UniqueId(validator.app.ConcurrentDB(), validator.new.TableName())),
).Else(
validation.By(validators.Equal(validator.original.Id)),
),
@ -558,7 +558,7 @@ func (cv *collectionValidator) checkIndexes(value any) error {
// ensure that the index name is not used in another collection
var usedTblName string
_ = cv.app.DB().Select("tbl_name").
_ = cv.app.ConcurrentDB().Select("tbl_name").
From("sqlite_master").
AndWhere(dbx.HashExp{"type": "index"}).
AndWhere(dbx.NewExp("LOWER([[tbl_name]])!=LOWER({:oldName})", dbx.Params{"oldName": cv.original.Name})).

View File

@ -62,16 +62,16 @@ func crc32Checksum(str string) string {
return strconv.FormatInt(int64(crc32.ChecksumIEEE([]byte(str))), 10)
}
// ModelQuery creates a new preconfigured select app.DB() query with preset
// ModelQuery creates a new preconfigured select data.db query with preset
// SELECT, FROM and other common fields based on the provided model.
func (app *BaseApp) ModelQuery(m Model) *dbx.SelectQuery {
return app.modelQuery(app.DB(), m)
return app.modelQuery(app.ConcurrentDB(), m)
}
// AuxModelQuery creates a new preconfigured select app.AuxDB() query with preset
// AuxModelQuery creates a new preconfigured select auxiliary.db query with preset
// SELECT, FROM and other common fields based on the provided model.
func (app *BaseApp) AuxModelQuery(m Model) *dbx.SelectQuery {
return app.modelQuery(app.AuxDB(), m)
return app.modelQuery(app.AuxConcurrentDB(), m)
}
func (app *BaseApp) modelQuery(db dbx.Builder, m Model) *dbx.SelectQuery {
@ -484,7 +484,7 @@ func validateRecordId(app App, collectionNameOrId string) validation.RuleFunc {
var exists int
rowErr := app.DB().Select("(1)").
rowErr := app.ConcurrentDB().Select("(1)").
From(collection.Name).
AndWhere(dbx.HashExp{"id": id}).
Limit(1).

187
core/db_builder.go Normal file
View File

@ -0,0 +1,187 @@
package core
import (
"strings"
"unicode"
"unicode/utf8"
"github.com/pocketbase/dbx"
)
var _ dbx.Builder = (*dualDBBuilder)(nil)
// note: expects both builder to use the same driver
type dualDBBuilder struct {
concurrentDB dbx.Builder
nonconcurrentDB dbx.Builder
}
// Select implements the [dbx.Builder.Select] interface method.
func (b *dualDBBuilder) Select(cols ...string) *dbx.SelectQuery {
return b.concurrentDB.Select(cols...)
}
// Model implements the [dbx.Builder.Model] interface method.
func (b *dualDBBuilder) Model(data interface{}) *dbx.ModelQuery {
return b.nonconcurrentDB.Model(data)
}
// GeneratePlaceholder implements the [dbx.Builder.GeneratePlaceholder] interface method.
func (b *dualDBBuilder) GeneratePlaceholder(i int) string {
return b.concurrentDB.GeneratePlaceholder(i)
}
// Quote implements the [dbx.Builder.Quote] interface method.
func (b *dualDBBuilder) Quote(str string) string {
return b.concurrentDB.Quote(str)
}
// QuoteSimpleTableName implements the [dbx.Builder.QuoteSimpleTableName] interface method.
func (b *dualDBBuilder) QuoteSimpleTableName(table string) string {
return b.concurrentDB.QuoteSimpleTableName(table)
}
// QuoteSimpleColumnName implements the [dbx.Builder.QuoteSimpleColumnName] interface method.
func (b *dualDBBuilder) QuoteSimpleColumnName(col string) string {
return b.concurrentDB.QuoteSimpleColumnName(col)
}
// QueryBuilder implements the [dbx.Builder.QueryBuilder] interface method.
func (b *dualDBBuilder) QueryBuilder() dbx.QueryBuilder {
return b.concurrentDB.QueryBuilder()
}
// Insert implements the [dbx.Builder.Insert] interface method.
func (b *dualDBBuilder) Insert(table string, cols dbx.Params) *dbx.Query {
return b.nonconcurrentDB.Insert(table, cols)
}
// Upsert implements the [dbx.Builder.Upsert] interface method.
func (b *dualDBBuilder) Upsert(table string, cols dbx.Params, constraints ...string) *dbx.Query {
return b.nonconcurrentDB.Upsert(table, cols, constraints...)
}
// Update implements the [dbx.Builder.Update] interface method.
func (b *dualDBBuilder) Update(table string, cols dbx.Params, where dbx.Expression) *dbx.Query {
return b.nonconcurrentDB.Update(table, cols, where)
}
// Delete implements the [dbx.Builder.Delete] interface method.
func (b *dualDBBuilder) Delete(table string, where dbx.Expression) *dbx.Query {
return b.nonconcurrentDB.Delete(table, where)
}
// CreateTable implements the [dbx.Builder.CreateTable] interface method.
func (b *dualDBBuilder) CreateTable(table string, cols map[string]string, options ...string) *dbx.Query {
return b.nonconcurrentDB.CreateTable(table, cols, options...)
}
// RenameTable implements the [dbx.Builder.RenameTable] interface method.
func (b *dualDBBuilder) RenameTable(oldName, newName string) *dbx.Query {
return b.nonconcurrentDB.RenameTable(oldName, newName)
}
// DropTable implements the [dbx.Builder.DropTable] interface method.
func (b *dualDBBuilder) DropTable(table string) *dbx.Query {
return b.nonconcurrentDB.DropTable(table)
}
// TruncateTable implements the [dbx.Builder.TruncateTable] interface method.
func (b *dualDBBuilder) TruncateTable(table string) *dbx.Query {
return b.nonconcurrentDB.TruncateTable(table)
}
// AddColumn implements the [dbx.Builder.AddColumn] interface method.
func (b *dualDBBuilder) AddColumn(table, col, typ string) *dbx.Query {
return b.nonconcurrentDB.AddColumn(table, col, typ)
}
// DropColumn implements the [dbx.Builder.DropColumn] interface method.
func (b *dualDBBuilder) DropColumn(table, col string) *dbx.Query {
return b.nonconcurrentDB.DropColumn(table, col)
}
// RenameColumn implements the [dbx.Builder.RenameColumn] interface method.
func (b *dualDBBuilder) RenameColumn(table, oldName, newName string) *dbx.Query {
return b.nonconcurrentDB.RenameColumn(table, oldName, newName)
}
// AlterColumn implements the [dbx.Builder.AlterColumn] interface method.
func (b *dualDBBuilder) AlterColumn(table, col, typ string) *dbx.Query {
return b.nonconcurrentDB.AlterColumn(table, col, typ)
}
// AddPrimaryKey implements the [dbx.Builder.AddPrimaryKey] interface method.
func (b *dualDBBuilder) AddPrimaryKey(table, name string, cols ...string) *dbx.Query {
return b.nonconcurrentDB.AddPrimaryKey(table, name, cols...)
}
// DropPrimaryKey implements the [dbx.Builder.DropPrimaryKey] interface method.
func (b *dualDBBuilder) DropPrimaryKey(table, name string) *dbx.Query {
return b.nonconcurrentDB.DropPrimaryKey(table, name)
}
// AddForeignKey implements the [dbx.Builder.AddForeignKey] interface method.
func (b *dualDBBuilder) AddForeignKey(table, name string, cols, refCols []string, refTable string, options ...string) *dbx.Query {
return b.nonconcurrentDB.AddForeignKey(table, name, cols, refCols, refTable, options...)
}
// DropForeignKey implements the [dbx.Builder.DropForeignKey] interface method.
func (b *dualDBBuilder) DropForeignKey(table, name string) *dbx.Query {
return b.nonconcurrentDB.DropForeignKey(table, name)
}
// CreateIndex implements the [dbx.Builder.CreateIndex] interface method.
func (b *dualDBBuilder) CreateIndex(table, name string, cols ...string) *dbx.Query {
return b.nonconcurrentDB.CreateIndex(table, name, cols...)
}
// CreateUniqueIndex implements the [dbx.Builder.CreateUniqueIndex] interface method.
func (b *dualDBBuilder) CreateUniqueIndex(table, name string, cols ...string) *dbx.Query {
return b.nonconcurrentDB.CreateUniqueIndex(table, name, cols...)
}
// DropIndex implements the [dbx.Builder.DropIndex] interface method.
func (b *dualDBBuilder) DropIndex(table, name string) *dbx.Query {
return b.nonconcurrentDB.DropIndex(table, name)
}
// NewQuery implements the [dbx.Builder.NewQuery] interface method by
// routing the SELECT queries to the concurrent builder instance.
func (b *dualDBBuilder) NewQuery(str string) *dbx.Query {
// note: technically INSERT/UPDATE/DELETE could also have CTE but since
// it is rare for now this scase is ignored to avoid unnecessary complicating the checks
trimmed := trimLeftSpaces(str)
if hasPrefixFold(trimmed, "SELECT") || hasPrefixFold(trimmed, "WITH") {
return b.concurrentDB.NewQuery(str)
}
return b.nonconcurrentDB.NewQuery(str)
}
var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1}
// note: similar to strings.Space() but without the right trim because it is not needed in our case
func trimLeftSpaces(str string) string {
start := 0
for ; start < len(str); start++ {
c := str[start]
if c >= utf8.RuneSelf {
return strings.TrimLeftFunc(str[start:], unicode.IsSpace)
}
if asciiSpace[c] == 0 {
break
}
}
return str[start:]
}
// note: the prefix is expected to be ASCII
func hasPrefixFold(str, prefix string) bool {
if len(str) < len(prefix) {
return false
}
return strings.EqualFold(str[:len(prefix)], prefix)
}

View File

@ -11,7 +11,7 @@ import (
func (app *BaseApp) TableColumns(tableName string) ([]string, error) {
columns := []string{}
err := app.DB().NewQuery("SELECT name FROM PRAGMA_TABLE_INFO({:tableName})").
err := app.ConcurrentDB().NewQuery("SELECT name FROM PRAGMA_TABLE_INFO({:tableName})").
Bind(dbx.Params{"tableName": tableName}).
Column(&columns)
@ -34,7 +34,7 @@ type TableInfoRow struct {
func (app *BaseApp) TableInfo(tableName string) ([]*TableInfoRow, error) {
info := []*TableInfoRow{}
err := app.DB().NewQuery("SELECT * FROM PRAGMA_TABLE_INFO({:tableName})").
err := app.ConcurrentDB().NewQuery("SELECT * FROM PRAGMA_TABLE_INFO({:tableName})").
Bind(dbx.Params{"tableName": tableName}).
All(&info)
if err != nil {
@ -59,7 +59,7 @@ func (app *BaseApp) TableIndexes(tableName string) (map[string]string, error) {
Sql string
}{}
err := app.DB().Select("name", "sql").
err := app.ConcurrentDB().Select("name", "sql").
From("sqlite_master").
AndWhere(dbx.NewExp("sql is not null")).
AndWhere(dbx.HashExp{
@ -87,7 +87,7 @@ func (app *BaseApp) TableIndexes(tableName string) (map[string]string, error) {
// NB! Be aware that this method is vulnerable to SQL injection and the
// "tableName" argument must come only from trusted input!
func (app *BaseApp) DeleteTable(tableName string) error {
_, err := app.DB().NewQuery(fmt.Sprintf(
_, err := app.NonconcurrentDB().NewQuery(fmt.Sprintf(
"DROP TABLE IF EXISTS {{%s}}",
tableName,
)).Execute()
@ -96,15 +96,15 @@ func (app *BaseApp) DeleteTable(tableName string) error {
}
// HasTable checks if a table (or view) with the provided name exists (case insensitive).
// in the current app.DB() instance.
// in the data.db.
func (app *BaseApp) HasTable(tableName string) bool {
return app.hasTable(app.DB(), tableName)
return app.hasTable(app.ConcurrentDB(), tableName)
}
// AuxHasTable checks if a table (or view) with the provided name exists (case insensitive)
// in the current app.AuxDB() instance.
// in the auixiliary.db.
func (app *BaseApp) AuxHasTable(tableName string) bool {
return app.hasTable(app.AuxDB(), tableName)
return app.hasTable(app.AuxConcurrentDB(), tableName)
}
func (app *BaseApp) hasTable(db dbx.Builder, tableName string) bool {
@ -120,16 +120,14 @@ func (app *BaseApp) hasTable(db dbx.Builder, tableName string) bool {
return err == nil && exists > 0
}
// Vacuum executes VACUUM on the current app.DB() instance
// in order to reclaim unused data db disk space.
// Vacuum executes VACUUM on the data.db in order to reclaim unused data db disk space.
func (app *BaseApp) Vacuum() error {
return app.vacuum(app.DB())
return app.vacuum(app.NonconcurrentDB())
}
// AuxVacuum executes VACUUM on the current app.AuxDB() instance
// in order to reclaim unused auxiliary db disk space.
// AuxVacuum executes VACUUM on the auxiliary.db in order to reclaim unused auxiliary db disk space.
func (app *BaseApp) AuxVacuum() error {
return app.vacuum(app.AuxDB())
return app.vacuum(app.AuxNonconcurrentDB())
}
func (app *BaseApp) vacuum(db dbx.Builder) error {

View File

@ -202,10 +202,10 @@ func TestVacuum(t *testing.T) {
defer app.Cleanup()
calledQueries := []string{}
app.DB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
app.NonconcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
calledQueries = append(calledQueries, sql)
}
app.DB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
app.NonconcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
calledQueries = append(calledQueries, sql)
}
@ -229,10 +229,10 @@ func TestAuxVacuum(t *testing.T) {
defer app.Cleanup()
calledQueries := []string{}
app.AuxDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
app.AuxNonconcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
calledQueries = append(calledQueries, sql)
}
app.AuxDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
app.AuxNonconcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
calledQueries = append(calledQueries, sql)
}

View File

@ -38,12 +38,12 @@ func TestModelQuery(t *testing.T) {
modelsQuery := app.ModelQuery(&core.Collection{})
logsModelQuery := app.AuxModelQuery(&core.Collection{})
if app.DB() == modelsQuery.Info().Builder {
t.Fatalf("ModelQuery() is not using app.DB()")
if app.ConcurrentDB() == modelsQuery.Info().Builder {
t.Fatalf("ModelQuery() is not using app.ConcurrentDB()")
}
if app.AuxDB() == logsModelQuery.Info().Builder {
t.Fatalf("AuxModelQuery() is not using app.AuxDB()")
if app.AuxConcurrentDB() == logsModelQuery.Info().Builder {
t.Fatalf("AuxModelQuery() is not using app.AuxConcurrentDB()")
}
expectedSQL := "SELECT {{_collections}}.* FROM `_collections`"

View File

@ -155,7 +155,7 @@ func TestFindFirstExternalAuthByExpr(t *testing.T) {
}
for i, s := range scenarios {
t.Run(fmt.Sprintf("%d_%v", i, s.expr.Build(app.DB().(*dbx.DB), dbx.Params{})), func(t *testing.T) {
t.Run(fmt.Sprintf("%d_%v", i, s.expr.Build(app.ConcurrentDB().(*dbx.DB), dbx.Params{})), func(t *testing.T) {
result, err := app.FindFirstExternalAuthByExpr(s.expr)
hasErr := err != nil

View File

@ -10,7 +10,6 @@ import (
"strings"
validation "github.com/go-ozzo/ozzo-validation/v4"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core/validators"
"github.com/pocketbase/pocketbase/tools/filesystem"
"github.com/pocketbase/pocketbase/tools/list"
@ -357,8 +356,7 @@ func (f *FileField) Intercept(
return nil
case InterceptorActionAfterCreateError, InterceptorActionAfterUpdateError:
// when in transaction we assume that the error was handled by afterRecordExecuteFailure
_, insideTransaction := app.DB().(*dbx.Tx)
if insideTransaction {
if app.IsTransactional() {
return actionFunc()
}

View File

@ -219,7 +219,7 @@ func (f *RelationField) ValidateValue(ctx context.Context, app App, record *Reco
}
var total int
_ = app.DB().
_ = app.ConcurrentDB().
Select("count(*)").
From(relCollection.Name).
AndWhere(dbx.In("id", list.ToInterfaceSlice(ids)...)).

View File

@ -194,7 +194,7 @@ func (f *TextField) ValidateValue(ctx context.Context, app App, record *Record)
// (@todo eventually may get replaced in the future with a system unique constraint to avoid races or wrapping the request in a transaction)
if f.Pattern != defaultLowercaseRecordIdPattern {
var exists int
err := app.DB().
err := app.ConcurrentDB().
Select("(1)").
From(record.TableName()).
Where(dbx.NewExp("id = {:id} COLLATE NOCASE", dbx.Params{"id": newVal})).

View File

@ -55,10 +55,10 @@ func TestBaseAppLoggerLevelDevPrint(t *testing.T) {
}
// silence query logs
app.DB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {}
app.DB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {}
app.NonconcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {}
app.NonconcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {}
app.concurrentDB.(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {}
app.concurrentDB.(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {}
app.nonconcurrentDB.(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {}
app.nonconcurrentDB.(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {}
app.Settings().Logs.MinLevel = testLogLevel
if err := app.Save(app.Settings()); err != nil {

View File

@ -2254,13 +2254,13 @@ func TestRecordDelete(t *testing.T) {
app.NonconcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
calledQueries = append(calledQueries, sql)
}
app.DB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
app.ConcurrentDB().(*dbx.DB).QueryLogFunc = func(ctx context.Context, t time.Duration, sql string, rows *sql.Rows, err error) {
calledQueries = append(calledQueries, sql)
}
app.NonconcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
calledQueries = append(calledQueries, sql)
}
app.DB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
app.ConcurrentDB().(*dbx.DB).ExecLogFunc = func(ctx context.Context, t time.Duration, sql string, result sql.Result, err error) {
calledQueries = append(calledQueries, sql)
}
rec3, _ := app.FindRecordById("users", "oap640cot4yru2s")

View File

@ -35,7 +35,7 @@ func (app *BaseApp) RecordQuery(collectionModelOrIdentifier any) *dbx.SelectQuer
tableName = "@@__invalidCollectionModelOrIdentifier"
}
query := app.DB().Select(app.DB().QuoteSimpleColumnName(tableName) + ".*").From(tableName)
query := app.ConcurrentDB().Select(app.ConcurrentDB().QuoteSimpleColumnName(tableName) + ".*").From(tableName)
// in case of an error attach a new context and cancel it immediately with the error
if collectionErr != nil {

View File

@ -103,7 +103,7 @@ func (app *BaseApp) expandRecords(records []*Record, expandPath string, fetchFun
// add the related id(s) as a dynamic relation field value to
// allow further expand checks at later stage in a more unified manner
prepErr := func() error {
q := app.DB().Select("id").
q := app.ConcurrentDB().Select("id").
From(indirectRel.Name).
Limit(1000) // the limit is arbitrary chosen and may change in the future

View File

@ -227,7 +227,7 @@ func (form *RecordUpsert) DrySubmit(callback func(txApp core.App, drySavedRecord
app := form.app.UnsafeWithoutHooks()
_, isTransactional := app.DB().(*dbx.Tx)
isTransactional := app.IsTransactional()
if !isTransactional {
return app.RunInTransaction(func(txApp core.App) error {
tx, ok := txApp.DB().(*dbx.Tx)

File diff suppressed because it is too large Load Diff

View File

@ -112,6 +112,8 @@ func (s *System) Attributes(fileKey string) (*blob.Attributes, error) {
// NB! Make sure to call Close() on the file after you are done working with it.
//
// If the file doesn't exist returns ErrNotFound.
//
// @todo consider renaming to GetFileReader to avoid the confusion with filesystem.File
func (s *System) GetFile(fileKey string) (*blob.Reader, error) {
return s.bucket.NewReader(s.ctx, fileKey)
}
@ -241,7 +243,8 @@ func (s *System) UploadMultipart(fh *multipart.FileHeader, fileKey string) error
return err
}
if _, err := w.ReadFrom(f); err != nil {
_, err = w.ReadFrom(f)
if err != nil {
w.Close()
return err
}