[#6780] added temp semaphore to limit the number of goroutines when cleaning files

This commit is contained in:
Gani Georgiev 2025-04-28 14:47:11 +03:00
parent 5713cf422b
commit 3885c93d59
1 changed files with 59 additions and 12 deletions

View File

@ -26,6 +26,8 @@ import (
"github.com/pocketbase/pocketbase/tools/store"
"github.com/pocketbase/pocketbase/tools/subscriptions"
"github.com/pocketbase/pocketbase/tools/types"
"github.com/spf13/cast"
"golang.org/x/sync/semaphore"
)
const (
@ -1190,6 +1192,33 @@ func (app *BaseApp) initAuxDB() error {
return nil
}
// @todo remove after refactoring the FilesManager interface
func supportFiles(m Model) bool {
var collection *Collection
switch v := m.(type) {
case *Collection:
collection = v
case *Record:
collection = v.Collection()
case RecordProxy:
if v.ProxyRecord() != nil {
collection = v.ProxyRecord().Collection()
}
}
if collection == nil {
return true
}
for _, f := range collection.Fields {
if f.Type() == FieldTypeFile {
return true
}
}
return false
}
func (app *BaseApp) registerBaseHooks() {
deletePrefix := func(prefix string) error {
fs, err := app.NewFilesystem()
@ -1206,18 +1235,35 @@ func (app *BaseApp) registerBaseHooks() {
return nil
}
maxFilesDeleteWorkers := cast.ToInt64(os.Getenv("PB_FILES_DELETE_MAX_WORKERS"))
if maxFilesDeleteWorkers <= 0 {
maxFilesDeleteWorkers = 2000 // the value is arbitrary chosen and may change in the future
}
deleteSem := semaphore.NewWeighted(maxFilesDeleteWorkers)
// try to delete the storage files from deleted Collection, Records, etc. model
app.OnModelAfterDeleteSuccess().Bind(&hook.Handler[*ModelEvent]{
Id: "__pbFilesManagerDelete__",
Func: func(e *ModelEvent) error {
if m, ok := e.Model.(FilesManager); ok && m.BaseFilesPath() != "" {
if m, ok := e.Model.(FilesManager); ok && m.BaseFilesPath() != "" && supportFiles(e.Model) {
// ensure that there is a trailing slash so that the list iterator could start walking from the prefix dir
// (https://github.com/pocketbase/pocketbase/discussions/5246#discussioncomment-10128955)
prefix := strings.TrimRight(m.BaseFilesPath(), "/") + "/"
// run in the background for "optimistic" delete to avoid
// blocking the delete transaction
// note: for now assume no context cancellation
err := deleteSem.Acquire(context.Background(), 1)
if err != nil {
app.Logger().Error(
"Failed to delete storage prefix (couldn't acquire a worker)",
slog.String("prefix", prefix),
slog.String("error", err.Error()),
)
} else {
// run in the background for "optimistic" delete to avoid blocking the delete transaction
routine.FireAndForget(func() {
defer deleteSem.Release(1)
if err := deletePrefix(prefix); err != nil {
app.Logger().Error(
"Failed to delete storage prefix (non critical error; usually could happen because of S3 api limits)",
@ -1227,6 +1273,7 @@ func (app *BaseApp) registerBaseHooks() {
}
})
}
}
return e.Next()
},