From 7001a22d92debf58ac92af21c11639d4c11c7308 Mon Sep 17 00:00:00 2001 From: Gani Georgiev Date: Wed, 18 Jan 2023 15:41:33 +0200 Subject: [PATCH] [#1628] fixed realtime panic on concurrent clients iteration --- CHANGELOG.md | 11 ++++++++++ apis/realtime.go | 29 ++++++++++++++++++++------ tools/subscriptions/broker.go | 20 +++++++++++------- tools/subscriptions/broker_test.go | 7 +++++++ tools/subscriptions/client.go | 33 ++++++++++++++++++++++++++++++ tools/subscriptions/client_test.go | 14 +++++++++++++ 6 files changed, 101 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b8d71751..f3bf34a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## v0.11.3 + +- Fix realtime API panic on concurrent clients iteration ([#1628](https://github.com/pocketbase/pocketbase/issues/1628)) + + - `app.SubscriptionsBroker().Clients()` now returns a shallow copy of the underlying map. + + - Added `Discard()` and `IsDiscarded()` helper methods to the `subscriptions.Client` interface. + + - Slow clients should no longer "block" the main action completion. + + ## v0.11.2 - Fixed `fs.DeleteByPrefix()` hang on invalid S3 settings ([#1575](https://github.com/pocketbase/pocketbase/discussions/1575#discussioncomment-4661089)). diff --git a/apis/realtime.go b/apis/realtime.go index 6b235427..7da9875c 100644 --- a/apis/realtime.go +++ b/apis/realtime.go @@ -15,6 +15,7 @@ import ( "github.com/pocketbase/pocketbase/forms" "github.com/pocketbase/pocketbase/models" "github.com/pocketbase/pocketbase/resolvers" + "github.com/pocketbase/pocketbase/tools/routine" "github.com/pocketbase/pocketbase/tools/search" "github.com/pocketbase/pocketbase/tools/subscriptions" ) @@ -43,10 +44,14 @@ func (api *realtimeApi) connect(c echo.Context) error { client := subscriptions.NewDefaultClient() api.app.SubscriptionsBroker().Register(client) defer func() { - api.app.OnRealtimeDisconnectRequest().Trigger(&core.RealtimeDisconnectEvent{ + disconnectEvent := &core.RealtimeDisconnectEvent{ HttpContext: c, Client: client, - }) + } + + if err := api.app.OnRealtimeDisconnectRequest().Trigger(disconnectEvent); err != nil && api.app.IsDebug() { + log.Println(err) + } api.app.SubscriptionsBroker().Unregister(client.Id()) }() @@ -259,21 +264,27 @@ func (api *realtimeApi) bindEvents() { api.app.OnModelAfterCreate().PreAdd(func(e *core.ModelEvent) error { if record, ok := e.Model.(*models.Record); ok { - api.broadcastRecord("create", record) + if err := api.broadcastRecord("create", record); err != nil && api.app.IsDebug() { + log.Println(err) + } } return nil }) api.app.OnModelAfterUpdate().PreAdd(func(e *core.ModelEvent) error { if record, ok := e.Model.(*models.Record); ok { - api.broadcastRecord("update", record) + if err := api.broadcastRecord("update", record); err != nil && api.app.IsDebug() { + log.Println(err) + } } return nil }) api.app.OnModelBeforeDelete().Add(func(e *core.ModelEvent) error { if record, ok := e.Model.(*models.Record); ok { - api.broadcastRecord("delete", record) + if err := api.broadcastRecord("delete", record); err != nil && api.app.IsDebug() { + log.Println(err) + } } return nil }) @@ -370,6 +381,8 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er encodedData := string(dataBytes) for _, client := range clients { + client := client + for subscription, rule := range subscriptionRuleMap { if !client.HasSubscription(subscription) { continue @@ -398,7 +411,11 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er } } - client.Channel() <- msg + routine.FireAndForget(func() { + if !client.IsDiscarded() { + client.Channel() <- msg + } + }) } } diff --git a/tools/subscriptions/broker.go b/tools/subscriptions/broker.go index bdee3552..efa864bc 100644 --- a/tools/subscriptions/broker.go +++ b/tools/subscriptions/broker.go @@ -18,12 +18,19 @@ func NewBroker() *Broker { } } -// Clients returns all registered clients. +// Clients returns a shallow copy of all registered clients indexed +// with their connection id. func (b *Broker) Clients() map[string]Client { b.mux.RLock() defer b.mux.RUnlock() - return b.clients + copy := make(map[string]Client, len(b.clients)) + + for id, c := range b.clients { + copy[id] = c + } + + return copy } // ClientById finds a registered client by its id. @@ -56,9 +63,8 @@ func (b *Broker) Unregister(clientId string) { b.mux.Lock() defer b.mux.Unlock() - // Note: - // There is no need to explicitly close the client's channel since it will be GC-ed anyway. - // Addinitionally, closing the channel explicitly could panic when there are several - // subscriptions attached to the client that needs to receive the same event. - delete(b.clients, clientId) + if client, ok := b.clients[clientId]; ok { + client.Discard() + delete(b.clients, clientId) + } } diff --git a/tools/subscriptions/broker_test.go b/tools/subscriptions/broker_test.go index 87774a61..d01d290f 100644 --- a/tools/subscriptions/broker_test.go +++ b/tools/subscriptions/broker_test.go @@ -24,6 +24,13 @@ func TestClients(t *testing.T) { b.Register(subscriptions.NewDefaultClient()) b.Register(subscriptions.NewDefaultClient()) + // check if it is a shallow copy + clients := b.Clients() + for k := range clients { + delete(clients, k) + } + + // should return a new copy if total := len(b.Clients()); total != 2 { t.Fatalf("Expected 2 clients, got %v", total) } diff --git a/tools/subscriptions/client.go b/tools/subscriptions/client.go index c948a530..49026a2d 100644 --- a/tools/subscriptions/client.go +++ b/tools/subscriptions/client.go @@ -37,6 +37,16 @@ type Client interface { // Get retrieves the key value from the client's context. Get(key string) any + + // Discard marks the client as "discarded", meaning that it + // shouldn't be used anymore for sending new messages. + // + // It is safe to call Discard() multiple times. + Discard() + + // IsDiscarded indicates whether the client has been "discarded" + // and should no longer be used. + IsDiscarded() bool } // ensures that DefaultClient satisfies the Client interface @@ -45,6 +55,7 @@ var _ Client = (*DefaultClient)(nil) // DefaultClient defines a generic subscription client. type DefaultClient struct { mux sync.RWMutex + isDiscarded bool id string store map[string]any channel chan Message @@ -63,11 +74,17 @@ func NewDefaultClient() *DefaultClient { // Id implements the [Client.Id] interface method. func (c *DefaultClient) Id() string { + c.mux.RLock() + defer c.mux.RUnlock() + return c.id } // Channel implements the [Client.Channel] interface method. func (c *DefaultClient) Channel() chan Message { + c.mux.RLock() + defer c.mux.RUnlock() + return c.channel } @@ -139,3 +156,19 @@ func (c *DefaultClient) Set(key string, value any) { c.store[key] = value } + +// Discard implements the [Client.Discard] interface method. +func (c *DefaultClient) Discard() { + c.mux.Lock() + defer c.mux.Unlock() + + c.isDiscarded = true +} + +// IsDiscarded implements the [Client.IsDiscarded] interface method. +func (c *DefaultClient) IsDiscarded() bool { + c.mux.RLock() + defer c.mux.RUnlock() + + return c.isDiscarded +} diff --git a/tools/subscriptions/client_test.go b/tools/subscriptions/client_test.go index b26ae685..00ffe33f 100644 --- a/tools/subscriptions/client_test.go +++ b/tools/subscriptions/client_test.go @@ -129,3 +129,17 @@ func TestSetAndGet(t *testing.T) { t.Errorf("Expected 1, got %v", result) } } + +func TestDiscard(t *testing.T) { + c := subscriptions.NewDefaultClient() + + if v := c.IsDiscarded(); v { + t.Fatal("Expected false, got true") + } + + c.Discard() + + if v := c.IsDiscarded(); !v { + t.Fatal("Expected true, got false") + } +}