From 5a1451ff691849a6f2e9db2609f168393df3f175 Mon Sep 17 00:00:00 2001 From: xbgmsharp Date: Mon, 13 Feb 2023 23:56:39 +0100 Subject: [PATCH] Improve process_logbook_queue_fn. Detect and remove stationary movement. Add logbook_metrics_dwithin_fn function. --- initdb/02_3_2_signalk_public_functions.sql | 159 ++++++++++++++++----- 1 file changed, 125 insertions(+), 34 deletions(-) diff --git a/initdb/02_3_2_signalk_public_functions.sql b/initdb/02_3_2_signalk_public_functions.sql index 57dd36a..9d4cf90 100644 --- a/initdb/02_3_2_signalk_public_functions.sql +++ b/initdb/02_3_2_signalk_public_functions.sql @@ -15,6 +15,33 @@ CREATE SCHEMA IF NOT EXISTS public; -- process single cron event, process_[logbook|stay|moorage|badge]_queue_fn() -- +CREATE OR REPLACE FUNCTION logbook_metrics_dwithin_fn( + IN _start text, + IN _end text, + IN lgn float, + IN lat float, + OUT count_metric numeric) AS $logbook_metrics_dwithin$ + BEGIN + SELECT count(*) INTO count_metric + FROM api.metrics m + WHERE + m.latitude IS NOT NULL + AND m.longitude IS NOT NULL + AND m.time >= _start::TIMESTAMP WITHOUT TIME ZONE + AND m.time <= _end::TIMESTAMP WITHOUT TIME ZONE + AND client_id = current_setting('vessel.client_id', false) + AND ST_DWithin( + Geography(ST_MakePoint(m.longitude, m.latitude)), + Geography(ST_MakePoint(lgn, lat)), + 10 + ); + END; +$logbook_metrics_dwithin$ LANGUAGE plpgsql; +-- Description +COMMENT ON FUNCTION + public.logbook_metrics_dwithin_fn + IS 'Check if all entries for a logbook are in stationary movement with 10 meters'; + -- Update a logbook with avg data -- TODO using timescale function CREATE OR REPLACE FUNCTION logbook_update_avg_fn( @@ -23,19 +50,20 @@ CREATE OR REPLACE FUNCTION logbook_update_avg_fn( IN _end TEXT, OUT avg_speed double precision, OUT max_speed double precision, - OUT max_wind_speed double precision + OUT max_wind_speed double precision, + OUT count_metric double precision ) AS $logbook_update_avg$ BEGIN - RAISE NOTICE '-> Updating avg for logbook id=%, start: "%", end: "%"', _id, _start, _end; - SELECT AVG(speedoverground), MAX(speedoverground), MAX(windspeedapparent) INTO - avg_speed, max_speed, max_wind_speed + RAISE NOTICE '-> Updating avg for logbook id=%, start:"%", end:"%"', _id, _start, _end; + SELECT AVG(speedoverground), MAX(speedoverground), MAX(windspeedapparent), COUNT(*) INTO + avg_speed, max_speed, max_wind_speed, count_metric FROM api.metrics m WHERE m.latitude IS NOT NULL AND m.longitude IS NOT NULL AND m.time >= _start::TIMESTAMP WITHOUT TIME ZONE AND m.time <= _end::TIMESTAMP WITHOUT TIME ZONE AND client_id = current_setting('vessel.client_id', false); - RAISE NOTICE '-> Updated avg for logbook id=%, avg_speed:%, max_speed:%, max_wind_speed:%', _id, avg_speed, max_speed, max_wind_speed; + RAISE NOTICE '-> Updated avg for logbook id=%, avg_speed:%, max_speed:%, max_wind_speed:%, count:%', _id, avg_speed, max_speed, max_wind_speed, count_metric; END; $logbook_update_avg$ LANGUAGE plpgsql; -- Description @@ -144,9 +172,15 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void geo_rec record; log_settings jsonb; user_settings jsonb; - app_settings jsonb; - vessel_settings jsonb; geojson jsonb; + _invalid_time boolean; + _invalid_interval boolean; + _invalid_distance boolean; + count_metric numeric; + previous_stays_id numeric; + current_stays_departed text; + current_stays_id numeric; + current_stays_active boolean; BEGIN -- If _id is not NULL IF _id IS NULL OR _id < 1 THEN @@ -171,20 +205,79 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void PERFORM set_config('vessel.client_id', logbook_rec.client_id, false); --RAISE WARNING 'public.process_logbook_queue_fn() scheduler vessel.client_id %', current_setting('vessel.client_id', false); + -- Check if all metrics are within 10meters base on geo loc + count_metric := logbook_metrics_dwithin_fn(logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT, logbook_rec._from_lng::NUMERIC, logbook_rec._from_lat::NUMERIC); + RAISE NOTICE '-> process_logbook_queue_fn logbook_metrics_dwithin_fn count:[%]', count_metric; + + -- Calculate logbook data average and geo + -- Update logbook entry with the latest metric data and calculate data + avg_rec := logbook_update_avg_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT); + geo_rec := logbook_update_geom_distance_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT); + + -- Avoid/ignore/delete logbook stationary movement or time sync issue + -- Check time start vs end + SELECT logbook_rec._to_time::timestamp without time zone < logbook_rec._from_time::timestamp without time zone INTO _invalid_time; + -- Is distance is less than 0.005 + SELECT geo_rec._track_distance < 0.005 INTO _invalid_distance; + -- Is duration is less than 100sec + SELECT (logbook_rec._to_time::timestamp without time zone - logbook_rec._from_time::timestamp without time zone) < (100::text||' secs')::interval INTO _invalid_interval; + -- if stationnary fix data metrics,logbook,stays,moorage + IF _invalid_time IS True OR _invalid_distance IS True + OR _invalid_distance IS True OR count_metric = avg_rec.count_metric THEN + RAISE WARNING '-> process_logbook_queue_fn invalid logbook data [%]', logbook_rec.id; + -- Update metrics status to moored + UPDATE api.metrics + SET status = 'moored' + WHERE time >= logbook_rec._from_time::TIMESTAMP WITHOUT TIME ZONE + AND time <= logbook_rec._to_time::TIMESTAMP WITHOUT TIME ZONE + AND client_id = current_setting('vessel.client_id', false); + -- Update logbook + UPDATE api.logbook + SET notes = 'invalid logbook data, stationary need to fix metrics?' + WHERE id = logbook_rec.id; + -- Get related stays + SELECT id,departed,active INTO current_stays_id,current_stays_departed,current_stays_active + FROM api.stays s + WHERE s.client_id = current_setting('vessel.client_id', false) + AND s.arrived = logbook_rec._to_time; + -- Update related stays + UPDATE api.stays + SET notes = 'invalid stays data, stationary need to fix metrics?' + WHERE client_id = current_setting('vessel.client_id', false) + AND arrived = logbook_rec._to_time; + -- Find previous stays + SELECT id INTO previous_stays_id + FROM api.stays s + WHERE s.client_id = current_setting('vessel.client_id', false) + AND s.arrived < logbook_rec._to_time + ORDER BY s.arrived DESC LIMIT 1; + -- Update previous stays with the departed time from current stays + -- and set the active state from current stays + UPDATE api.stays + SET departed = current_stays_departed::timestamp without time zone, + active = current_stays_active + WHERE client_id = current_setting('vessel.client_id', false) + AND id = previous_stays_id; + -- Clean u, remove invalid logbook and stay entry + DELETE FROM api.logbook WHERE id = logbook_rec.id; + RAISE WARNING '-> process_logbook_queue_fn delete invalid logbook [%]', logbook_rec.id; + DELETE FROM api.stays WHERE id = current_stays_id; + RAISE WARNING '-> process_logbook_queue_fn delete invalid stays [%]', current_stays_id; + -- TODO should we substract (-1) moorages ref count or reprocess it?!? + RETURN; + END IF; + + -- Generate logbook name, concat _from_location and _to_locacion -- geo reverse _from_lng _from_lat -- geo reverse _to_lng _to_lat from_name := reverse_geocode_py_fn('nominatim', logbook_rec._from_lng::NUMERIC, logbook_rec._from_lat::NUMERIC); to_name := reverse_geocode_py_fn('nominatim', logbook_rec._to_lng::NUMERIC, logbook_rec._to_lat::NUMERIC); SELECT CONCAT(from_name, ' to ' , to_name) INTO log_name; - -- SELECT CONCAT("_from" , ' to ' ,"_to") from api.logbook where id = 1; - -- Generate logbook name, concat _from_location and to _to_locacion - -- Update logbook entry with the latest metric data and calculate data - avg_rec := logbook_update_avg_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT); - geo_rec := logbook_update_geom_distance_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT); - --geojson := logbook_update_geojson_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT); - -- todo check on time start vs end - RAISE NOTICE 'Updating logbook entry [%] [%] [%]', logbook_rec.id, logbook_rec._from_time, logbook_rec._to_time; + -- GeoJSON + geojson := logbook_update_geojson_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT); + + RAISE NOTICE 'Updating valid logbook entry [%] [%] [%]', logbook_rec.id, logbook_rec._from_time, logbook_rec._to_time; UPDATE api.logbook SET duration = (logbook_rec._to_time::timestamp without time zone - logbook_rec._from_time::timestamp without time zone), @@ -195,30 +288,18 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void _to = to_name, name = log_name, track_geom = geo_rec._track_geom, - distance = geo_rec._track_distance + distance = geo_rec._track_distance, + track_geojson = geojson WHERE id = logbook_rec.id; - -- GeoJSON - geojson := logbook_update_geojson_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT); - UPDATE api.logbook - SET - track_geojson = geojson - WHERE id = logbook_rec.id; - -- Gather email and pushover app settings - --app_settings := get_app_settings_fn(); - -- Gather user settings - SELECT json_build_object('logbook_name', log_name, 'logbook_link', logbook_rec.id) into log_settings; + -- Prepare notification, gather user settings + SELECT json_build_object('logbook_name', log_name, 'logbook_link', logbook_rec.id) into log_settings; user_settings := get_user_settings_from_clientid_fn(logbook_rec.client_id::TEXT); SELECT user_settings::JSONB || log_settings::JSONB into user_settings; RAISE DEBUG '-> debug process_logbook_queue_fn get_user_settings_from_clientid_fn [%]', user_settings; RAISE DEBUG '-> debug process_logbook_queue_fn log_settings [%]', log_settings; - --user_settings := get_user_settings_from_log_fn(logbook_rec::RECORD); - --user_settings := '{"logbook_name": "' || log_name || '"}, "{"email": "' || account_rec.email || '", "recipient": "' || account_rec.first || '}'; - --user_settings := '{"logbook_name": "' || log_name || '"}'; - -- Send notification email, pushover + -- Send notification PERFORM send_notification_fn('logbook'::TEXT, user_settings::JSONB); - --PERFORM send_email_py_fn('logbook'::TEXT, user_settings::JSONB, app_settings::JSONB); - --PERFORM send_pushover_py_fn('logbook'::TEXT, user_settings::JSONB, app_settings::JSONB); END; $process_logbook_queue$ LANGUAGE plpgsql; -- Description @@ -237,6 +318,7 @@ CREATE OR REPLACE FUNCTION process_stay_queue_fn(IN _id integer) RETURNS void AS -- If _id is valid, not NULL IF _id IS NULL OR _id < 1 THEN RAISE WARNING '-> process_stay_queue_fn invalid input %', _id; + RETURN; END IF; -- Get the stay record with all necesary fields exist SELECT * INTO stay_rec @@ -282,6 +364,7 @@ CREATE OR REPLACE FUNCTION process_moorage_queue_fn(IN _id integer) RETURNS void -- If _id is not NULL IF _id IS NULL OR _id < 1 THEN RAISE WARNING '-> process_moorage_queue_fn invalid input %', _id; + RETURN; END IF; -- Get the stay record with all necesary fields exist SELECT * INTO stay_rec @@ -292,7 +375,13 @@ CREATE OR REPLACE FUNCTION process_moorage_queue_fn(IN _id integer) RETURNS void AND longitude IS NOT NULL AND latitude IS NOT NULL AND id = _id; + -- Ensure the query is successful + IF stay_rec.client_id IS NULL THEN + RAISE WARNING '-> process_moorage_queue_fn invalid stay %', _id; + RETURN; + END IF; + -- Do we have an existing stay within 100m of the new moorage FOR moorage_rec in SELECT * @@ -300,7 +389,8 @@ CREATE OR REPLACE FUNCTION process_moorage_queue_fn(IN _id integer) RETURNS void WHERE latitude IS NOT NULL AND longitude IS NOT NULL - AND ST_DWithin( + AND geog IS NOT NULL + AND ST_DWithin( -- Geography(ST_MakePoint(stay_rec._lng, stay_rec._lat)), stay_rec.geog, -- Geography(ST_MakePoint(longitude, latitude)), @@ -327,7 +417,7 @@ CREATE OR REPLACE FUNCTION process_moorage_queue_fn(IN _id integer) RETURNS void moorage_rec.stay_duration + (stay_rec.departed::timestamp without time zone - stay_rec.arrived::timestamp without time zone) WHERE id = moorage_rec.id; - else + ELSE RAISE NOTICE 'Insert new moorage entry from stay %', stay_rec; -- Ensure the stay as a name if lat,lon IF stay_rec.name IS NULL AND stay_rec.longitude IS NOT NULL AND stay_rec.latitude IS NOT NULL THEN @@ -577,6 +667,7 @@ AS $send_notification$ telegram_settings JSONB := NULL; _email TEXT := NULL; BEGIN + -- TODO input check --RAISE NOTICE '--> send_notification_fn type [%]', email_type; -- Gather notification app settings, eg: email, pushover, telegram app_settings := get_app_settings_fn();