Skip to content

Commit e226ea6

Browse files
committed
Expand user events PubSub
1 parent 16bcce1 commit e226ea6

File tree

5 files changed

+108
-33
lines changed

5 files changed

+108
-33
lines changed

lib/claper/events.ex

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ defmodule Claper.Events do
66
"""
77

88
import Ecto.Query, warn: false
9-
alias Claper.Repo
109

10+
alias Claper.{Repo, Accounts}
1111
alias Claper.Events.{Event, ActivityLeader}
1212

1313
@default_page_size 5
@@ -363,7 +363,10 @@ defmodule Claper.Events do
363363
|> validate_unique_event()
364364
|> case do
365365
{:ok, event} ->
366-
Repo.insert(event, returning: [:uuid])
366+
with {:ok, event} <- Repo.insert(event, returning: [:uuid]) do
367+
broadcast_all_users({:created, event})
368+
{:ok, event}
369+
end
367370

368371
{:error, changeset} ->
369372
{:error, %{changeset | action: :insert}}
@@ -402,7 +405,25 @@ defmodule Claper.Events do
402405
|> validate_unique_event()
403406
|> case do
404407
{:ok, event} ->
405-
Repo.update(event, returning: [:uuid])
408+
with {:ok, event} <- Repo.update(event, returning: [:uuid]) do
409+
broadcast_all_users({:updated, event})
410+
411+
deleted_leaders =
412+
attrs
413+
|> Map.get("leaders", %{})
414+
|> Map.values()
415+
|> Enum.filter(fn
416+
%{"delete" => "true"} -> true
417+
_ -> false
418+
end)
419+
420+
for %{"email" => leader_email} <- deleted_leaders do
421+
leader = Accounts.get_user_by_email(leader_email)
422+
broadcast_user_events(leader.id, {:updated, event})
423+
end
424+
425+
{:ok, event}
426+
end
406427

407428
{:error, changeset} ->
408429
{:error, %{changeset | action: :update}}
@@ -424,7 +445,9 @@ defmodule Claper.Events do
424445
|> Repo.update()
425446
|> case do
426447
{:ok, event} ->
427-
broadcast({:ok, event, event.uuid}, :event_terminated)
448+
broadcast_all_users({:updated, event})
449+
broadcast_event(event.uuid, {:event_terminated, event.uuid})
450+
{:ok, event}
428451

429452
{:error, changeset} ->
430453
{:error, %{changeset | action: :update}}
@@ -693,7 +716,20 @@ defmodule Claper.Events do
693716
694717
"""
695718
def delete_event(%Event{} = event) do
696-
Repo.delete(event)
719+
leaders =
720+
for %{email: email} <- get_activity_leaders_for_event(event.id) do
721+
Accounts.get_user_by_email(email)
722+
end
723+
724+
with {:ok, event} <- Repo.delete(event) do
725+
broadcast_user_events(event.user_id, {:deleted, event})
726+
727+
for leader <- leaders do
728+
broadcast_user_events(leader.id, {:deleted, event})
729+
end
730+
731+
{:ok, event}
732+
end
697733
end
698734

699735
@doc """
@@ -709,8 +745,6 @@ defmodule Claper.Events do
709745
Event.changeset(event, attrs)
710746
end
711747

712-
alias Claper.Events.ActivityLeader
713-
714748
@doc """
715749
Creates a activity leader.
716750
@@ -777,13 +811,47 @@ defmodule Claper.Events do
777811
ActivityLeader.changeset(activity_leader, attrs)
778812
end
779813

780-
defp broadcast({:ok, e, event_uuid}, event) do
781-
Phoenix.PubSub.broadcast(
782-
Claper.PubSub,
783-
"event:#{event_uuid}",
784-
{event, event_uuid}
785-
)
814+
@doc """
815+
Subscribes to an event's public `Phoenix.PubSub` topic.
786816
787-
{:ok, e}
817+
The broadcasted messages match the pattern:
818+
819+
* {:terminated, event_uuid}
820+
821+
"""
822+
def subscribe_event(event_uuid) when is_binary(event_uuid) do
823+
Phoenix.PubSub.subscribe(Claper.PubSub, "event:#{event_uuid}")
824+
end
825+
826+
defp broadcast_event(event_uuid, message) when is_binary(event_uuid) do
827+
Phoenix.PubSub.broadcast(Claper.PubSub, "event:#{event_uuid}", message)
828+
end
829+
830+
@doc """
831+
Subscribes to a user's events private `Phoenix.PubSub` topic.
832+
833+
The broadcasted messages match the pattern:
834+
835+
* {:created, %Event{}}
836+
* {:updated, %Event{}}
837+
* {:deleted, %Event{}}
838+
839+
"""
840+
def subscribe_user_events(user_id) when is_integer(user_id) do
841+
Phoenix.PubSub.subscribe(Claper.PubSub, "user:#{user_id}:events")
842+
end
843+
844+
def broadcast_user_events(user_id, message) when is_integer(user_id) do
845+
Phoenix.PubSub.broadcast(Claper.PubSub, "user:#{user_id}:events", message)
846+
end
847+
848+
defp broadcast_all_users({_type, %Event{} = event} = message, _opts \\ []) do
849+
event = Repo.preload(event, [:leaders])
850+
broadcast_user_events(event.user_id, message)
851+
852+
for %{email: leader_email} <- event.leaders do
853+
leader = Accounts.get_user_by_email(leader_email)
854+
broadcast_user_events(leader.id, message)
855+
end
788856
end
789857
end

lib/claper/events/event.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ defmodule Claper.Events.Event do
9090
change(event, expired_at: expiry)
9191
end
9292

93-
def subscribe(event_id) do
94-
Phoenix.PubSub.subscribe(Claper.PubSub, "event:#{event_id}")
93+
def subscribe(event_uuid) do
94+
Phoenix.PubSub.subscribe(Claper.PubSub, "event:#{event_uuid}")
9595
end
9696

9797
def started?(event) do

lib/claper/tasks/converter.ex

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule Claper.Tasks.Converter do
44
We use a hash to identify the presentation. A new hash is generated when the conversion is finished and the presentation is being uploaded.
55
"""
66

7+
alias Claper.Events
78
alias ExAws.S3
89
alias Porcelain.Result
910

@@ -19,11 +20,7 @@ defmodule Claper.Tasks.Converter do
1920
"status" => "progress"
2021
})
2122

22-
Phoenix.PubSub.broadcast(
23-
Claper.PubSub,
24-
"events:#{user_id}",
25-
{:presentation_file_process_done, presentation}
26-
)
23+
Events.broadcast_user_events(user_id, {:presentation_file_process_done, presentation})
2724

2825
path =
2926
Path.join([
@@ -167,11 +164,7 @@ defmodule Claper.Tasks.Converter do
167164
}) do
168165
if get_presentation_storage() != "local", do: File.rm_rf!(path)
169166

170-
Phoenix.PubSub.broadcast(
171-
Claper.PubSub,
172-
"events:#{user_id}",
173-
{:presentation_file_process_done, presentation}
174-
)
167+
Events.broadcast_user_events(user_id, {:presentation_file_process_done, presentation})
175168
end
176169
end
177170

@@ -182,11 +175,7 @@ defmodule Claper.Tasks.Converter do
182175
}) do
183176
File.rm_rf!(path)
184177

185-
Phoenix.PubSub.broadcast(
186-
Claper.PubSub,
187-
"events:#{user_id}",
188-
{:presentation_file_process_done, presentation}
189-
)
178+
Events.broadcast_user_events(user_id, {:presentation_file_process_done, presentation})
190179
end
191180
end
192181

lib/claper_web/live/event_live/index.ex

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ defmodule ClaperWeb.EventLive.Index do
2222
})
2323

2424
if connected?(socket) do
25-
Phoenix.PubSub.subscribe(Claper.PubSub, "events:#{socket.assigns.current_user.id}")
25+
Events.subscribe_user_events(socket.assigns.current_user.id)
2626
end
2727

2828
expired_events_count = Events.count_expired_events(socket.assigns.current_user.id)
@@ -57,6 +57,12 @@ defmodule ClaperWeb.EventLive.Index do
5757
socket |> assign(:events, [event | socket.assigns.events]) |> put_flash(:info, nil)}
5858
end
5959

60+
@impl true
61+
def handle_info({type, %Events.Event{}}, socket)
62+
when type in [:created, :updated, :deleted] do
63+
{:noreply, refresh_events(socket)}
64+
end
65+
6066
@impl true
6167
def handle_event("validate", %{"event" => event_params}, socket) do
6268
changeset =
@@ -245,4 +251,16 @@ defmodule ClaperWeb.EventLive.Index do
245251
if(socket.assigns.page == 1, do: events, else: socket.assigns.events ++ events)
246252
)
247253
end
254+
255+
defp refresh_events(socket) do
256+
expired_events_count = Events.count_expired_events(socket.assigns.current_user.id)
257+
invited_events_count = Events.count_managed_events_by(socket.assigns.current_user.email)
258+
259+
socket
260+
|> assign(:has_expired_events, expired_events_count > 0)
261+
|> assign(:has_invited_events, invited_events_count > 0)
262+
|> assign(:events, [])
263+
|> assign(:page, 1)
264+
|> load_events()
265+
end
248266
end

lib/claper_web/live/event_live/show.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ defmodule ClaperWeb.EventLive.Show do
187187
end
188188

189189
@impl true
190-
def handle_info({:event_terminated, _event}, socket) do
190+
def handle_info({:event_terminated, _event_uuid}, socket) do
191191
{:noreply,
192192
socket
193193
|> put_flash(:error, gettext("This event has been terminated"))

0 commit comments

Comments
 (0)