[#6433] fixed realtime delete event for RecordProxy and other custom record models

This commit is contained in:
Gani Georgiev 2025-02-09 19:24:17 +02:00
parent 048e534f0d
commit 920e893e11
1 changed files with 45 additions and 23 deletions

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strings" "strings"
@ -353,7 +354,7 @@ func bindRealtimeEvents(app core.App) {
Func: func(e *core.ModelEvent) error { Func: func(e *core.ModelEvent) error {
record := realtimeResolveRecord(e.App, e.Model, "") record := realtimeResolveRecord(e.App, e.Model, "")
if record != nil { if record != nil {
// note: use the outside scoped app instance for the access checks so that the API ruless // note: use the outside scoped app instance for the access checks so that the API rules
// are performed out of the delete transaction ensuring that they would still work even if // are performed out of the delete transaction ensuring that they would still work even if
// a cascade-deleted record's API rule relies on an already deleted parent record // a cascade-deleted record's API rule relies on an already deleted parent record
err := realtimeBroadcastRecord(e.App, "delete", record, true, app) err := realtimeBroadcastRecord(e.App, "delete", record, true, app)
@ -375,14 +376,17 @@ func bindRealtimeEvents(app core.App) {
// delete: broadcast // delete: broadcast
app.OnModelAfterDeleteSuccess().Bind(&hook.Handler[*core.ModelEvent]{ app.OnModelAfterDeleteSuccess().Bind(&hook.Handler[*core.ModelEvent]{
Func: func(e *core.ModelEvent) error { Func: func(e *core.ModelEvent) error {
record := realtimeResolveRecord(e.App, e.Model, "") // note: only ensure that it is a collection record
if record != nil { // and don't use realtimeResolveRecord because in case of a
err := realtimeBroadcastDryCachedRecord(e.App, "delete", record) // custom model it'll fail to resolve since the record is already deleted
collection := realtimeResolveRecordCollection(e.App, e.Model)
if collection != nil {
err := realtimeBroadcastDryCacheKey(e.App, getDryCacheKey("delete", e.Model))
if err != nil { if err != nil {
app.Logger().Debug( app.Logger().Debug(
"Failed to broadcast record delete", "Failed to broadcast record delete",
slog.String("id", record.Id), slog.Any("id", e.Model.PK()),
slog.String("collectionName", record.Collection().Name), slog.String("collectionName", collection.Name),
slog.String("error", err.Error()), slog.String("error", err.Error()),
) )
} }
@ -398,7 +402,7 @@ func bindRealtimeEvents(app core.App) {
Func: func(e *core.ModelErrorEvent) error { Func: func(e *core.ModelErrorEvent) error {
record := realtimeResolveRecord(e.App, e.Model, "") record := realtimeResolveRecord(e.App, e.Model, "")
if record != nil { if record != nil {
err := realtimeUnsetDryCachedRecord(e.App, "delete", record) err := realtimeUnsetDryCacheKey(e.App, getDryCacheKey("delete", record))
if err != nil { if err != nil {
app.Logger().Debug( app.Logger().Debug(
"Failed to cleanup after broadcast record delete failure", "Failed to cleanup after broadcast record delete failure",
@ -418,7 +422,14 @@ func bindRealtimeEvents(app core.App) {
// resolveRecord converts *if possible* the provided model interface to a Record. // resolveRecord converts *if possible* the provided model interface to a Record.
// This is usually helpful if the provided model is a custom Record model struct. // This is usually helpful if the provided model is a custom Record model struct.
func realtimeResolveRecord(app core.App, model core.Model, optCollectionType string) *core.Record { func realtimeResolveRecord(app core.App, model core.Model, optCollectionType string) *core.Record {
record, _ := model.(*core.Record) var record *core.Record
switch v := model.(type) {
case *core.Record:
record = v
case core.RecordProxy:
record = v.ProxyRecord()
}
if record != nil { if record != nil {
if optCollectionType == "" || record.Collection().Type == optCollectionType { if optCollectionType == "" || record.Collection().Type == optCollectionType {
return record return record
@ -447,14 +458,20 @@ func realtimeResolveRecord(app core.App, model core.Model, optCollectionType str
// realtimeResolveRecordCollection extracts *if possible* the Collection model from the provided model interface. // realtimeResolveRecordCollection extracts *if possible* the Collection model from the provided model interface.
// This is usually helpful if the provided model is a custom Record model struct. // This is usually helpful if the provided model is a custom Record model struct.
func realtimeResolveRecordCollection(app core.App, model core.Model) (collection *core.Collection) { func realtimeResolveRecordCollection(app core.App, model core.Model) (collection *core.Collection) {
if record, ok := model.(*core.Record); ok { switch m := model.(type) {
collection = record.Collection() case *core.Record:
} else { return m.Collection()
// check if it is custom Record model struct (ignore "private" tables) case core.RecordProxy:
collection, _ = app.FindCachedCollectionByNameOrId(model.TableName()) return m.ProxyRecord().Collection()
default:
// check if it is custom Record model struct
collection, err := app.FindCachedCollectionByNameOrId(model.TableName())
if err == nil {
return collection
}
} }
return collection return nil
} }
// recordData represents the broadcasted record subscrition message data. // recordData represents the broadcasted record subscrition message data.
@ -489,7 +506,7 @@ func realtimeBroadcastRecord(app core.App, action string, record *core.Record, d
(collection.Id + "?"): collection.ListRule, (collection.Id + "?"): collection.ListRule,
} }
dryCacheKey := action + "/" + record.Id dryCacheKey := getDryCacheKey(action, record)
group := new(errgroup.Group) group := new(errgroup.Group)
@ -634,15 +651,13 @@ func realtimeBroadcastRecord(app core.App, action string, record *core.Record, d
return group.Wait() return group.Wait()
} }
// realtimeBroadcastDryCachedRecord broadcasts all cached record related messages. // realtimeBroadcastDryCacheKey broadcasts all cached key related messages.
func realtimeBroadcastDryCachedRecord(app core.App, action string, record *core.Record) error { func realtimeBroadcastDryCacheKey(app core.App, key string) error {
chunks := app.SubscriptionsBroker().ChunkedClients(clientsChunkSize) chunks := app.SubscriptionsBroker().ChunkedClients(clientsChunkSize)
if len(chunks) == 0 { if len(chunks) == 0 {
return nil // no subscribers return nil // no subscribers
} }
key := action + "/" + record.Id
group := new(errgroup.Group) group := new(errgroup.Group)
for _, chunk := range chunks { for _, chunk := range chunks {
@ -671,15 +686,13 @@ func realtimeBroadcastDryCachedRecord(app core.App, action string, record *core.
return group.Wait() return group.Wait()
} }
// realtimeUnsetDryCachedRecord removes the dry cached record related messages. // realtimeUnsetDryCacheKey removes the dry cached record related messages.
func realtimeUnsetDryCachedRecord(app core.App, action string, record *core.Record) error { func realtimeUnsetDryCacheKey(app core.App, key string) error {
chunks := app.SubscriptionsBroker().ChunkedClients(clientsChunkSize) chunks := app.SubscriptionsBroker().ChunkedClients(clientsChunkSize)
if len(chunks) == 0 { if len(chunks) == 0 {
return nil // no subscribers return nil // no subscribers
} }
key := action + "/" + record.Id
group := new(errgroup.Group) group := new(errgroup.Group)
for _, chunk := range chunks { for _, chunk := range chunks {
@ -697,6 +710,15 @@ func realtimeUnsetDryCachedRecord(app core.App, action string, record *core.Reco
return group.Wait() return group.Wait()
} }
func getDryCacheKey(action string, model core.Model) string {
pkStr, ok := model.PK().(string)
if !ok {
pkStr = fmt.Sprintf("%v", model.PK())
}
return action + "/" + model.TableName() + "/" + pkStr
}
func isSameAuth(authA, authB *core.Record) bool { func isSameAuth(authA, authB *core.Record) bool {
if authA == nil { if authA == nil {
return authB == nil return authB == nil