Add ref_id to process_queue table to allow timeline per user_id and/or vessel_id

This commit is contained in:
xbgmsharp
2023-06-25 15:12:04 +02:00
parent 4d833999e8
commit 2599f40f7b
5 changed files with 35 additions and 22 deletions

View File

@@ -297,6 +297,7 @@ CREATE FUNCTION metadata_upsert_trigger_fn() RETURNS trigger AS $metadata_upsert
--PERFORM set_config('vessel.client_id', NEW.client_id, false);
-- UPSERT - Insert vs Update for Metadata
RAISE NOTICE 'metadata_upsert_trigger_fn';
--PERFORM set_config('vessel.id', NEW.vessel_id, true);
RAISE WARNING 'metadata_upsert_trigger_fn [%] [%]', current_setting('vessel.id', true), NEW;
SELECT m.id,m.active INTO metadata_id, metadata_active
FROM api.metadata m
@@ -306,8 +307,8 @@ CREATE FUNCTION metadata_upsert_trigger_fn() RETURNS trigger AS $metadata_upsert
-- send notifitacion if boat is back online
IF metadata_active is False THEN
-- Add monitor online entry to process queue for later notification
INSERT INTO process_queue (channel, payload, stored)
VALUES ('monitoring_online', metadata_id, now());
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('monitoring_online', metadata_id, now(), current_setting('vessel.id', true));
END IF;
-- Update vessel metadata
UPDATE api.metadata
@@ -355,8 +356,8 @@ CREATE FUNCTION metadata_notification_trigger_fn() RETURNS trigger AS $metadata_
DECLARE
BEGIN
RAISE NOTICE 'metadata_notification_trigger_fn [%]', NEW;
INSERT INTO process_queue (channel, payload, stored)
VALUES ('monitoring_online', NEW.id, now());
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('monitoring_online', NEW.id, now(), NEW.vessel_id);
RETURN NULL;
END;
$metadata_notification$ LANGUAGE plpgsql;
@@ -399,7 +400,15 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$
logbook_id integer;
stay_id integer;
valid_status BOOLEAN;
_vessel_id TEXT;
BEGIN
-- Force vessel_id to import from SQL cli
SELECT vessel_id INTO _vessel_id FROM api.metadata WHERE client_id = NEW.client_id;
IF _vessel_id IS NOT NULL THEN
NEW.vessel_id = _vessel_id;
PERFORM set_config('vessel.id', _vessel_id, false) as vessel_id;
--RAISE NOTICE 'metrics_trigger_fn set vessel_id [%] [%] ', NEW.vessel_id, NEW.client_id;
END IF;
-- Set client_id to new value to allow RLS
--PERFORM set_config('vessel.client_id', NEW.client_id, false);
NEW.vessel_id = current_setting('vessel.id');
@@ -447,8 +456,8 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$
VALUES (current_setting('vessel.id', true), true, NEW.time, NEW.latitude, NEW.longitude, 1)
RETURNING id INTO stay_id;
-- Add stay entry to process queue for further processing
INSERT INTO process_queue (channel, payload, stored)
VALUES ('new_stay', stay_id, now());
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('new_stay', stay_id, now(), current_setting('vessel.id', true));
RAISE WARNING 'Metrics Insert first stay as no previous metrics exist, stay_id %', stay_id;
END IF;
-- Check if status is valid enum
@@ -494,8 +503,8 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$
WHERE id = stay_id;
RAISE WARNING 'Metrics Updating Stay end current stay_id [%] [%] [%]', stay_id, NEW.status, NEW.time;
-- Add moorage entry to process queue for further processing
INSERT INTO process_queue (channel, payload, stored)
VALUES ('new_moorage', stay_id, now());
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('new_moorage', stay_id, now(), current_setting('vessel.id', true));
ELSE
RAISE WARNING 'Metrics Invalid stay_id [%] [%]', stay_id, NEW.time;
END IF;
@@ -520,8 +529,8 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$
VALUES (current_setting('vessel.id', true), true, NEW.time, NEW.latitude, NEW.longitude, stay_code)
RETURNING id INTO stay_id;
-- Add stay entry to process queue for further processing
INSERT INTO process_queue (channel, payload, stored)
VALUES ('new_stay', stay_id, now());
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('new_stay', stay_id, now(), current_setting('vessel.id', true));
ELSE
RAISE WARNING 'Metrics Invalid stay_id [%] [%]', stay_id, NEW.time;
UPDATE api.stays
@@ -545,8 +554,8 @@ CREATE FUNCTION metrics_trigger_fn() RETURNS trigger AS $metrics$
_to_lng = NEW.longitude
WHERE id = logbook_id;
-- Add logbook entry to process queue for later processing
INSERT INTO process_queue (channel, payload, stored)
VALUEs ('new_logbook', logbook_id, now());
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUEs ('new_logbook', logbook_id, now(), current_setting('vessel.id', true));
ELSE
RAISE WARNING 'Metrics Invalid logbook_id [%] [%]', logbook_id, NEW.time;
END IF;

View File

@@ -145,9 +145,9 @@ begin
--PERFORM send_pushover_py_fn('monitor_offline'::TEXT, user_settings::JSONB, app_settings::JSONB);
-- log/insert/update process_queue table with processed
INSERT INTO process_queue
(channel, payload, stored, processed)
(channel, payload, stored, processed, ref_id)
VALUES
('monitoring_offline', metadata_rec.id, metadata_rec.interval, now())
('monitoring_offline', metadata_rec.id, metadata_rec.interval, now(), metadata_rec.vessel_id)
RETURNING id INTO process_id;
RAISE NOTICE '-> cron_process_monitor_offline_fn updated process_queue table [%]', process_id;
END LOOP;

View File

@@ -133,6 +133,7 @@ CREATE TABLE IF NOT EXISTS public.process_queue (
id SERIAL PRIMARY KEY,
channel TEXT NOT NULL,
payload TEXT NOT NULL,
ref_id TEXT NOT NULL,
stored TIMESTAMP WITHOUT TIME ZONE NOT NULL,
processed TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL
);
@@ -145,24 +146,26 @@ CREATE INDEX ON public.process_queue (channel);
CREATE INDEX ON public.process_queue (stored);
CREATE INDEX ON public.process_queue (processed);
COMMENT ON COLUMN public.process_queue.ref_id IS 'either user_id or vessel_id';
-- Function process_queue helpers
create function new_account_entry_fn() returns trigger as $new_account_entry$
begin
insert into process_queue (channel, payload, stored) values ('new_account', NEW.email, now());
insert into process_queue (channel, payload, stored, ref_id) values ('new_account', NEW.email, now(), NEW.user_id);
return NEW;
END;
$new_account_entry$ language plpgsql;
create function new_account_otp_validation_entry_fn() returns trigger as $new_account_otp_validation_entry$
begin
insert into process_queue (channel, payload, stored) values ('email_otp', NEW.email, now());
insert into process_queue (channel, payload, stored, ref_id) values ('email_otp', NEW.email, now(), NEW.user_id);
return NEW;
END;
$new_account_otp_validation_entry$ language plpgsql;
create function new_vessel_entry_fn() returns trigger as $new_vessel_entry$
begin
insert into process_queue (channel, payload, stored) values ('new_vessel', NEW.owner_email, now());
insert into process_queue (channel, payload, stored, ref_id) values ('new_vessel', NEW.owner_email, now(), NEW.vessel_id);
return NEW;
END;
$new_vessel_entry$ language plpgsql;

View File

@@ -192,10 +192,11 @@ begin
FROM auth.accounts a
WHERE a.email = _email;
IF _email_valid is null or _email_valid is False THEN
INSERT INTO process_queue (channel, payload, stored)
VALUES ('email_otp', email, now());
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('email_otp', email, now(), _user_id);
END IF;
--RAISE WARNING 'api.login debug: [%],[%],[%]', app_jwt_secret, _role, login.email;
-- Generate jwt
select jwt.sign(
-- row_to_json(r), ''
@@ -204,7 +205,7 @@ begin
) as token
from (
select _role as role, login.email as email, -- TODO replace with user_id
-- select _role as role, user_id as uid,
-- select _role as role, user_id as uid, -- add support in check_jwt
extract(epoch from now())::integer + 60*60 as exp
) r
into result;

View File

@@ -360,8 +360,8 @@ AS $telegram$
DELETE FROM auth.otp
WHERE user_email = _email;
-- Send Notification async
INSERT INTO process_queue (channel, payload, stored)
VALUES ('telegram_valid', _email, now());
--INSERT INTO process_queue (channel, payload, stored)
-- VALUES ('telegram_valid', _email, now());
RETURN True;
END IF;
RETURN False;