From 2599f40f7b342a4c0c900432ad49b5eec6aa59f6 Mon Sep 17 00:00:00 2001 From: xbgmsharp Date: Sun, 25 Jun 2023 15:12:04 +0200 Subject: [PATCH] Add ref_id to process_queue table to allow timeline per user_id and/or vessel_id --- initdb/02_1_signalk_api.sql | 33 ++++++++++++++++--------- initdb/02_2_signalk_cron.sql | 4 +-- initdb/02_3_1_signalk_public_tables.sql | 9 ++++--- initdb/02_4_signalk_auth.sql | 7 +++--- initdb/02_5_signalk_auth_otp.sql | 4 +-- 5 files changed, 35 insertions(+), 22 deletions(-) diff --git a/initdb/02_1_signalk_api.sql b/initdb/02_1_signalk_api.sql index 11b7d85..4a6bf49 100644 --- a/initdb/02_1_signalk_api.sql +++ b/initdb/02_1_signalk_api.sql @@ -297,6 +297,7 @@ CREATE FUNCTION metadata_upsert_trigger_fn() RETURNS trigger AS $metadata_upsert --PERFORM set_config('vessel.client_id', NEW.client_id, false); -- UPSERT - Insert vs Update for Metadata RAISE NOTICE 'metadata_upsert_trigger_fn'; + --PERFORM set_config('vessel.id', NEW.vessel_id, true); RAISE WARNING 'metadata_upsert_trigger_fn [%] [%]', current_setting('vessel.id', true), NEW; SELECT m.id,m.active INTO metadata_id, metadata_active FROM api.metadata m @@ -306,8 +307,8 @@ CREATE FUNCTION metadata_upsert_trigger_fn() RETURNS trigger AS $metadata_upsert -- send notifitacion if boat is back online IF metadata_active is False THEN -- Add monitor online entry to process queue for later notification - INSERT INTO process_queue (channel, payload, stored) - VALUES ('monitoring_online', metadata_id, now()); + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUES ('monitoring_online', metadata_id, now(), current_setting('vessel.id', true)); END IF; -- Update vessel metadata UPDATE api.metadata @@ -355,8 +356,8 @@ CREATE FUNCTION metadata_notification_trigger_fn() RETURNS trigger AS $metadata_ DECLARE BEGIN RAISE NOTICE 'metadata_notification_trigger_fn [%]', NEW; - INSERT INTO process_queue (channel, payload, stored) - VALUES ('monitoring_online', NEW.id, now()); + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUES ('monitoring_online', NEW.id, now(), NEW.vessel_id); RETURN NULL; END; $metadata_notification$ LANGUAGE plpgsql; @@ -399,7 +400,15 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$ logbook_id integer; stay_id integer; valid_status BOOLEAN; + _vessel_id TEXT; BEGIN + -- Force vessel_id to import from SQL cli + SELECT vessel_id INTO _vessel_id FROM api.metadata WHERE client_id = NEW.client_id; + IF _vessel_id IS NOT NULL THEN + NEW.vessel_id = _vessel_id; + PERFORM set_config('vessel.id', _vessel_id, false) as vessel_id; + --RAISE NOTICE 'metrics_trigger_fn set vessel_id [%] [%] ', NEW.vessel_id, NEW.client_id; + END IF; -- Set client_id to new value to allow RLS --PERFORM set_config('vessel.client_id', NEW.client_id, false); NEW.vessel_id = current_setting('vessel.id'); @@ -447,8 +456,8 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$ VALUES (current_setting('vessel.id', true), true, NEW.time, NEW.latitude, NEW.longitude, 1) RETURNING id INTO stay_id; -- Add stay entry to process queue for further processing - INSERT INTO process_queue (channel, payload, stored) - VALUES ('new_stay', stay_id, now()); + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUES ('new_stay', stay_id, now(), current_setting('vessel.id', true)); RAISE WARNING 'Metrics Insert first stay as no previous metrics exist, stay_id %', stay_id; END IF; -- Check if status is valid enum @@ -494,8 +503,8 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$ WHERE id = stay_id; RAISE WARNING 'Metrics Updating Stay end current stay_id [%] [%] [%]', stay_id, NEW.status, NEW.time; -- Add moorage entry to process queue for further processing - INSERT INTO process_queue (channel, payload, stored) - VALUES ('new_moorage', stay_id, now()); + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUES ('new_moorage', stay_id, now(), current_setting('vessel.id', true)); ELSE RAISE WARNING 'Metrics Invalid stay_id [%] [%]', stay_id, NEW.time; END IF; @@ -520,8 +529,8 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$ VALUES (current_setting('vessel.id', true), true, NEW.time, NEW.latitude, NEW.longitude, stay_code) RETURNING id INTO stay_id; -- Add stay entry to process queue for further processing - INSERT INTO process_queue (channel, payload, stored) - VALUES ('new_stay', stay_id, now()); + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUES ('new_stay', stay_id, now(), current_setting('vessel.id', true)); ELSE RAISE WARNING 'Metrics Invalid stay_id [%] [%]', stay_id, NEW.time; UPDATE api.stays @@ -545,8 +554,8 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$ _to_lng = NEW.longitude WHERE id = logbook_id; -- Add logbook entry to process queue for later processing - INSERT INTO process_queue (channel, payload, stored) - VALUEs ('new_logbook', logbook_id, now()); + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUEs ('new_logbook', logbook_id, now(), current_setting('vessel.id', true)); ELSE RAISE WARNING 'Metrics Invalid logbook_id [%] [%]', logbook_id, NEW.time; END IF; diff --git a/initdb/02_2_signalk_cron.sql b/initdb/02_2_signalk_cron.sql index a394e9e..e90058b 100644 --- a/initdb/02_2_signalk_cron.sql +++ b/initdb/02_2_signalk_cron.sql @@ -145,9 +145,9 @@ begin --PERFORM send_pushover_py_fn('monitor_offline'::TEXT, user_settings::JSONB, app_settings::JSONB); -- log/insert/update process_queue table with processed INSERT INTO process_queue - (channel, payload, stored, processed) + (channel, payload, stored, processed, ref_id) VALUES - ('monitoring_offline', metadata_rec.id, metadata_rec.interval, now()) + ('monitoring_offline', metadata_rec.id, metadata_rec.interval, now(), metadata_rec.vessel_id) RETURNING id INTO process_id; RAISE NOTICE '-> cron_process_monitor_offline_fn updated process_queue table [%]', process_id; END LOOP; diff --git a/initdb/02_3_1_signalk_public_tables.sql b/initdb/02_3_1_signalk_public_tables.sql index f5ac226..1bea4a2 100644 --- a/initdb/02_3_1_signalk_public_tables.sql +++ b/initdb/02_3_1_signalk_public_tables.sql @@ -133,6 +133,7 @@ CREATE TABLE IF NOT EXISTS public.process_queue ( id SERIAL PRIMARY KEY, channel TEXT NOT NULL, payload TEXT NOT NULL, + ref_id TEXT NOT NULL, stored TIMESTAMP WITHOUT TIME ZONE NOT NULL, processed TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL ); @@ -145,24 +146,26 @@ CREATE INDEX ON public.process_queue (channel); CREATE INDEX ON public.process_queue (stored); CREATE INDEX ON public.process_queue (processed); +COMMENT ON COLUMN public.process_queue.ref_id IS 'either user_id or vessel_id'; + -- Function process_queue helpers create function new_account_entry_fn() returns trigger as $new_account_entry$ begin - insert into process_queue (channel, payload, stored) values ('new_account', NEW.email, now()); + insert into process_queue (channel, payload, stored, ref_id) values ('new_account', NEW.email, now(), NEW.user_id); return NEW; END; $new_account_entry$ language plpgsql; create function new_account_otp_validation_entry_fn() returns trigger as $new_account_otp_validation_entry$ begin - insert into process_queue (channel, payload, stored) values ('email_otp', NEW.email, now()); + insert into process_queue (channel, payload, stored, ref_id) values ('email_otp', NEW.email, now(), NEW.user_id); return NEW; END; $new_account_otp_validation_entry$ language plpgsql; create function new_vessel_entry_fn() returns trigger as $new_vessel_entry$ begin - insert into process_queue (channel, payload, stored) values ('new_vessel', NEW.owner_email, now()); + insert into process_queue (channel, payload, stored, ref_id) values ('new_vessel', NEW.owner_email, now(), NEW.vessel_id); return NEW; END; $new_vessel_entry$ language plpgsql; diff --git a/initdb/02_4_signalk_auth.sql b/initdb/02_4_signalk_auth.sql index a051667..413ad07 100644 --- a/initdb/02_4_signalk_auth.sql +++ b/initdb/02_4_signalk_auth.sql @@ -192,10 +192,11 @@ begin FROM auth.accounts a WHERE a.email = _email; IF _email_valid is null or _email_valid is False THEN - INSERT INTO process_queue (channel, payload, stored) - VALUES ('email_otp', email, now()); + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUES ('email_otp', email, now(), _user_id); END IF; + --RAISE WARNING 'api.login debug: [%],[%],[%]', app_jwt_secret, _role, login.email; -- Generate jwt select jwt.sign( -- row_to_json(r), '' @@ -204,7 +205,7 @@ begin ) as token from ( select _role as role, login.email as email, -- TODO replace with user_id - -- select _role as role, user_id as uid, + -- select _role as role, user_id as uid, -- add support in check_jwt extract(epoch from now())::integer + 60*60 as exp ) r into result; diff --git a/initdb/02_5_signalk_auth_otp.sql b/initdb/02_5_signalk_auth_otp.sql index 2afcd47..42e06de 100644 --- a/initdb/02_5_signalk_auth_otp.sql +++ b/initdb/02_5_signalk_auth_otp.sql @@ -360,8 +360,8 @@ AS $telegram$ DELETE FROM auth.otp WHERE user_email = _email; -- Send Notification async - INSERT INTO process_queue (channel, payload, stored) - VALUES ('telegram_valid', _email, now()); + --INSERT INTO process_queue (channel, payload, stored) + -- VALUES ('telegram_valid', _email, now()); RETURN True; END IF; RETURN False;