mirror of
https://github.com/xbgmsharp/postgsail.git
synced 2025-09-17 19:27:49 +00:00
Improve process_logbook_queue_fn. Detect and remove stationary movement.
Add logbook_metrics_dwithin_fn function.
This commit is contained in:
@@ -15,6 +15,33 @@ CREATE SCHEMA IF NOT EXISTS public;
|
||||
-- process single cron event, process_[logbook|stay|moorage|badge]_queue_fn()
|
||||
--
|
||||
|
||||
CREATE OR REPLACE FUNCTION logbook_metrics_dwithin_fn(
|
||||
IN _start text,
|
||||
IN _end text,
|
||||
IN lgn float,
|
||||
IN lat float,
|
||||
OUT count_metric numeric) AS $logbook_metrics_dwithin$
|
||||
BEGIN
|
||||
SELECT count(*) INTO count_metric
|
||||
FROM api.metrics m
|
||||
WHERE
|
||||
m.latitude IS NOT NULL
|
||||
AND m.longitude IS NOT NULL
|
||||
AND m.time >= _start::TIMESTAMP WITHOUT TIME ZONE
|
||||
AND m.time <= _end::TIMESTAMP WITHOUT TIME ZONE
|
||||
AND client_id = current_setting('vessel.client_id', false)
|
||||
AND ST_DWithin(
|
||||
Geography(ST_MakePoint(m.longitude, m.latitude)),
|
||||
Geography(ST_MakePoint(lgn, lat)),
|
||||
10
|
||||
);
|
||||
END;
|
||||
$logbook_metrics_dwithin$ LANGUAGE plpgsql;
|
||||
-- Description
|
||||
COMMENT ON FUNCTION
|
||||
public.logbook_metrics_dwithin_fn
|
||||
IS 'Check if all entries for a logbook are in stationary movement with 10 meters';
|
||||
|
||||
-- Update a logbook with avg data
|
||||
-- TODO using timescale function
|
||||
CREATE OR REPLACE FUNCTION logbook_update_avg_fn(
|
||||
@@ -23,19 +50,20 @@ CREATE OR REPLACE FUNCTION logbook_update_avg_fn(
|
||||
IN _end TEXT,
|
||||
OUT avg_speed double precision,
|
||||
OUT max_speed double precision,
|
||||
OUT max_wind_speed double precision
|
||||
OUT max_wind_speed double precision,
|
||||
OUT count_metric double precision
|
||||
) AS $logbook_update_avg$
|
||||
BEGIN
|
||||
RAISE NOTICE '-> Updating avg for logbook id=%, start: "%", end: "%"', _id, _start, _end;
|
||||
SELECT AVG(speedoverground), MAX(speedoverground), MAX(windspeedapparent) INTO
|
||||
avg_speed, max_speed, max_wind_speed
|
||||
RAISE NOTICE '-> Updating avg for logbook id=%, start:"%", end:"%"', _id, _start, _end;
|
||||
SELECT AVG(speedoverground), MAX(speedoverground), MAX(windspeedapparent), COUNT(*) INTO
|
||||
avg_speed, max_speed, max_wind_speed, count_metric
|
||||
FROM api.metrics m
|
||||
WHERE m.latitude IS NOT NULL
|
||||
AND m.longitude IS NOT NULL
|
||||
AND m.time >= _start::TIMESTAMP WITHOUT TIME ZONE
|
||||
AND m.time <= _end::TIMESTAMP WITHOUT TIME ZONE
|
||||
AND client_id = current_setting('vessel.client_id', false);
|
||||
RAISE NOTICE '-> Updated avg for logbook id=%, avg_speed:%, max_speed:%, max_wind_speed:%', _id, avg_speed, max_speed, max_wind_speed;
|
||||
RAISE NOTICE '-> Updated avg for logbook id=%, avg_speed:%, max_speed:%, max_wind_speed:%, count:%', _id, avg_speed, max_speed, max_wind_speed, count_metric;
|
||||
END;
|
||||
$logbook_update_avg$ LANGUAGE plpgsql;
|
||||
-- Description
|
||||
@@ -144,9 +172,15 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void
|
||||
geo_rec record;
|
||||
log_settings jsonb;
|
||||
user_settings jsonb;
|
||||
app_settings jsonb;
|
||||
vessel_settings jsonb;
|
||||
geojson jsonb;
|
||||
_invalid_time boolean;
|
||||
_invalid_interval boolean;
|
||||
_invalid_distance boolean;
|
||||
count_metric numeric;
|
||||
previous_stays_id numeric;
|
||||
current_stays_departed text;
|
||||
current_stays_id numeric;
|
||||
current_stays_active boolean;
|
||||
BEGIN
|
||||
-- If _id is not NULL
|
||||
IF _id IS NULL OR _id < 1 THEN
|
||||
@@ -171,20 +205,79 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void
|
||||
PERFORM set_config('vessel.client_id', logbook_rec.client_id, false);
|
||||
--RAISE WARNING 'public.process_logbook_queue_fn() scheduler vessel.client_id %', current_setting('vessel.client_id', false);
|
||||
|
||||
-- Check if all metrics are within 10meters base on geo loc
|
||||
count_metric := logbook_metrics_dwithin_fn(logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT, logbook_rec._from_lng::NUMERIC, logbook_rec._from_lat::NUMERIC);
|
||||
RAISE NOTICE '-> process_logbook_queue_fn logbook_metrics_dwithin_fn count:[%]', count_metric;
|
||||
|
||||
-- Calculate logbook data average and geo
|
||||
-- Update logbook entry with the latest metric data and calculate data
|
||||
avg_rec := logbook_update_avg_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
|
||||
geo_rec := logbook_update_geom_distance_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
|
||||
|
||||
-- Avoid/ignore/delete logbook stationary movement or time sync issue
|
||||
-- Check time start vs end
|
||||
SELECT logbook_rec._to_time::timestamp without time zone < logbook_rec._from_time::timestamp without time zone INTO _invalid_time;
|
||||
-- Is distance is less than 0.005
|
||||
SELECT geo_rec._track_distance < 0.005 INTO _invalid_distance;
|
||||
-- Is duration is less than 100sec
|
||||
SELECT (logbook_rec._to_time::timestamp without time zone - logbook_rec._from_time::timestamp without time zone) < (100::text||' secs')::interval INTO _invalid_interval;
|
||||
-- if stationnary fix data metrics,logbook,stays,moorage
|
||||
IF _invalid_time IS True OR _invalid_distance IS True
|
||||
OR _invalid_distance IS True OR count_metric = avg_rec.count_metric THEN
|
||||
RAISE WARNING '-> process_logbook_queue_fn invalid logbook data [%]', logbook_rec.id;
|
||||
-- Update metrics status to moored
|
||||
UPDATE api.metrics
|
||||
SET status = 'moored'
|
||||
WHERE time >= logbook_rec._from_time::TIMESTAMP WITHOUT TIME ZONE
|
||||
AND time <= logbook_rec._to_time::TIMESTAMP WITHOUT TIME ZONE
|
||||
AND client_id = current_setting('vessel.client_id', false);
|
||||
-- Update logbook
|
||||
UPDATE api.logbook
|
||||
SET notes = 'invalid logbook data, stationary need to fix metrics?'
|
||||
WHERE id = logbook_rec.id;
|
||||
-- Get related stays
|
||||
SELECT id,departed,active INTO current_stays_id,current_stays_departed,current_stays_active
|
||||
FROM api.stays s
|
||||
WHERE s.client_id = current_setting('vessel.client_id', false)
|
||||
AND s.arrived = logbook_rec._to_time;
|
||||
-- Update related stays
|
||||
UPDATE api.stays
|
||||
SET notes = 'invalid stays data, stationary need to fix metrics?'
|
||||
WHERE client_id = current_setting('vessel.client_id', false)
|
||||
AND arrived = logbook_rec._to_time;
|
||||
-- Find previous stays
|
||||
SELECT id INTO previous_stays_id
|
||||
FROM api.stays s
|
||||
WHERE s.client_id = current_setting('vessel.client_id', false)
|
||||
AND s.arrived < logbook_rec._to_time
|
||||
ORDER BY s.arrived DESC LIMIT 1;
|
||||
-- Update previous stays with the departed time from current stays
|
||||
-- and set the active state from current stays
|
||||
UPDATE api.stays
|
||||
SET departed = current_stays_departed::timestamp without time zone,
|
||||
active = current_stays_active
|
||||
WHERE client_id = current_setting('vessel.client_id', false)
|
||||
AND id = previous_stays_id;
|
||||
-- Clean u, remove invalid logbook and stay entry
|
||||
DELETE FROM api.logbook WHERE id = logbook_rec.id;
|
||||
RAISE WARNING '-> process_logbook_queue_fn delete invalid logbook [%]', logbook_rec.id;
|
||||
DELETE FROM api.stays WHERE id = current_stays_id;
|
||||
RAISE WARNING '-> process_logbook_queue_fn delete invalid stays [%]', current_stays_id;
|
||||
-- TODO should we substract (-1) moorages ref count or reprocess it?!?
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
-- Generate logbook name, concat _from_location and _to_locacion
|
||||
-- geo reverse _from_lng _from_lat
|
||||
-- geo reverse _to_lng _to_lat
|
||||
from_name := reverse_geocode_py_fn('nominatim', logbook_rec._from_lng::NUMERIC, logbook_rec._from_lat::NUMERIC);
|
||||
to_name := reverse_geocode_py_fn('nominatim', logbook_rec._to_lng::NUMERIC, logbook_rec._to_lat::NUMERIC);
|
||||
SELECT CONCAT(from_name, ' to ' , to_name) INTO log_name;
|
||||
-- SELECT CONCAT("_from" , ' to ' ,"_to") from api.logbook where id = 1;
|
||||
|
||||
-- Generate logbook name, concat _from_location and to _to_locacion
|
||||
-- Update logbook entry with the latest metric data and calculate data
|
||||
avg_rec := logbook_update_avg_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
|
||||
geo_rec := logbook_update_geom_distance_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
|
||||
--geojson := logbook_update_geojson_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
|
||||
-- todo check on time start vs end
|
||||
RAISE NOTICE 'Updating logbook entry [%] [%] [%]', logbook_rec.id, logbook_rec._from_time, logbook_rec._to_time;
|
||||
-- GeoJSON
|
||||
geojson := logbook_update_geojson_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
|
||||
|
||||
RAISE NOTICE 'Updating valid logbook entry [%] [%] [%]', logbook_rec.id, logbook_rec._from_time, logbook_rec._to_time;
|
||||
UPDATE api.logbook
|
||||
SET
|
||||
duration = (logbook_rec._to_time::timestamp without time zone - logbook_rec._from_time::timestamp without time zone),
|
||||
@@ -195,30 +288,18 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void
|
||||
_to = to_name,
|
||||
name = log_name,
|
||||
track_geom = geo_rec._track_geom,
|
||||
distance = geo_rec._track_distance
|
||||
distance = geo_rec._track_distance,
|
||||
track_geojson = geojson
|
||||
WHERE id = logbook_rec.id;
|
||||
|
||||
-- GeoJSON
|
||||
geojson := logbook_update_geojson_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
|
||||
UPDATE api.logbook
|
||||
SET
|
||||
track_geojson = geojson
|
||||
WHERE id = logbook_rec.id;
|
||||
-- Gather email and pushover app settings
|
||||
--app_settings := get_app_settings_fn();
|
||||
-- Gather user settings
|
||||
SELECT json_build_object('logbook_name', log_name, 'logbook_link', logbook_rec.id) into log_settings;
|
||||
-- Prepare notification, gather user settings
|
||||
SELECT json_build_object('logbook_name', log_name, 'logbook_link', logbook_rec.id) into log_settings;
|
||||
user_settings := get_user_settings_from_clientid_fn(logbook_rec.client_id::TEXT);
|
||||
SELECT user_settings::JSONB || log_settings::JSONB into user_settings;
|
||||
RAISE DEBUG '-> debug process_logbook_queue_fn get_user_settings_from_clientid_fn [%]', user_settings;
|
||||
RAISE DEBUG '-> debug process_logbook_queue_fn log_settings [%]', log_settings;
|
||||
--user_settings := get_user_settings_from_log_fn(logbook_rec::RECORD);
|
||||
--user_settings := '{"logbook_name": "' || log_name || '"}, "{"email": "' || account_rec.email || '", "recipient": "' || account_rec.first || '}';
|
||||
--user_settings := '{"logbook_name": "' || log_name || '"}';
|
||||
-- Send notification email, pushover
|
||||
-- Send notification
|
||||
PERFORM send_notification_fn('logbook'::TEXT, user_settings::JSONB);
|
||||
--PERFORM send_email_py_fn('logbook'::TEXT, user_settings::JSONB, app_settings::JSONB);
|
||||
--PERFORM send_pushover_py_fn('logbook'::TEXT, user_settings::JSONB, app_settings::JSONB);
|
||||
END;
|
||||
$process_logbook_queue$ LANGUAGE plpgsql;
|
||||
-- Description
|
||||
@@ -237,6 +318,7 @@ CREATE OR REPLACE FUNCTION process_stay_queue_fn(IN _id integer) RETURNS void AS
|
||||
-- If _id is valid, not NULL
|
||||
IF _id IS NULL OR _id < 1 THEN
|
||||
RAISE WARNING '-> process_stay_queue_fn invalid input %', _id;
|
||||
RETURN;
|
||||
END IF;
|
||||
-- Get the stay record with all necesary fields exist
|
||||
SELECT * INTO stay_rec
|
||||
@@ -282,6 +364,7 @@ CREATE OR REPLACE FUNCTION process_moorage_queue_fn(IN _id integer) RETURNS void
|
||||
-- If _id is not NULL
|
||||
IF _id IS NULL OR _id < 1 THEN
|
||||
RAISE WARNING '-> process_moorage_queue_fn invalid input %', _id;
|
||||
RETURN;
|
||||
END IF;
|
||||
-- Get the stay record with all necesary fields exist
|
||||
SELECT * INTO stay_rec
|
||||
@@ -292,7 +375,13 @@ CREATE OR REPLACE FUNCTION process_moorage_queue_fn(IN _id integer) RETURNS void
|
||||
AND longitude IS NOT NULL
|
||||
AND latitude IS NOT NULL
|
||||
AND id = _id;
|
||||
-- Ensure the query is successful
|
||||
IF stay_rec.client_id IS NULL THEN
|
||||
RAISE WARNING '-> process_moorage_queue_fn invalid stay %', _id;
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
-- Do we have an existing stay within 100m of the new moorage
|
||||
FOR moorage_rec in
|
||||
SELECT
|
||||
*
|
||||
@@ -300,7 +389,8 @@ CREATE OR REPLACE FUNCTION process_moorage_queue_fn(IN _id integer) RETURNS void
|
||||
WHERE
|
||||
latitude IS NOT NULL
|
||||
AND longitude IS NOT NULL
|
||||
AND ST_DWithin(
|
||||
AND geog IS NOT NULL
|
||||
AND ST_DWithin(
|
||||
-- Geography(ST_MakePoint(stay_rec._lng, stay_rec._lat)),
|
||||
stay_rec.geog,
|
||||
-- Geography(ST_MakePoint(longitude, latitude)),
|
||||
@@ -327,7 +417,7 @@ CREATE OR REPLACE FUNCTION process_moorage_queue_fn(IN _id integer) RETURNS void
|
||||
moorage_rec.stay_duration +
|
||||
(stay_rec.departed::timestamp without time zone - stay_rec.arrived::timestamp without time zone)
|
||||
WHERE id = moorage_rec.id;
|
||||
else
|
||||
ELSE
|
||||
RAISE NOTICE 'Insert new moorage entry from stay %', stay_rec;
|
||||
-- Ensure the stay as a name if lat,lon
|
||||
IF stay_rec.name IS NULL AND stay_rec.longitude IS NOT NULL AND stay_rec.latitude IS NOT NULL THEN
|
||||
@@ -577,6 +667,7 @@ AS $send_notification$
|
||||
telegram_settings JSONB := NULL;
|
||||
_email TEXT := NULL;
|
||||
BEGIN
|
||||
-- TODO input check
|
||||
--RAISE NOTICE '--> send_notification_fn type [%]', email_type;
|
||||
-- Gather notification app settings, eg: email, pushover, telegram
|
||||
app_settings := get_app_settings_fn();
|
||||
|
Reference in New Issue
Block a user