diff --git a/initdb/99_migrations_202408.sql b/initdb/99_migrations_202408.sql new file mode 100644 index 0000000..a794387 --- /dev/null +++ b/initdb/99_migrations_202408.sql @@ -0,0 +1,1369 @@ +--------------------------------------------------------------------------- +-- Copyright 2021-2024 Francois Lacroix +-- This file is part of PostgSail which is released under Apache License, Version 2.0 (the "License"). +-- See file LICENSE or go to http://www.apache.org/licenses/LICENSE-2.0 for full license details. +-- +-- Migration August 2024 +-- +-- List current database +select current_database(); + +-- connect to the DB +\c signalk + +\echo 'Timing mode is enabled' +\timing + +\echo 'Force timezone, just in case' +set timezone to 'UTC'; + +-- Update api role SQL connection to 30 +ALTER ROLE authenticator WITH NOSUPERUSER NOCREATEDB NOCREATEROLE NOINHERIT NOBYPASSRLS NOREPLICATION CONNECTION LIMIT 30 LOGIN; + +-- Remove CONSTRAINT unique email for vessels table +ALTER TABLE auth.vessels DROP CONSTRAINT vessels_pkey; +-- Create new index for vessels table +CREATE INDEX vessels_owner_email_idx ON auth.vessels (owner_email); + +-- Update new logbook email template +UPDATE public.email_templates + SET email_content='Hello __RECIPIENT__, + +Here is a recap of your latest trip __LOGBOOK_NAME__ on PostgSail. + +Check out your timelapse at __APP_URL__/timelapse/__LOGBOOK_LINK__ + +See more details at __APP_URL__/log/__LOGBOOK_LINK__ + +Happy sailing! +The PostgSail Team' + WHERE "name"='logbook'; + +-- Update deactivated email template +UPDATE public.email_templates + SET email_content='Hello __RECIPIENT__, + +Your account has been deactivated and all your data has been removed from PostgSail system. + +Thank you for being a valued user of PostgSail! + +Find more details at __APP_URL__ +' + WHERE "name"='deactivated'; + +-- Update HTML email for new logbook +DROP FUNCTION IF EXISTS public.send_email_py_fn; +CREATE OR REPLACE FUNCTION public.send_email_py_fn(IN email_type TEXT, IN _user JSONB, IN app JSONB) RETURNS void +AS $send_email_py$ + # Import smtplib for the actual sending function + import smtplib + import requests + + # Import the email modules we need + from email.message import EmailMessage + from email.utils import formatdate,make_msgid + from email.mime.text import MIMEText + + # Use the shared cache to avoid preparing the email metadata + if email_type in SD: + plan = SD[email_type] + # A prepared statement from Python + else: + plan = plpy.prepare("SELECT * FROM email_templates WHERE name = $1", ["text"]) + SD[email_type] = plan + + # Execute the statement with the email_type param and limit to 1 result + rv = plpy.execute(plan, [email_type], 1) + email_subject = rv[0]['email_subject'] + email_content = rv[0]['email_content'] + + # Replace fields using input jsonb obj + if not _user or not app: + plpy.notice('send_email_py_fn Parameters [{}] [{}]'.format(_user, app)) + plpy.error('Error missing parameters') + return None + if 'logbook_name' in _user and _user['logbook_name']: + email_content = email_content.replace('__LOGBOOK_NAME__', str(_user['logbook_name'])) + if 'logbook_link' in _user and _user['logbook_link']: + email_content = email_content.replace('__LOGBOOK_LINK__', str(_user['logbook_link'])) + if 'logbook_img' in _user and _user['logbook_img']: + email_content = email_content.replace('__LOGBOOK_IMG__', str(_user['logbook_img'])) + if 'video_link' in _user and _user['video_link']: + email_content = email_content.replace('__VIDEO_LINK__', str( _user['video_link'])) + if 'recipient' in _user and _user['recipient']: + email_content = email_content.replace('__RECIPIENT__', _user['recipient']) + if 'boat' in _user and _user['boat']: + email_content = email_content.replace('__BOAT__', _user['boat']) + if 'badge' in _user and _user['badge']: + email_content = email_content.replace('__BADGE_NAME__', _user['badge']) + if 'otp_code' in _user and _user['otp_code']: + email_content = email_content.replace('__OTP_CODE__', _user['otp_code']) + if 'reset_qs' in _user and _user['reset_qs']: + email_content = email_content.replace('__RESET_QS__', _user['reset_qs']) + if 'alert' in _user and _user['alert']: + email_content = email_content.replace('__ALERT__', _user['alert']) + + if 'app.url' in app and app['app.url']: + email_content = email_content.replace('__APP_URL__', app['app.url']) + + email_from = 'root@localhost' + if 'app.email_from' in app and app['app.email_from']: + email_from = 'PostgSail <' + app['app.email_from'] + '>' + #plpy.notice('Sending email from [{}] [{}]'.format(email_from, app['app.email_from'])) + + email_to = 'root@localhost' + if 'email' in _user and _user['email']: + email_to = _user['email'] + #plpy.notice('Sending email to [{}] [{}]'.format(email_to, _user['email'])) + else: + plpy.error('Error email to') + return None + + if email_type == 'logbook': + msg = EmailMessage() + msg.set_content(email_content) + else: + msg = MIMEText(email_content, 'plain', 'utf-8') + msg["Subject"] = email_subject + msg["From"] = email_from + msg["To"] = email_to + msg["Date"] = formatdate() + msg["Message-ID"] = make_msgid() + + if email_type == 'logbook' and 'logbook_img' in _user and _user['logbook_img']: + # Create a Content-ID for the image + image_cid = make_msgid() + # Transform HTML template + logbook_link = "{__APP_URL__}/log/{__LOGBOOK_LINK__}".format( __APP_URL__=app['app.url'], __LOGBOOK_LINK__=str(_user['logbook_link'])) + timelapse_link = "{__APP_URL__}/timelapse/{__LOGBOOK_LINK__}".format( __APP_URL__=app['app.url'], __LOGBOOK_LINK__=str(_user['logbook_link'])) + email_content = email_content.replace('\n', '
') + email_content = email_content.replace(str(_user['logbook_name']), '{logbook_name}'.format(logbook_link=logbook_link, logbook_name=str(_user['logbook_name']))) + email_content = email_content.replace(logbook_link, '{logbook_link}'.format(logbook_link=logbook_link)) + email_content = email_content.replace(timelapse_link, '{timelapse_link}'.format(timelapse_link=timelapse_link)) + # Set an alternative html body + msg.add_alternative("""\ + + +

{email_content}

+ + + +""".format(email_content=email_content, image_cid=image_cid[1:-1]), subtype='html') + img_url = 'https://gis.openplotter.cloud/{}'.format(str(_user['logbook_img'])) + response = requests.get(img_url, stream=True) + if response.status_code == 200: + msg.get_payload()[1].add_related(response.raw.data, + maintype='image', + subtype='png', + cid=image_cid) + + server_smtp = 'localhost' + if 'app.email_server' in app and app['app.email_server']: + server_smtp = app['app.email_server'] + #plpy.notice('Sending server [{}] [{}]'.format(server_smtp, app['app.email_server'])) + + # Send the message via our own SMTP server. + try: + # send your message with credentials specified above + with smtplib.SMTP(server_smtp, 587) as server: + if 'app.email_user' in app and app['app.email_user'] \ + and 'app.email_pass' in app and app['app.email_pass']: + server.starttls() + server.login(app['app.email_user'], app['app.email_pass']) + #server.send_message(msg) + server.sendmail(msg["From"], msg["To"], msg.as_string()) + server.quit() + # tell the script to report if your message was sent or which errors need to be fixed + plpy.notice('Sent email successfully to [{}] [{}]'.format(msg["To"], msg["Subject"])) + return None + except OSError as error: + plpy.error('OS Error occurred: ' + str(error)) + except smtplib.SMTPConnectError: + plpy.error('Failed to connect to the server. Bad connection settings?') + except smtplib.SMTPServerDisconnected: + plpy.error('Failed to connect to the server. Wrong user/password?') + except smtplib.SMTPException as e: + plpy.error('SMTP error occurred: ' + str(e)) +$send_email_py$ TRANSFORM FOR TYPE jsonb LANGUAGE plpython3u; +-- Description +COMMENT ON FUNCTION + public.send_email_py_fn + IS 'Send email notification using plpython3u'; + +-- Set default settings to avoid sql shared session cache for every new HTTP request +CREATE OR REPLACE FUNCTION public.check_jwt() + RETURNS void + LANGUAGE plpgsql + SECURITY DEFINER +AS $function$ +-- Prevent unregister user or unregister vessel access +-- Allow anonymous access +DECLARE + _role name := NULL; + _email text := NULL; + anonymous_rec record; + _path name := NULL; + _vid text := NULL; + _vname text := NULL; + boat TEXT := NULL; + _pid INTEGER := 0; -- public_id + _pvessel TEXT := NULL; -- public_type + _ptype TEXT := NULL; -- public_type + _ppath BOOLEAN := False; -- public_path + _pvalid BOOLEAN := False; -- public_valid + _pheader text := NULL; -- public_header + valid_public_type BOOLEAN := False; + account_rec record; + vessel_rec record; +BEGIN + -- RESET settings to avoid sql shared session cache + -- Valid for every new HTTP request + PERFORM set_config('vessel.id', NULL, true); + PERFORM set_config('vessel.name', NULL, true); + PERFORM set_config('user.id', NULL, true); + PERFORM set_config('user.email', NULL, true); + -- Extract email and role from jwt token + --RAISE WARNING 'check_jwt jwt %', current_setting('request.jwt.claims', true); + SELECT current_setting('request.jwt.claims', true)::json->>'email' INTO _email; + PERFORM set_config('user.email', _email, true); + SELECT current_setting('request.jwt.claims', true)::json->>'role' INTO _role; + --RAISE WARNING 'jwt email %', current_setting('request.jwt.claims', true)::json->>'email'; + --RAISE WARNING 'jwt role %', current_setting('request.jwt.claims', true)::json->>'role'; + --RAISE WARNING 'cur_user %', current_user; + --RAISE WARNING 'user.id [%], user.email [%]', current_setting('user.id', true), current_setting('user.email', true); + --RAISE WARNING 'vessel.id [%], vessel.name [%]', current_setting('vessel.id', true), current_setting('vessel.name', true); + + --TODO SELECT current_setting('request.jwt.uid', true)::json->>'uid' INTO _user_id; + --TODO RAISE WARNING 'jwt user_id %', current_setting('request.jwt.uid', true)::json->>'uid'; + --TODO SELECT current_setting('request.jwt.vid', true)::json->>'vid' INTO _vessel_id; + --TODO RAISE WARNING 'jwt vessel_id %', current_setting('request.jwt.vid', true)::json->>'vid'; + IF _role = 'user_role' THEN + -- Check the user exist in the accounts table + SELECT * INTO account_rec + FROM auth.accounts + WHERE auth.accounts.email = _email; + IF account_rec.email IS NULL THEN + RAISE EXCEPTION 'Invalid user' + USING HINT = 'Unknown user or password'; + END IF; + -- Set session variables + PERFORM set_config('user.id', account_rec.user_id, true); + SELECT current_setting('request.path', true) into _path; + --RAISE WARNING 'req path %', current_setting('request.path', true); + -- Function allow without defined vessel like for anonymous role + IF _path ~ '^\/rpc\/(login|signup|recover|reset)$' THEN + RETURN; + END IF; + -- Function allow without defined vessel as user role + -- openapi doc, user settings, otp code and vessel registration + IF _path = '/rpc/settings_fn' + OR _path = '/rpc/register_vessel' + OR _path = '/rpc/update_user_preferences_fn' + OR _path = '/rpc/versions_fn' + OR _path = '/rpc/email_fn' + OR _path = '/' THEN + RETURN; + END IF; + -- Check a vessel and user exist + SELECT auth.vessels.* INTO vessel_rec + FROM auth.vessels, auth.accounts + WHERE auth.vessels.owner_email = auth.accounts.email + AND auth.accounts.email = _email; + -- check if boat exist yet? + IF vessel_rec.owner_email IS NULL THEN + -- Return http status code 551 with message + RAISE sqlstate 'PT551' using + message = 'Vessel Required', + detail = 'Invalid vessel', + hint = 'Unknown vessel'; + --RETURN; -- ignore if not exist + END IF; + -- Redundant? + IF vessel_rec.vessel_id IS NULL THEN + RAISE EXCEPTION 'Invalid vessel' + USING HINT = 'Unknown vessel id'; + END IF; + -- Set session variables + PERFORM set_config('vessel.id', vessel_rec.vessel_id, true); + PERFORM set_config('vessel.name', vessel_rec.name, true); + --RAISE WARNING 'public.check_jwt() user_role vessel.id [%]', current_setting('vessel.id', false); + --RAISE WARNING 'public.check_jwt() user_role vessel.name [%]', current_setting('vessel.name', false); + ELSIF _role = 'vessel_role' THEN + SELECT current_setting('request.path', true) into _path; + --RAISE WARNING 'req path %', current_setting('request.path', true); + -- Function allow without defined vessel like for anonymous role + IF _path ~ '^\/rpc\/(oauth_\w+)$' THEN + RETURN; + END IF; + -- Extract vessel_id from jwt token + SELECT current_setting('request.jwt.claims', true)::json->>'vid' INTO _vid; + -- Check the vessel and user exist + SELECT auth.vessels.* INTO vessel_rec + FROM auth.vessels, auth.accounts + WHERE auth.vessels.owner_email = auth.accounts.email + AND auth.accounts.email = _email + AND auth.vessels.vessel_id = _vid; + IF vessel_rec.owner_email IS NULL THEN + RAISE EXCEPTION 'Invalid vessel' + USING HINT = 'Unknown vessel owner_email'; + END IF; + PERFORM set_config('vessel.id', vessel_rec.vessel_id, true); + PERFORM set_config('vessel.name', vessel_rec.name, true); + --RAISE WARNING 'public.check_jwt() user_role vessel.name %', current_setting('vessel.name', false); + --RAISE WARNING 'public.check_jwt() user_role vessel.id %', current_setting('vessel.id', false); + ELSIF _role = 'api_anonymous' THEN + --RAISE WARNING 'public.check_jwt() api_anonymous path[%] vid:[%]', current_setting('request.path', true), current_setting('vessel.id', false); + -- Check if path is the a valid allow anonymous path + SELECT current_setting('request.path', true) ~ '^/(logs_view|log_view|rpc/timelapse_fn|rpc/timelapse2_fn|monitoring_view|stats_logs_view|stats_moorages_view|rpc/stats_logs_fn)$' INTO _ppath; + if _ppath is True then + -- Check is custom header is present and valid + SELECT current_setting('request.headers', true)::json->>'x-is-public' into _pheader; + --RAISE WARNING 'public.check_jwt() api_anonymous _pheader [%]', _pheader; + if _pheader is null then + return; + --RAISE EXCEPTION 'Invalid public_header' + -- USING HINT = 'Stop being so evil and maybe you can log in'; + end if; + SELECT convert_from(decode(_pheader, 'base64'), 'utf-8') + ~ '\w+,public_(logs|logs_list|stats|timelapse|monitoring),\d+$' into _pvalid; + RAISE WARNING 'public.check_jwt() api_anonymous _pvalid [%]', _pvalid; + if _pvalid is null or _pvalid is False then + RAISE EXCEPTION 'Invalid public_valid' + USING HINT = 'Stop being so evil and maybe you can log in'; + end if; + WITH regex AS ( + SELECT regexp_match( + convert_from( + decode(_pheader, 'base64'), 'utf-8'), + '(\w+),(public_(logs|logs_list|stats|timelapse|monitoring)),(\d+)$') AS match + ) + SELECT match[1], match[2], match[4] into _pvessel, _ptype, _pid + FROM regex; + RAISE WARNING 'public.check_jwt() api_anonymous [%] [%] [%]', _pvessel, _ptype, _pid; + if _pvessel is not null and _ptype is not null then + -- Everything seem fine, get the vessel_id base on the vessel name. + SELECT _ptype::name = any(enum_range(null::public_type)::name[]) INTO valid_public_type; + IF valid_public_type IS False THEN + -- Ignore entry if type is invalid + RAISE EXCEPTION 'Invalid public_type' + USING HINT = 'Stop being so evil and maybe you can log in'; + END IF; + -- Check if boat name match public_vessel name + boat := '^' || _pvessel || '$'; + IF _ptype ~ '^public_(logs|timelapse)$' AND _pid > 0 THEN + WITH log as ( + SELECT vessel_id from api.logbook l where l.id = _pid + ) + SELECT v.vessel_id, v.name into anonymous_rec + FROM auth.accounts a, auth.vessels v, jsonb_each_text(a.preferences) as prefs, log l + WHERE v.vessel_id = l.vessel_id + AND a.email = v.owner_email + AND a.preferences->>'public_vessel'::text ~* boat + AND prefs.key = _ptype::TEXT + AND prefs.value::BOOLEAN = true; + RAISE WARNING '-> ispublic_fn public_logs output boat:[%], type:[%], result:[%]', _pvessel, _ptype, anonymous_rec; + IF anonymous_rec.vessel_id IS NOT NULL THEN + PERFORM set_config('vessel.id', anonymous_rec.vessel_id, true); + PERFORM set_config('vessel.name', anonymous_rec.name, true); + RETURN; + END IF; + ELSE + SELECT v.vessel_id, v.name into anonymous_rec + FROM auth.accounts a, auth.vessels v, jsonb_each_text(a.preferences) as prefs + WHERE a.email = v.owner_email + AND a.preferences->>'public_vessel'::text ~* boat + AND prefs.key = _ptype::TEXT + AND prefs.value::BOOLEAN = true; + RAISE WARNING '-> ispublic_fn output boat:[%], type:[%], result:[%]', _pvessel, _ptype, anonymous_rec; + IF anonymous_rec.vessel_id IS NOT NULL THEN + PERFORM set_config('vessel.id', anonymous_rec.vessel_id, true); + PERFORM set_config('vessel.name', anonymous_rec.name, true); + RETURN; + END IF; + END IF; + --RAISE sqlstate 'PT404' using message = 'unknown resource'; + END IF; -- end anonymous path + END IF; + ELSIF _role <> 'api_anonymous' THEN + RAISE EXCEPTION 'Invalid role' + USING HINT = 'Stop being so evil and maybe you can log in'; + END IF; +END +$function$ +; +-- Description +COMMENT ON FUNCTION + public.check_jwt() IS 'PostgREST API db-pre-request check, set_config according to role (api_anonymous,vessel_role,user_role)'; + +-- Remove DEBUG +-- Only one "new_stay" insert +CREATE OR REPLACE FUNCTION public.metrics_trigger_fn() + RETURNS trigger + LANGUAGE plpgsql +AS $function$ + DECLARE + previous_metric record; + stay_code INTEGER; + logbook_id INTEGER; + stay_id INTEGER; + valid_status BOOLEAN := False; + _vessel_id TEXT; + distance BOOLEAN := False; + BEGIN + --RAISE NOTICE 'metrics_trigger_fn'; + --RAISE WARNING 'metrics_trigger_fn [%] [%]', current_setting('vessel.id', true), NEW; + -- Ensure vessel.id to new value to allow RLS + IF NEW.vessel_id IS NULL THEN + -- set vessel_id from jwt if not present in INSERT query + NEW.vessel_id := current_setting('vessel.id'); + END IF; + -- Boat metadata are check using api.metrics REFERENCES to api.metadata + -- Fetch the latest entry to compare status against the new status to be insert + SELECT * INTO previous_metric + FROM api.metrics m + WHERE m.vessel_id IS NOT NULL + AND m.vessel_id = current_setting('vessel.id', true) + ORDER BY m.time DESC LIMIT 1; + --RAISE NOTICE 'Metrics Status, New:[%] Previous:[%]', NEW.status, previous_metric.status; + IF previous_metric.time = NEW.time THEN + -- Ignore entry if same time + --RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], duplicate time [%] = [%]', NEW.vessel_id, previous_metric.time, NEW.time; + RETURN NULL; + END IF; + IF previous_metric.time > NEW.time THEN + -- Ignore entry if new time is later than previous time + --RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], new time is older than previous_metric.time [%] > [%]', NEW.vessel_id, previous_metric.time, NEW.time; + RETURN NULL; + END IF; + -- Check if latitude or longitude are type double + --IF public.isdouble(NEW.latitude::TEXT) IS False OR public.isdouble(NEW.longitude::TEXT) IS False THEN + -- -- Ignore entry if null latitude,longitude + -- RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], not a double type for latitude or longitude [%] [%]', NEW.vessel_id, NEW.latitude, NEW.longitude; + -- RETURN NULL; + --END IF; + -- Check if latitude or longitude are null + IF NEW.latitude IS NULL OR NEW.longitude IS NULL THEN + -- Ignore entry if null latitude,longitude + --RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], null latitude or longitude [%] [%]', NEW.vessel_id, NEW.latitude, NEW.longitude; + RETURN NULL; + END IF; + -- Check if valid latitude + IF NEW.latitude >= 90 OR NEW.latitude <= -90 THEN + -- Ignore entry if invalid latitude,longitude + --RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], invalid latitude >= 90 OR <= -90 [%] [%]', NEW.vessel_id, NEW.latitude, NEW.longitude; + RETURN NULL; + END IF; + -- Check if valid longitude + IF NEW.longitude >= 180 OR NEW.longitude <= -180 THEN + -- Ignore entry if invalid latitude,longitude + --RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], invalid longitude >= 180 OR <= -180 [%] [%]', NEW.vessel_id, NEW.latitude, NEW.longitude; + RETURN NULL; + END IF; + -- Check if valid longitude and latitude not close to -0.0000001 from Victron Cerbo + IF NEW.latitude = NEW.longitude THEN + -- Ignore entry if latitude,longitude are equal + --RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], latitude and longitude are equal [%] [%]', NEW.vessel_id, NEW.latitude, NEW.longitude; + RETURN NULL; + END IF; + -- Check distance with previous point is > 10km + --SELECT ST_Distance( + -- ST_MakePoint(NEW.latitude,NEW.longitude)::geography, + -- ST_MakePoint(previous_metric.latitude,previous_metric.longitude)::geography) > 10000 INTO distance; + --IF distance IS True THEN + -- RAISE WARNING 'Metrics Ignoring metric, distance between previous metric and new metric is too large, vessel_id [%] distance[%]', NEW.vessel_id, distance; + -- RETURN NULL; + --END IF; + -- Check if status is null but speed is over 3knots set status to sailing + IF NEW.status IS NULL AND NEW.speedoverground >= 3 THEN + RAISE WARNING 'Metrics Unknown NEW.status, vessel_id [%], null status, set to sailing because of speedoverground is +3 from [%]', NEW.vessel_id, NEW.status; + NEW.status := 'sailing'; + -- Check if status is null then set status to default moored + ELSIF NEW.status IS NULL THEN + RAISE WARNING 'Metrics Unknown NEW.status, vessel_id [%], null status, set to default moored from [%]', NEW.vessel_id, NEW.status; + NEW.status := 'moored'; + END IF; + IF previous_metric.status IS NULL THEN + IF NEW.status = 'anchored' THEN + RAISE WARNING 'Metrics Unknown previous_metric.status from vessel_id [%], [%] set to default current status [%]', NEW.vessel_id, previous_metric.status, NEW.status; + previous_metric.status := NEW.status; + ELSE + RAISE WARNING 'Metrics Unknown previous_metric.status from vessel_id [%], [%] set to default status moored vs [%]', NEW.vessel_id, previous_metric.status, NEW.status; + previous_metric.status := 'moored'; + END IF; + -- Add new stay as no previous entry exist + INSERT INTO api.stays + (vessel_id, active, arrived, latitude, longitude, stay_code) + VALUES (current_setting('vessel.id', true), true, NEW.time, NEW.latitude, NEW.longitude, 1) + RETURNING id INTO stay_id; + -- Add stay entry to process queue for further processing + --INSERT INTO process_queue (channel, payload, stored, ref_id) + -- VALUES ('new_stay', stay_id, now(), current_setting('vessel.id', true)); + --RAISE WARNING 'Metrics Insert first stay as no previous metrics exist, stay_id stay_id [%] [%] [%]', stay_id, NEW.status, NEW.time; + END IF; + -- Check if status is valid enum + SELECT NEW.status::name = any(enum_range(null::status_type)::name[]) INTO valid_status; + IF valid_status IS False THEN + -- Ignore entry if status is invalid + --RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], invalid status [%]', NEW.vessel_id, NEW.status; + RETURN NULL; + END IF; + -- Check if speedOverGround is valid value + IF NEW.speedoverground >= 40 THEN + -- Ignore entry as speedOverGround is invalid + RAISE WARNING 'Metrics Ignoring metric, vessel_id [%], speedOverGround is invalid, over 40 < [%]', NEW.vessel_id, NEW.speedoverground; + RETURN NULL; + END IF; + + -- Check the state and if any previous/current entry + -- If change of state and new status is sailing or motoring + IF previous_metric.status::TEXT <> NEW.status::TEXT AND + ( (NEW.status::TEXT = 'sailing' AND previous_metric.status::TEXT <> 'motoring') + OR (NEW.status::TEXT = 'motoring' AND previous_metric.status::TEXT <> 'sailing') ) THEN + RAISE WARNING 'Metrics Update status, try new logbook, New:[%] Previous:[%]', NEW.status, previous_metric.status; + -- Start new log + logbook_id := public.trip_in_progress_fn(current_setting('vessel.id', true)::TEXT); + IF logbook_id IS NULL THEN + INSERT INTO api.logbook + (vessel_id, active, _from_time, _from_lat, _from_lng) + VALUES (current_setting('vessel.id', true), true, NEW.time, NEW.latitude, NEW.longitude) + RETURNING id INTO logbook_id; + RAISE WARNING 'Metrics Insert new logbook, logbook_id [%] [%] [%]', logbook_id, NEW.status, NEW.time; + ELSE + UPDATE api.logbook + SET + active = false, + _to_time = NEW.time, + _to_lat = NEW.latitude, + _to_lng = NEW.longitude + WHERE id = logbook_id; + RAISE WARNING 'Metrics Existing logbook logbook_id [%] [%] [%]', logbook_id, NEW.status, NEW.time; + END IF; + + -- End current stay + stay_id := public.stay_in_progress_fn(current_setting('vessel.id', true)::TEXT); + IF stay_id IS NOT NULL THEN + UPDATE api.stays + SET + active = false, + departed = NEW.time + WHERE id = stay_id; + -- Add stay entry to process queue for further processing + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUES ('new_stay', stay_id, NOW(), current_setting('vessel.id', true)); + RAISE WARNING 'Metrics Updating Stay end current stay_id [%] [%] [%]', stay_id, NEW.status, NEW.time; + ELSE + RAISE WARNING 'Metrics Invalid stay_id [%] [%]', stay_id, NEW.time; + END IF; + + -- If change of state and new status is moored or anchored + ELSIF previous_metric.status::TEXT <> NEW.status::TEXT AND + ( (NEW.status::TEXT = 'moored' AND previous_metric.status::TEXT <> 'anchored') + OR (NEW.status::TEXT = 'anchored' AND previous_metric.status::TEXT <> 'moored') ) THEN + -- Start new stays + RAISE WARNING 'Metrics Update status, try new stay, New:[%] Previous:[%]', NEW.status, previous_metric.status; + stay_id := public.stay_in_progress_fn(current_setting('vessel.id', true)::TEXT); + IF stay_id IS NULL THEN + RAISE WARNING 'Metrics Inserting new stay [%]', NEW.status; + -- If metric status is anchored set stay_code accordingly + stay_code = 1; + IF NEW.status = 'anchored' THEN + stay_code = 2; + END IF; + -- Add new stay + INSERT INTO api.stays + (vessel_id, active, arrived, latitude, longitude, stay_code) + VALUES (current_setting('vessel.id', true), true, NEW.time, NEW.latitude, NEW.longitude, stay_code) + RETURNING id INTO stay_id; + RAISE WARNING 'Metrics Insert new stay, stay_id [%] [%] [%]', stay_id, NEW.status, NEW.time; + ELSE + RAISE WARNING 'Metrics Invalid stay_id [%] [%]', stay_id, NEW.time; + UPDATE api.stays + SET + active = false, + departed = NEW.time, + notes = 'Invalid stay?' + WHERE id = stay_id; + END IF; + + -- End current log/trip + -- Fetch logbook_id by vessel_id + logbook_id := public.trip_in_progress_fn(current_setting('vessel.id', true)::TEXT); + IF logbook_id IS NOT NULL THEN + -- todo check on time start vs end + RAISE WARNING 'Metrics Updating logbook status [%] [%] [%]', logbook_id, NEW.status, NEW.time; + UPDATE api.logbook + SET + active = false, + _to_time = NEW.time, + _to_lat = NEW.latitude, + _to_lng = NEW.longitude + WHERE id = logbook_id; + -- Add logbook entry to process queue for later processing + INSERT INTO process_queue (channel, payload, stored, ref_id) + VALUES ('pre_logbook', logbook_id, NOW(), current_setting('vessel.id', true)); + ELSE + RAISE WARNING 'Metrics Invalid logbook_id [%] [%] [%]', logbook_id, NEW.status, NEW.time; + END IF; + END IF; + RETURN NEW; -- Finally insert the actual new metric + END; +$function$ +; +-- Description +COMMENT ON FUNCTION + public.metrics_trigger_fn() IS 'process metrics from vessel, generate pre_logbook and new_stay.'; + +-- Update alert message. add the date and format the numeric value +DROP FUNCTION IF EXISTS public.cron_alerts_fn(); +CREATE OR REPLACE FUNCTION public.cron_alerts_fn() RETURNS void AS $cron_alerts$ +DECLARE + alert_rec record; + default_last_metric TIMESTAMPTZ := NOW() - interval '1 day'; + last_metric TIMESTAMPTZ; + metric_rec record; + app_settings JSONB; + user_settings JSONB; + alerting JSONB; + _alarms JSONB; + alarms TEXT; + alert_default JSONB := '{ + "low_pressure_threshold": 990, + "high_wind_speed_threshold": 30, + "low_water_depth_threshold": 1, + "min_notification_interval": 6, + "high_pressure_drop_threshold": 12, + "low_battery_charge_threshold": 90, + "low_battery_voltage_threshold": 12.5, + "low_water_temperature_threshold": 10, + "low_indoor_temperature_threshold": 7, + "low_outdoor_temperature_threshold": 3 + }'; +BEGIN + -- Check for new event notification pending update + RAISE NOTICE 'cron_alerts_fn'; + FOR alert_rec in + SELECT + a.user_id,a.email,v.vessel_id, + COALESCE((a.preferences->'alert_last_metric')::TEXT, default_last_metric::TEXT) as last_metric, + (alert_default || (a.preferences->'alerting')::JSONB) as alerting, + (a.preferences->'alarms')::JSONB as alarms + FROM auth.accounts a + LEFT JOIN auth.vessels AS v ON v.owner_email = a.email + LEFT JOIN api.metadata AS m ON m.vessel_id = v.vessel_id + WHERE (a.preferences->'alerting'->'enabled')::boolean = True + AND m.active = True + LOOP + RAISE NOTICE '-> cron_alerts_fn for [%]', alert_rec; + PERFORM set_config('vessel.id', alert_rec.vessel_id, false); + PERFORM set_config('user.email', alert_rec.email, false); + --RAISE WARNING 'public.cron_process_alert_rec_fn() scheduler vessel.id %, user.id', current_setting('vessel.id', false), current_setting('user.id', false); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(alert_rec.vessel_id::TEXT); + RAISE NOTICE '-> cron_alerts_fn checking user_settings [%]', user_settings; + -- Get all metrics from the last last_metric avg by 5 minutes + FOR metric_rec in + SELECT time_bucket('5 minutes', m.time) AS time_bucket, + avg((m.metrics->'environment.inside.temperature')::numeric) AS intemp, + avg((m.metrics->'environment.outside.temperature')::numeric) AS outtemp, + avg((m.metrics->'environment.water.temperature')::numeric) AS wattemp, + avg((m.metrics->'environment.depth.belowTransducer')::numeric) AS watdepth, + avg((m.metrics->'environment.outside.pressure')::numeric) AS pressure, + avg((m.metrics->'environment.wind.speedTrue')::numeric) AS wind, + avg((m.metrics->'electrical.batteries.House.voltage')::numeric) AS voltage, + avg(coalesce((m.metrics->>'electrical.batteries.House.capacity.stateOfCharge')::numeric, 1)) AS charge + FROM api.metrics m + WHERE vessel_id = alert_rec.vessel_id + AND m.time >= alert_rec.last_metric::TIMESTAMPTZ + GROUP BY time_bucket + ORDER BY time_bucket ASC LIMIT 100 + LOOP + RAISE NOTICE '-> cron_alerts_fn checking metrics [%]', metric_rec; + RAISE NOTICE '-> cron_alerts_fn checking alerting [%]', alert_rec.alerting; + --RAISE NOTICE '-> cron_alerts_fn checking debug [%] [%]', kelvinToCel(metric_rec.intemp), (alert_rec.alerting->'low_indoor_temperature_threshold'); + IF kelvinToCel(metric_rec.intemp) < (alert_rec.alerting->'low_indoor_temperature_threshold')::numeric then + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', (alert_rec.alarms->'low_indoor_temperature_threshold'->>'date')::TIMESTAMPTZ; + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', metric_rec.time_bucket::TIMESTAMPTZ; + -- Get latest alarms + SELECT preferences->'alarms' INTO _alarms FROM auth.accounts a WHERE a.email = current_setting('user.email', false); + -- Is alarm in the min_notification_interval time frame + IF ( + ((_alarms->'low_indoor_temperature_threshold'->>'date') IS NULL) OR + (((_alarms->'low_indoor_temperature_threshold'->>'date')::TIMESTAMPTZ + + ((interval '1 hour') * (alert_rec.alerting->>'min_notification_interval')::NUMERIC)) + < metric_rec.time_bucket::TIMESTAMPTZ) + ) THEN + -- Add alarm + alarms := '{"low_indoor_temperature_threshold": {"value": '|| kelvinToCel(metric_rec.intemp) ||', "date":"' || metric_rec.time_bucket || '"}}'; + -- Merge alarms + SELECT public.jsonb_recursive_merge(_alarms::jsonb, alarms::jsonb) into _alarms; + -- Update alarms for user + PERFORM api.update_user_preferences_fn('{alarms}'::TEXT, _alarms::TEXT); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(current_setting('vessel.id', false)); + SELECT user_settings::JSONB || ('{"alert": "low_outdoor_temperature_threshold value:'|| kelvinToCel(metric_rec.intemp) ||' date:'|| metric_rec.time_bucket ||' "}'::text)::JSONB into user_settings; + -- Send notification + PERFORM send_notification_fn('alert'::TEXT, user_settings::JSONB); + -- DEBUG + RAISE NOTICE '-> cron_alerts_fn checking debug low_indoor_temperature_threshold +interval'; + END IF; + RAISE NOTICE '-> cron_alerts_fn checking debug low_indoor_temperature_threshold'; + END IF; + IF kelvinToCel(metric_rec.outtemp) < (alert_rec.alerting->'low_outdoor_temperature_threshold')::numeric then + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', (alert_rec.alarms->'low_outdoor_temperature_threshold'->>'date')::TIMESTAMPTZ; + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', metric_rec.time_bucket::TIMESTAMPTZ; + -- Get latest alarms + SELECT preferences->'alarms' INTO _alarms FROM auth.accounts a WHERE a.email = current_setting('user.email', false); + -- Is alarm in the min_notification_interval time frame + IF ( + ((_alarms->'low_outdoor_temperature_threshold'->>'date') IS NULL) OR + (((_alarms->'low_outdoor_temperature_threshold'->>'date')::TIMESTAMPTZ + + ((interval '1 hour') * (alert_rec.alerting->>'min_notification_interval')::NUMERIC)) + < metric_rec.time_bucket::TIMESTAMPTZ) + ) THEN + -- Add alarm + alarms := '{"low_outdoor_temperature_threshold": {"value": '|| kelvinToCel(metric_rec.outtemp) ||', "date":"' || metric_rec.time_bucket || '"}}'; + -- Merge alarms + SELECT public.jsonb_recursive_merge(_alarms::jsonb, alarms::jsonb) into _alarms; + -- Update alarms for user + PERFORM api.update_user_preferences_fn('{alarms}'::TEXT, _alarms::TEXT); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(current_setting('vessel.id', false)); + SELECT user_settings::JSONB || ('{"alert": "low_outdoor_temperature_threshold value:'|| kelvinToCel(metric_rec.outtemp) ||' date:'|| metric_rec.time_bucket ||' "}'::text)::JSONB into user_settings; + -- Send notification + PERFORM send_notification_fn('alert'::TEXT, user_settings::JSONB); + -- DEBUG + RAISE NOTICE '-> cron_alerts_fn checking debug low_outdoor_temperature_threshold +interval'; + END IF; + RAISE NOTICE '-> cron_alerts_fn checking debug low_outdoor_temperature_threshold'; + END IF; + IF kelvinToCel(metric_rec.wattemp) < (alert_rec.alerting->'low_water_temperature_threshold')::numeric then + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', (alert_rec.alarms->'low_water_temperature_threshold'->>'date')::TIMESTAMPTZ; + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', metric_rec.time_bucket::TIMESTAMPTZ; + -- Get latest alarms + SELECT preferences->'alarms' INTO _alarms FROM auth.accounts a WHERE a.email = current_setting('user.email', false); + -- Is alarm in the min_notification_interval time frame + IF ( + ((_alarms->'low_water_temperature_threshold'->>'date') IS NULL) OR + (((_alarms->'low_water_temperature_threshold'->>'date')::TIMESTAMPTZ + + ((interval '1 hour') * (alert_rec.alerting->>'min_notification_interval')::NUMERIC)) + < metric_rec.time_bucket::TIMESTAMPTZ) + ) THEN + -- Add alarm + alarms := '{"low_water_temperature_threshold": {"value": '|| kelvinToCel(metric_rec.wattemp) ||', "date":"' || metric_rec.time_bucket || '"}}'; + -- Merge alarms + SELECT public.jsonb_recursive_merge(_alarms::jsonb, alarms::jsonb) into _alarms; + -- Update alarms for user + PERFORM api.update_user_preferences_fn('{alarms}'::TEXT, _alarms::TEXT); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(current_setting('vessel.id', false)); + SELECT user_settings::JSONB || ('{"alert": "low_water_temperature_threshold value:'|| kelvinToCel(metric_rec.wattemp) ||' date:'|| metric_rec.time_bucket ||' "}'::text)::JSONB into user_settings; + -- Send notification + PERFORM send_notification_fn('alert'::TEXT, user_settings::JSONB); + -- DEBUG + RAISE NOTICE '-> cron_alerts_fn checking debug low_water_temperature_threshold +interval'; + END IF; + RAISE NOTICE '-> cron_alerts_fn checking debug low_water_temperature_threshold'; + END IF; + IF metric_rec.watdepth < (alert_rec.alerting->'low_water_depth_threshold')::numeric then + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', (alert_rec.alarms->'low_water_depth_threshold'->>'date')::TIMESTAMPTZ; + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', metric_rec.time_bucket::TIMESTAMPTZ; + -- Get latest alarms + SELECT preferences->'alarms' INTO _alarms FROM auth.accounts a WHERE a.email = current_setting('user.email', false); + -- Is alarm in the min_notification_interval time frame + IF ( + ((_alarms->'low_water_depth_threshold'->>'date') IS NULL) OR + (((_alarms->'low_water_depth_threshold'->>'date')::TIMESTAMPTZ + + ((interval '1 hour') * (alert_rec.alerting->>'min_notification_interval')::NUMERIC)) + < metric_rec.time_bucket::TIMESTAMPTZ) + ) THEN + -- Add alarm + alarms := '{"low_water_depth_threshold": {"value": '|| metric_rec.watdepth ||', "date":"' || metric_rec.time_bucket || '"}}'; + -- Merge alarms + SELECT public.jsonb_recursive_merge(_alarms::jsonb, alarms::jsonb) into _alarms; + -- Update alarms for user + PERFORM api.update_user_preferences_fn('{alarms}'::TEXT, _alarms::TEXT); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(current_setting('vessel.id', false)); + SELECT user_settings::JSONB || ('{"alert": "low_water_depth_threshold value:'|| ROUND(metric_rec.watdepth,2) ||' date:'|| metric_rec.time_bucket ||' "}'::text)::JSONB into user_settings; + -- Send notification + PERFORM send_notification_fn('alert'::TEXT, user_settings::JSONB); + -- DEBUG + RAISE NOTICE '-> cron_alerts_fn checking debug low_water_depth_threshold +interval'; + END IF; + RAISE NOTICE '-> cron_alerts_fn checking debug low_water_depth_threshold'; + END IF; + if metric_rec.pressure < (alert_rec.alerting->'high_pressure_drop_threshold')::numeric then + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', (alert_rec.alarms->'high_pressure_drop_threshold'->>'date')::TIMESTAMPTZ; + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', metric_rec.time_bucket::TIMESTAMPTZ; + -- Get latest alarms + SELECT preferences->'alarms' INTO _alarms FROM auth.accounts a WHERE a.email = current_setting('user.email', false); + -- Is alarm in the min_notification_interval time frame + IF ( + ((_alarms->'high_pressure_drop_threshold'->>'date') IS NULL) OR + (((_alarms->'high_pressure_drop_threshold'->>'date')::TIMESTAMPTZ + + ((interval '1 hour') * (alert_rec.alerting->>'min_notification_interval')::NUMERIC)) + < metric_rec.time_bucket::TIMESTAMPTZ) + ) THEN + -- Add alarm + alarms := '{"high_pressure_drop_threshold": {"value": '|| metric_rec.pressure ||', "date":"' || metric_rec.time_bucket || '"}}'; + -- Merge alarms + SELECT public.jsonb_recursive_merge(_alarms::jsonb, alarms::jsonb) into _alarms; + -- Update alarms for user + PERFORM api.update_user_preferences_fn('{alarms}'::TEXT, _alarms::TEXT); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(current_setting('vessel.id', false)); + SELECT user_settings::JSONB || ('{"alert": "high_pressure_drop_threshold value:'|| ROUND(metric_rec.pressure,2) ||' date:'|| metric_rec.time_bucket ||' "}'::text)::JSONB into user_settings; + -- Send notification + PERFORM send_notification_fn('alert'::TEXT, user_settings::JSONB); + -- DEBUG + RAISE NOTICE '-> cron_alerts_fn checking debug high_pressure_drop_threshold +interval'; + END IF; + RAISE NOTICE '-> cron_alerts_fn checking debug high_pressure_drop_threshold'; + END IF; + IF metric_rec.wind > (alert_rec.alerting->'high_wind_speed_threshold')::numeric then + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', (alert_rec.alarms->'high_wind_speed_threshold'->>'date')::TIMESTAMPTZ; + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', metric_rec.time_bucket::TIMESTAMPTZ; + -- Get latest alarms + SELECT preferences->'alarms' INTO _alarms FROM auth.accounts a WHERE a.email = current_setting('user.email', false); + -- Is alarm in the min_notification_interval time frame + IF ( + ((_alarms->'high_wind_speed_threshold'->>'date') IS NULL) OR + (((_alarms->'high_wind_speed_threshold'->>'date')::TIMESTAMPTZ + + ((interval '1 hour') * (alert_rec.alerting->>'min_notification_interval')::NUMERIC)) + < metric_rec.time_bucket::TIMESTAMPTZ) + ) THEN + -- Add alarm + alarms := '{"high_wind_speed_threshold": {"value": '|| metric_rec.wind ||', "date":"' || metric_rec.time_bucket || '"}}'; + -- Merge alarms + SELECT public.jsonb_recursive_merge(_alarms::jsonb, alarms::jsonb) into _alarms; + -- Update alarms for user + PERFORM api.update_user_preferences_fn('{alarms}'::TEXT, _alarms::TEXT); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(current_setting('vessel.id', false)); + SELECT user_settings::JSONB || ('{"alert": "high_wind_speed_threshold value:'|| ROUND(metric_rec.wind,2) ||' date:'|| metric_rec.time_bucket ||' "}'::text)::JSONB into user_settings; + -- Send notification + PERFORM send_notification_fn('alert'::TEXT, user_settings::JSONB); + -- DEBUG + RAISE NOTICE '-> cron_alerts_fn checking debug high_wind_speed_threshold +interval'; + END IF; + RAISE NOTICE '-> cron_alerts_fn checking debug high_wind_speed_threshold'; + END IF; + if metric_rec.voltage < (alert_rec.alerting->'low_battery_voltage_threshold')::numeric then + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', (alert_rec.alarms->'low_battery_voltage_threshold'->>'date')::TIMESTAMPTZ; + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', metric_rec.time_bucket::TIMESTAMPTZ; + -- Get latest alarms + SELECT preferences->'alarms' INTO _alarms FROM auth.accounts a WHERE a.email = current_setting('user.email', false); + -- Is alarm in the min_notification_interval time frame + IF ( + ((_alarms->'low_battery_voltage_threshold'->>'date') IS NULL) OR + (((_alarms->'low_battery_voltage_threshold'->>'date')::TIMESTAMPTZ + + ((interval '1 hour') * (alert_rec.alerting->>'min_notification_interval')::NUMERIC)) + < metric_rec.time_bucket::TIMESTAMPTZ) + ) THEN + -- Add alarm + alarms := '{"low_battery_voltage_threshold": {"value": '|| metric_rec.voltage ||', "date":"' || metric_rec.time_bucket || '"}}'; + -- Merge alarms + SELECT public.jsonb_recursive_merge(_alarms::jsonb, alarms::jsonb) into _alarms; + -- Update alarms for user + PERFORM api.update_user_preferences_fn('{alarms}'::TEXT, _alarms::TEXT); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(current_setting('vessel.id', false)); + SELECT user_settings::JSONB || ('{"alert": "low_battery_voltage_threshold value:'|| ROUND(metric_rec.voltage,2) ||' date:'|| metric_rec.time_bucket ||' "}'::text)::JSONB into user_settings; + -- Send notification + PERFORM send_notification_fn('alert'::TEXT, user_settings::JSONB); + -- DEBUG + RAISE NOTICE '-> cron_alerts_fn checking debug low_battery_voltage_threshold +interval'; + END IF; + RAISE NOTICE '-> cron_alerts_fn checking debug low_battery_voltage_threshold'; + END IF; + if (metric_rec.charge*100) < (alert_rec.alerting->'low_battery_charge_threshold')::numeric then + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', (alert_rec.alarms->'low_battery_charge_threshold'->>'date')::TIMESTAMPTZ; + RAISE NOTICE '-> cron_alerts_fn checking debug [%]', metric_rec.time_bucket::TIMESTAMPTZ; + -- Get latest alarms + SELECT preferences->'alarms' INTO _alarms FROM auth.accounts a WHERE a.email = current_setting('user.email', false); + -- Is alarm in the min_notification_interval time frame + IF ( + ((_alarms->'low_battery_charge_threshold'->>'date') IS NULL) OR + (((_alarms->'low_battery_charge_threshold'->>'date')::TIMESTAMPTZ + + ((interval '1 hour') * (alert_rec.alerting->>'min_notification_interval')::NUMERIC)) + < metric_rec.time_bucket::TIMESTAMPTZ) + ) THEN + -- Add alarm + alarms := '{"low_battery_charge_threshold": {"value": '|| (metric_rec.charge*100) ||', "date":"' || metric_rec.time_bucket || '"}}'; + -- Merge alarms + SELECT public.jsonb_recursive_merge(_alarms::jsonb, alarms::jsonb) into _alarms; + -- Update alarms for user + PERFORM api.update_user_preferences_fn('{alarms}'::TEXT, _alarms::TEXT); + -- Gather user settings + user_settings := get_user_settings_from_vesselid_fn(current_setting('vessel.id', false)); + SELECT user_settings::JSONB || ('{"alert": "low_battery_charge_threshold value:'|| ROUND(metric_rec.charge*100,2) ||' date:'|| metric_rec.time_bucket ||' "}'::text)::JSONB into user_settings; + -- Send notification + PERFORM send_notification_fn('alert'::TEXT, user_settings::JSONB); + -- DEBUG + RAISE NOTICE '-> cron_alerts_fn checking debug low_battery_charge_threshold +interval'; + END IF; + RAISE NOTICE '-> cron_alerts_fn checking debug low_battery_charge_threshold'; + END IF; + -- Record last metrics time + SELECT metric_rec.time_bucket INTO last_metric; + END LOOP; + PERFORM api.update_user_preferences_fn('{alert_last_metric}'::TEXT, last_metric::TEXT); + END LOOP; +END; +$cron_alerts$ language plpgsql; +-- Description +COMMENT ON FUNCTION + public.cron_alerts_fn + IS 'init by pg_cron to check for alerts'; + +-- Remove qgis dependency +DROP FUNCTION IF EXISTS public.process_post_logbook_fn; +CREATE OR REPLACE FUNCTION public.process_post_logbook_fn(IN _id integer) RETURNS void AS $process_post_logbook_queue$ + DECLARE + logbook_rec record; + log_settings jsonb; + user_settings jsonb; + extra_json jsonb; + log_img_url text; + --logs_img_url text; + log_stats text; + --extent_bbox text; + BEGIN + -- If _id is not NULL + IF _id IS NULL OR _id < 1 THEN + RAISE WARNING '-> process_post_logbook_fn invalid input %', _id; + RETURN; + END IF; + -- Get the logbook record with all necessary fields exist + SELECT * INTO logbook_rec + FROM api.logbook + WHERE active IS false + AND id = _id + AND _from_lng IS NOT NULL + AND _from_lat IS NOT NULL + AND _to_lng IS NOT NULL + AND _to_lat IS NOT NULL; + -- Ensure the query is successful + IF logbook_rec.vessel_id IS NULL THEN + RAISE WARNING '-> process_post_logbook_fn invalid logbook %', _id; + RETURN; + END IF; + + PERFORM set_config('vessel.id', logbook_rec.vessel_id, false); + --RAISE WARNING 'public.process_post_logbook_fn() scheduler vessel.id %, user.id', current_setting('vessel.id', false), current_setting('user.id', false); + + -- Generate logbook image map name from QGIS + SELECT CONCAT('log_', logbook_rec.vessel_id::TEXT, '_', logbook_rec.id, '.png') INTO log_img_url; + --SELECT ST_Extent(ST_Transform(logbook_rec.track_geom, 3857))::TEXT AS envelope INTO extent_bbox FROM api.logbook WHERE id = logbook_rec.id; + --PERFORM public.qgis_getmap_py_fn(logbook_rec.vessel_id::TEXT, logbook_rec.id, extent_bbox::TEXT, False); + -- Generate logs image map name from QGIS + --WITH merged AS ( + -- SELECT ST_Union(logbook_rec.track_geom) AS merged_geometry + -- FROM api.logbook WHERE vessel_id = logbook_rec.vessel_id + --) + --SELECT ST_Extent(ST_Transform(merged_geometry, 3857))::TEXT AS envelope INTO extent_bbox FROM merged; + --SELECT CONCAT('logs_', logbook_rec.vessel_id::TEXT, '_', logbook_rec.id, '.png') INTO logs_img_url; + --PERFORM public.qgis_getmap_py_fn(logbook_rec.vessel_id::TEXT, logbook_rec.id, extent_bbox::TEXT, True); + + -- Add formatted distance and duration for email notification + SELECT CONCAT(ROUND(logbook_rec.distance, 2), ' NM / ', ROUND(EXTRACT(epoch FROM logbook_rec.duration)/3600,2), 'H\n') INTO log_stats; + + -- Prepare notification, gather user settings + SELECT json_build_object('logbook_name', logbook_rec.name, + 'logbook_link', logbook_rec.id, + 'logbook_img', log_img_url, + 'logbook_stats', log_stats) INTO log_settings; + user_settings := get_user_settings_from_vesselid_fn(logbook_rec.vessel_id::TEXT); + SELECT user_settings::JSONB || log_settings::JSONB into user_settings; + RAISE NOTICE '-> debug process_post_logbook_fn get_user_settings_from_vesselid_fn [%]', user_settings; + RAISE NOTICE '-> debug process_post_logbook_fn log_settings [%]', log_settings; + -- Send notification + PERFORM send_notification_fn('logbook'::TEXT, user_settings::JSONB); + -- Process badges + RAISE NOTICE '-> debug process_post_logbook_fn user_settings [%]', user_settings->>'email'::TEXT; + PERFORM set_config('user.email', user_settings->>'email'::TEXT, false); + PERFORM badges_logbook_fn(logbook_rec.id, logbook_rec._to_time::TEXT); + PERFORM badges_geom_fn(logbook_rec.id, logbook_rec._to_time::TEXT); + END; +$process_post_logbook_queue$ LANGUAGE plpgsql; +-- Description +COMMENT ON FUNCTION + public.process_post_logbook_fn + IS 'Notify user for new logbook.'; + +-- Add new qgis and video URLs settings +DROP FUNCTION IF EXISTS public.get_app_settings_fn; +CREATE OR REPLACE FUNCTION public.get_app_settings_fn(OUT app_settings jsonb) + RETURNS jsonb + LANGUAGE plpgsql +AS $function$ +DECLARE +BEGIN + SELECT + jsonb_object_agg(name, value) INTO app_settings + FROM + public.app_settings + WHERE + name LIKE 'app.email%' + OR name LIKE 'app.pushover%' + OR name LIKE 'app.url' + OR name LIKE 'app.telegram%' + OR name LIKE 'app.grafana_admin_uri' + OR name LIKE 'app.keycloak_uri' + OR name LIKE 'app.windy_apikey' + OR name LIKE 'app.%_uri' + OR name LIKE 'app.%_url'; +END; +$function$ +; +COMMENT ON FUNCTION public.get_app_settings_fn(out jsonb) IS 'get application settings details, email, pushover, telegram, grafana_admin_uri'; + +-- Add error handling for invalid argument +DROP FUNCTION IF EXISTS public.qgis_bbox_py_fn; +CREATE OR REPLACE FUNCTION public.qgis_bbox_py_fn(IN vessel_id TEXT DEFAULT NULL, IN log_id NUMERIC DEFAULT NULL, IN width NUMERIC DEFAULT 1080, IN height NUMERIC DEFAULT 566, IN scaleout BOOLEAN DEFAULT True, OUT bbox TEXT) +AS $qgis_bbox_py$ + log_extent = None + if not vessel_id and not log_id: + plpy.error('Error qgis_bbox_py invalid input vessel_id [{}], log_id [{}]'.format(vessel_id, log_id)) + # If we have a vessel_id then it is logs image map + if vessel_id: + # Use the shared cache to avoid preparing the log extent + if vessel_id in SD: + plan = SD[vessel_id] + # A prepared statement from Python + else: + plan = plpy.prepare("WITH merged AS ( SELECT ST_Union(track_geom) AS merged_geometry FROM api.logbook WHERE vessel_id = $1 ) SELECT ST_Extent(ST_Transform(merged_geometry, 3857))::TEXT FROM merged;", ["text"]) + SD[vessel_id] = plan + # Execute the statement with the log extent param and limit to 1 result + rv = plpy.execute(plan, [vessel_id], 1) + log_extent = rv[0]['st_extent'] + # Else we have a log_id then it is single log image map + else: + # Use the shared cache to avoid preparing the log extent + if log_id in SD: + plan = SD[log_id] + # A prepared statement from Python + else: + plan = plpy.prepare("SELECT ST_Extent(ST_Transform(track_geom, 3857)) FROM api.logbook WHERE id = $1::NUMERIC", ["text"]) + SD[log_id] = plan + # Execute the statement with the log extent param and limit to 1 result + rv = plpy.execute(plan, [log_id], 1) + log_extent = rv[0]['st_extent'] + + # Extract extent + def parse_extent_from_db(extent_raw): + # Parse the extent_raw to extract coordinates + extent = extent_raw.replace('BOX(', '').replace(')', '').split(',') + min_x, min_y = map(float, extent[0].split()) + max_x, max_y = map(float, extent[1].split()) + return min_x, min_y, max_x, max_y + + # ZoomOut from linestring extent + def apply_scale_factor(extent, scale_factor=1.125): + min_x, min_y, max_x, max_y = extent + center_x = (min_x + max_x) / 2 + center_y = (min_y + max_y) / 2 + width = max_x - min_x + height = max_y - min_y + new_width = width * scale_factor + new_height = height * scale_factor + scaled_extent = ( + round(center_x - new_width / 2), + round(center_y - new_height / 2), + round(center_x + new_width / 2), + round(center_y + new_height / 2), + ) + return scaled_extent + + def adjust_bbox_to_fixed_size(scaled_extent, fixed_width, fixed_height): + min_x, min_y, max_x, max_y = scaled_extent + bbox_width = float(max_x - min_x) + bbox_height = float(max_y - min_y) + bbox_aspect_ratio = float(bbox_width / bbox_height) + image_aspect_ratio = float(fixed_width / fixed_height) + + if bbox_aspect_ratio > image_aspect_ratio: + # Adjust height to match aspect ratio + new_bbox_height = bbox_width / image_aspect_ratio + height_diff = new_bbox_height - bbox_height + min_y -= height_diff / 2 + max_y += height_diff / 2 + else: + # Adjust width to match aspect ratio + new_bbox_width = bbox_height * image_aspect_ratio + width_diff = new_bbox_width - bbox_width + min_x -= width_diff / 2 + max_x += width_diff / 2 + + adjusted_extent = (min_x, min_y, max_x, max_y) + return adjusted_extent + + if (not vessel_id and not log_id) or not log_extent: + plpy.error('Failed to get sql qgis_bbox_py vessel_id [{}], log_id [{}], extent [{}]'.format(vessel_id, log_id, log_extent)) + #plpy.notice('qgis_bbox_py log_id [{}], extent [{}]'.format(log_id, log_extent)) + # Parse extent and apply ZoomOut scale factor + if scaleout: + scaled_extent = apply_scale_factor(parse_extent_from_db(log_extent)) + else: + scaled_extent = parse_extent_from_db(log_extent) + #plpy.notice('qgis_bbox_py log_id [{}], scaled_extent [{}]'.format(log_id, scaled_extent)) + fixed_width = width # default 1080 + fixed_height = height # default 566 + adjusted_extent = adjust_bbox_to_fixed_size(scaled_extent, fixed_width, fixed_height) + #plpy.notice('qgis_bbox_py log_id [{}], adjusted_extent [{}]'.format(log_id, adjusted_extent)) + min_x, min_y, max_x, max_y = adjusted_extent + return f"{min_x},{min_y},{max_x},{max_y}" +$qgis_bbox_py$ LANGUAGE plpython3u; +-- Description +COMMENT ON FUNCTION + public.qgis_bbox_py_fn + IS 'Generate the BBOX base on log extent and adapt extent to the image size for QGIS Server'; + +-- Add error handling for invalid argument +DROP FUNCTION IF EXISTS public.qgis_bbox_trip_py_fn; +CREATE OR REPLACE FUNCTION public.qgis_bbox_trip_py_fn(IN _str_to_parse TEXT DEFAULT NULL, OUT bbox TEXT) +AS $qgis_bbox_trip_py$ + #plpy.notice('qgis_bbox_trip_py_fn _str_to_parse [{}]'.format(_str_to_parse)) + if not _str_to_parse or '_' not in _str_to_parse: + plpy.error('Error qgis_bbox_py invalid input _str_to_parse [{}]'.format(_str_to_parse)) + vessel_id, log_id, log_end = _str_to_parse.split('_') + width = 1080 + height = 566 + scaleout = True + log_extent = None + # If we have a vessel_id then it is full logs image map + if vessel_id and log_end is None: + # Use the shared cache to avoid preparing the log extent + if vessel_id in SD: + plan = SD[vessel_id] + # A prepared statement from Python + else: + plan = plpy.prepare("WITH merged AS ( SELECT ST_Union(track_geom) AS merged_geometry FROM api.logbook WHERE vessel_id = $1 ) SELECT ST_Extent(ST_Transform(merged_geometry, 3857))::TEXT FROM merged;", ["text"]) + SD[vessel_id] = plan + # Execute the statement with the log extent param and limit to 1 result + rv = plpy.execute(plan, [vessel_id], 1) + log_extent = rv[0]['st_extent'] + # If we have a vessel_id and a log_end then it is subset logs image map + elif vessel_id and log_end: + # Use the shared cache to avoid preparing the log extent + shared_cache = vessel_id + str(log_id) + str(log_end) + if shared_cache in SD: + plan = SD[shared_cache] + # A prepared statement from Python + else: + plan = plpy.prepare("WITH merged AS ( SELECT ST_Union(track_geom) AS merged_geometry FROM api.logbook WHERE vessel_id = $1 and id >= $2::NUMERIC and id <= $3::NUMERIC) SELECT ST_Extent(ST_Transform(merged_geometry, 3857))::TEXT FROM merged;", ["text","text","text"]) + SD[shared_cache] = plan + # Execute the statement with the log extent param and limit to 1 result + rv = plpy.execute(plan, [vessel_id,log_id,log_end], 1) + log_extent = rv[0]['st_extent'] + # Else we have a log_id then it is single log image map + else : + # Use the shared cache to avoid preparing the log extent + if log_id in SD: + plan = SD[log_id] + # A prepared statement from Python + else: + plan = plpy.prepare("SELECT ST_Extent(ST_Transform(track_geom, 3857)) FROM api.logbook WHERE id = $1::NUMERIC", ["text"]) + SD[log_id] = plan + # Execute the statement with the log extent param and limit to 1 result + rv = plpy.execute(plan, [log_id], 1) + log_extent = rv[0]['st_extent'] + + # Extract extent + def parse_extent_from_db(extent_raw): + # Parse the extent_raw to extract coordinates + extent = extent_raw.replace('BOX(', '').replace(')', '').split(',') + min_x, min_y = map(float, extent[0].split()) + max_x, max_y = map(float, extent[1].split()) + return min_x, min_y, max_x, max_y + + # ZoomOut from linestring extent + def apply_scale_factor(extent, scale_factor=1.125): + min_x, min_y, max_x, max_y = extent + center_x = (min_x + max_x) / 2 + center_y = (min_y + max_y) / 2 + width = max_x - min_x + height = max_y - min_y + new_width = width * scale_factor + new_height = height * scale_factor + scaled_extent = ( + round(center_x - new_width / 2), + round(center_y - new_height / 2), + round(center_x + new_width / 2), + round(center_y + new_height / 2), + ) + return scaled_extent + + def adjust_bbox_to_fixed_size(scaled_extent, fixed_width, fixed_height): + min_x, min_y, max_x, max_y = scaled_extent + bbox_width = float(max_x - min_x) + bbox_height = float(max_y - min_y) + bbox_aspect_ratio = float(bbox_width / bbox_height) + image_aspect_ratio = float(fixed_width / fixed_height) + + if bbox_aspect_ratio > image_aspect_ratio: + # Adjust height to match aspect ratio + new_bbox_height = bbox_width / image_aspect_ratio + height_diff = new_bbox_height - bbox_height + min_y -= height_diff / 2 + max_y += height_diff / 2 + else: + # Adjust width to match aspect ratio + new_bbox_width = bbox_height * image_aspect_ratio + width_diff = new_bbox_width - bbox_width + min_x -= width_diff / 2 + max_x += width_diff / 2 + + adjusted_extent = (min_x, min_y, max_x, max_y) + return adjusted_extent + + if not log_extent: + plpy.error('Failed to get sql qgis_bbox_trip_py_fn vessel_id [{}], log_id [{}], extent [{}]'.format(vessel_id, log_id, log_extent)) + #plpy.notice('qgis_bbox_trip_py_fn log_id [{}], extent [{}]'.format(log_id, log_extent)) + # Parse extent and apply ZoomOut scale factor + if scaleout: + scaled_extent = apply_scale_factor(parse_extent_from_db(log_extent)) + else: + scaled_extent = parse_extent_from_db(log_extent) + #plpy.notice('qgis_bbox_trip_py_fn log_id [{}], scaled_extent [{}]'.format(log_id, scaled_extent)) + fixed_width = width # default 1080 + fixed_height = height # default 566 + adjusted_extent = adjust_bbox_to_fixed_size(scaled_extent, fixed_width, fixed_height) + #plpy.notice('qgis_bbox_trip_py_fn log_id [{}], adjusted_extent [{}]'.format(log_id, adjusted_extent)) + min_x, min_y, max_x, max_y = adjusted_extent + return f"{min_x},{min_y},{max_x},{max_y}" +$qgis_bbox_trip_py$ LANGUAGE plpython3u; +-- Description +COMMENT ON FUNCTION + public.qgis_bbox_trip_py_fn + IS 'Generate the BBOX base on trip extent and adapt extent to the image size for QGIS Server'; + +-- Ensure name without double quote, Fix issue with when string contains quote +DROP FUNCTION public.process_lat_lon_fn(in numeric, in numeric, out int4, out int4, out text, out text); +-- DROP FUNCTION public.process_lat_lon_fn(in numeric, in numeric, out int4, out int4, out text, out text); + +CREATE OR REPLACE FUNCTION public.process_lat_lon_fn(lon numeric, lat numeric, OUT moorage_id integer, OUT moorage_type integer, OUT moorage_name text, OUT moorage_country text) + RETURNS record + LANGUAGE plpgsql +AS $function$ + DECLARE + stay_rec record; + --moorage_id INTEGER := NULL; + --moorage_type INTEGER := 1; -- Unknown + --moorage_name TEXT := NULL; + --moorage_country TEXT := NULL; + existing_rec record; + geo jsonb; + overpass jsonb; + BEGIN + RAISE NOTICE '-> process_lat_lon_fn'; + IF lon IS NULL OR lat IS NULL THEN + RAISE WARNING '-> process_lat_lon_fn invalid input lon %, lat %', lon, lat; + --return NULL; + END IF; + + -- Do we have an existing moorages within 300m of the new stay + FOR existing_rec in + SELECT + * + FROM api.moorages m + WHERE + m.latitude IS NOT NULL + AND m.longitude IS NOT NULL + AND m.geog IS NOT NULL + AND ST_DWithin( + Geography(ST_MakePoint(m.longitude, m.latitude)), + Geography(ST_MakePoint(lon, lat)), + 300 -- in meters + ) + AND m.vessel_id = current_setting('vessel.id', false) + ORDER BY id ASC + LOOP + -- found previous stay within 300m of the new moorage + IF existing_rec.id IS NOT NULL AND existing_rec.id > 0 THEN + RAISE NOTICE '-> process_lat_lon_fn found previous moorages within 300m %', existing_rec; + EXIT; -- exit loop + END IF; + END LOOP; + + -- if with in 300m use existing name and stay_code + -- else insert new entry + IF existing_rec.id IS NOT NULL AND existing_rec.id > 0 THEN + RAISE NOTICE '-> process_lat_lon_fn found close by moorage using existing name and stay_code %', existing_rec; + moorage_id := existing_rec.id; + moorage_name := existing_rec.name; + moorage_type := existing_rec.stay_code; + ELSE + RAISE NOTICE '-> process_lat_lon_fn create new moorage'; + -- query overpass api to guess moorage type + overpass := overpass_py_fn(lon::NUMERIC, lat::NUMERIC); + RAISE NOTICE '-> process_lat_lon_fn overpass name:[%] seamark:type:[%]', overpass->'name', overpass->'seamark:type'; + moorage_type = 1; -- Unknown + IF overpass->>'seamark:type' = 'harbour' AND overpass->>'seamark:harbour:category' = 'marina' then + moorage_type = 4; -- Dock + ELSIF overpass->>'seamark:type' = 'mooring' AND overpass->>'seamark:mooring:category' = 'buoy' then + moorage_type = 3; -- Mooring Buoy + ELSIF overpass->>'seamark:type' ~ '(anchorage|anchor_berth|berth)' OR overpass->>'natural' ~ '(bay|beach)' then + moorage_type = 2; -- Anchor + ELSIF overpass->>'seamark:type' = 'mooring' then + moorage_type = 3; -- Mooring Buoy + ELSIF overpass->>'leisure' = 'marina' then + moorage_type = 4; -- Dock + END IF; + -- geo reverse _lng _lat + geo := reverse_geocode_py_fn('nominatim', lon::NUMERIC, lat::NUMERIC); + moorage_country := geo->>'country_code'; + IF overpass->>'name:en' IS NOT NULL then + moorage_name = overpass->>'name:en'; + ELSIF overpass->>'name' IS NOT NULL then + moorage_name = overpass->>'name'; + ELSE + moorage_name := geo->>'name'; + END IF; + RAISE NOTICE '-> process_lat_lon_fn output name:[%] type:[%]', moorage_name, moorage_type; + RAISE NOTICE '-> process_lat_lon_fn insert new moorage for [%] name:[%] type:[%]', current_setting('vessel.id', false), moorage_name, moorage_type; + -- Insert new moorage from stay + INSERT INTO api.moorages + (vessel_id, name, country, stay_code, reference_count, latitude, longitude, geog, overpass, nominatim) + VALUES ( + current_setting('vessel.id', false), + coalesce(replace(moorage_name,'"', ''), null), + coalesce(moorage_country, null), + moorage_type, + 1, + lat, + lon, + Geography(ST_MakePoint(lon, lat)), + coalesce(overpass, null), + coalesce(geo, null) + ) returning id into moorage_id; + -- Add moorage entry to process queue for reference + --INSERT INTO process_queue (channel, payload, stored, ref_id, processed) + -- VALUES ('new_moorage', moorage_id, now(), current_setting('vessel.id', true), now()); + END IF; + --return json_build_object( + -- 'id', moorage_id, + -- 'name', moorage_name, + -- 'type', moorage_type + -- )::jsonb; + END; +$function$ +; + +COMMENT ON FUNCTION public.process_lat_lon_fn(in numeric, in numeric, out int4, out int4, out text, out text) IS 'Add or Update moorage base on lat/lon'; + +-- Refresh permissions for qgis_role +GRANT EXECUTE ON FUNCTION public.qgis_bbox_py_fn(in text, in numeric, in numeric, in numeric, in bool, out text) TO qgis_role; +GRANT EXECUTE ON FUNCTION public.qgis_bbox_trip_py_fn(in text, out text) TO qgis_role; + +-- Refresh user_role permissions +GRANT SELECT ON ALL TABLES IN SCHEMA api TO user_role; +GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA api TO user_role; +GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO user_role; + +-- Update version +UPDATE public.app_settings + SET value='0.7.6' + WHERE "name"='app.version'; + +\c postgres