feat: Add pre logbook check. Split logbook process function

This commit is contained in:
xbgmsharp
2023-12-29 11:34:19 +01:00
parent 5f9a889a44
commit a5d5585366

View File

@@ -14,7 +14,6 @@ CREATE SCHEMA IF NOT EXISTS public;
-- Functions public schema
-- process single cron event, process_[logbook|stay|moorage]_queue_fn()
--
CREATE OR REPLACE FUNCTION public.logbook_metrics_dwithin_fn(
IN _start text,
IN _end text,
@@ -381,17 +380,7 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void
log_settings jsonb;
user_settings jsonb;
geojson jsonb;
_invalid_time boolean;
_invalid_interval boolean;
_invalid_distance boolean;
_invalid_ratio boolean;
count_metric numeric;
previous_stays_id numeric;
current_stays_departed text;
current_stays_id numeric;
current_stays_active boolean;
extra_json jsonb;
geo jsonb;
BEGIN
-- If _id is not NULL
IF _id IS NULL OR _id < 1 THEN
@@ -416,91 +405,22 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void
PERFORM set_config('vessel.id', logbook_rec.vessel_id, false);
--RAISE WARNING 'public.process_logbook_queue_fn() scheduler vessel.id %, user.id', current_setting('vessel.id', false), current_setting('user.id', false);
-- Check if all metrics are within 50meters base on geo loc
count_metric := logbook_metrics_dwithin_fn(logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT, logbook_rec._from_lng::NUMERIC, logbook_rec._from_lat::NUMERIC);
RAISE NOTICE '-> process_logbook_queue_fn logbook_metrics_dwithin_fn count:[%]', count_metric;
-- Calculate logbook data average and geo
-- Update logbook entry with the latest metric data and calculate data
avg_rec := logbook_update_avg_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
geo_rec := logbook_update_geom_distance_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
-- Avoid/ignore/delete logbook stationary movement or time sync issue
-- Check time start vs end
SELECT logbook_rec._to_time::TIMESTAMPTZ < logbook_rec._from_time::TIMESTAMPTZ INTO _invalid_time;
-- Is distance is less or equal than 0.010
SELECT geo_rec._track_distance <= 0.010 INTO _invalid_distance;
-- Is duration is less than 100sec
SELECT (logbook_rec._to_time::TIMESTAMPTZ - logbook_rec._from_time::TIMESTAMPTZ) < (100::text||' secs')::interval INTO _invalid_interval;
-- Is within metrics represent more or equal than 60% of the total entry
SELECT (count_metric::NUMERIC / avg_rec.count_metric::NUMERIC) >= 0.60 INTO _invalid_ratio;
-- if stationary fix data metrics,logbook,stays,moorage
IF _invalid_time IS True OR _invalid_distance IS True
OR _invalid_interval IS True OR _invalid_ratio IS True
OR avg_rec.count_metric <= 2 THEN
RAISE NOTICE '-> process_logbook_queue_fn invalid logbook data id [%], _invalid_time [%], _invalid_distance [%], _invalid_interval [%], count_metric_in_zone [%], count_metric_log [%], _invalid_ratio [%]',
logbook_rec.id, _invalid_time, _invalid_distance, _invalid_interval, count_metric, avg_rec.count_metric, _invalid_ratio;
-- Update metrics status to moored
UPDATE api.metrics
SET status = 'moored'
WHERE time >= logbook_rec._from_time::TIMESTAMPTZ
AND time <= logbook_rec._to_time::TIMESTAMPTZ
AND vessel_id = current_setting('vessel.id', false);
-- Update logbook
UPDATE api.logbook
SET notes = 'invalid logbook data, stationary need to fix metrics?'
WHERE id = logbook_rec.id;
-- Get related stays
SELECT id,departed,active INTO current_stays_id,current_stays_departed,current_stays_active
FROM api.stays s
WHERE s.vessel_id = current_setting('vessel.id', false)
AND s.arrived = logbook_rec._to_time;
-- Update related stays
UPDATE api.stays
SET notes = 'invalid stays data, stationary need to fix metrics?'
WHERE vessel_id = current_setting('vessel.id', false)
AND arrived = logbook_rec._to_time;
-- Find previous stays
SELECT id INTO previous_stays_id
FROM api.stays s
WHERE s.vessel_id = current_setting('vessel.id', false)
AND s.arrived < logbook_rec._to_time
ORDER BY s.arrived DESC LIMIT 1;
-- Update previous stays with the departed time from current stays
-- and set the active state from current stays
UPDATE api.stays
SET departed = current_stays_departed::TIMESTAMPTZ,
active = current_stays_active
WHERE vessel_id = current_setting('vessel.id', false)
AND id = previous_stays_id;
-- Clean up, remove invalid logbook and stay entry
DELETE FROM api.logbook WHERE id = logbook_rec.id;
RAISE WARNING '-> process_logbook_queue_fn delete invalid logbook [%]', logbook_rec.id;
DELETE FROM api.stays WHERE id = current_stays_id;
RAISE WARNING '-> process_logbook_queue_fn delete invalid stays [%]', current_stays_id;
-- TODO should we subtract (-1) moorages ref count or reprocess it?!?
RETURN;
END IF;
-- Do we have an existing moorage within 300m of the new log
-- generate logbook name, concat _from_location and _to_location from moorage name
from_moorage := process_lat_lon_fn(logbook_rec._from_lng::NUMERIC, logbook_rec._from_lat::NUMERIC);
to_moorage := process_lat_lon_fn(logbook_rec._to_lng::NUMERIC, logbook_rec._to_lat::NUMERIC);
SELECT CONCAT(from_moorage.moorage_name, ' to ' , to_moorage.moorage_name) INTO log_name;
-- Generate logbook name, concat _from_location and _to_location
-- geo reverse _from_lng _from_lat
-- geo reverse _to_lng _to_lat
--geo := reverse_geocode_py_fn('nominatim', logbook_rec._from_lng::NUMERIC, logbook_rec._from_lat::NUMERIC);
--from_name := geo->>'name';
--geo := reverse_geocode_py_fn('nominatim', logbook_rec._to_lng::NUMERIC, logbook_rec._to_lat::NUMERIC);
--to_name := geo->>'name';
--SELECT CONCAT(from_name, ' to ' , to_name) INTO log_name;
-- Process `propulsion.*.runTime` and `navigation.log`
-- Calculate extra json
extra_json := logbook_update_extra_json_fn(logbook_rec.id, logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT);
RAISE NOTICE 'Updating valid logbook entry [%] [%] [%]', logbook_rec.id, logbook_rec._from_time, logbook_rec._to_time;
RAISE NOTICE 'Updating valid logbook entry logbook id:[%] start:[%] end:[%]', logbook_rec.id, logbook_rec._from_time, logbook_rec._to_time;
UPDATE api.logbook
SET
duration = (logbook_rec._to_time::TIMESTAMPTZ - logbook_rec._from_time::TIMESTAMPTZ),
@@ -514,7 +434,8 @@ CREATE OR REPLACE FUNCTION process_logbook_queue_fn(IN _id integer) RETURNS void
name = log_name,
track_geom = geo_rec._track_geom,
distance = geo_rec._track_distance,
extra = extra_json
extra = extra_json,
notes = NULL -- reset pre_log process
WHERE id = logbook_rec.id;
-- GeoJSON require track_geom field
@@ -1349,11 +1270,13 @@ CREATE OR REPLACE FUNCTION public.process_logbook_valid_fn(IN _id integer) RETUR
_invalid_time boolean;
_invalid_interval boolean;
_invalid_distance boolean;
_invalid_ratio boolean;
count_metric numeric;
previous_stays_id numeric;
current_stays_departed text;
current_stays_id numeric;
current_stays_active boolean;
timebucket boolean;
BEGIN
-- If _id is not NULL
IF _id IS NULL OR _id < 1 THEN
@@ -1378,7 +1301,7 @@ CREATE OR REPLACE FUNCTION public.process_logbook_valid_fn(IN _id integer) RETUR
PERFORM set_config('vessel.id', logbook_rec.vessel_id, false);
--RAISE WARNING 'public.process_logbook_queue_fn() scheduler vessel.id %, user.id', current_setting('vessel.id', false), current_setting('user.id', false);
-- Check if all metrics are within 10meters base on geo loc
-- Check if all metrics are within 50meters base on geo loc
count_metric := logbook_metrics_dwithin_fn(logbook_rec._from_time::TEXT, logbook_rec._to_time::TEXT, logbook_rec._from_lng::NUMERIC, logbook_rec._from_lat::NUMERIC);
RAISE NOTICE '-> process_logbook_valid_fn logbook_metrics_dwithin_fn count:[%]', count_metric;
@@ -1394,11 +1317,17 @@ CREATE OR REPLACE FUNCTION public.process_logbook_valid_fn(IN _id integer) RETUR
SELECT geo_rec._track_distance < 0.010 INTO _invalid_distance;
-- Is duration is less than 100sec
SELECT (logbook_rec._to_time::TIMESTAMPTZ - logbook_rec._from_time::TIMESTAMPTZ) < (100::text||' secs')::interval INTO _invalid_interval;
-- Is within metrics represent more or equal than 60% of the total entry
IF count_metric::NUMERIC <= 10 THEN
SELECT (count_metric::NUMERIC / avg_rec.count_metric::NUMERIC) >= 0.60 INTO _invalid_ratio;
END IF;
-- if stationary fix data metrics,logbook,stays,moorage
IF _invalid_time IS True OR _invalid_distance IS True
OR _invalid_interval IS True OR count_metric = avg_rec.count_metric THEN
RAISE NOTICE '-> process_logbook_queue_fn invalid logbook data id [%], _invalid_time [%], _invalid_distance [%], _invalid_interval [%], within count_metric == total count_metric [%]',
logbook_rec.id, _invalid_time, _invalid_distance, _invalid_interval, count_metric;
OR _invalid_interval IS True OR count_metric = avg_rec.count_metric
OR _invalid_ratio IS True
OR avg_rec.count_metric <= 3 THEN
RAISE NOTICE '-> process_logbook_valid_fn invalid logbook data id [%], _invalid_time [%], _invalid_distance [%], _invalid_interval [%], count_metric_in_zone [%], count_metric_log [%], _invalid_ratio [%]',
logbook_rec.id, _invalid_time, _invalid_distance, _invalid_interval, count_metric, avg_rec.count_metric, _invalid_ratio;
-- Update metrics status to moored
UPDATE api.metrics
SET status = 'moored'
@@ -1435,13 +1364,34 @@ CREATE OR REPLACE FUNCTION public.process_logbook_valid_fn(IN _id integer) RETUR
AND id = previous_stays_id;
-- Clean up, remove invalid logbook and stay entry
DELETE FROM api.logbook WHERE id = logbook_rec.id;
RAISE WARNING '-> process_logbook_queue_fn delete invalid logbook [%]', logbook_rec.id;
RAISE WARNING '-> process_logbook_valid_fn delete invalid logbook [%]', logbook_rec.id;
DELETE FROM api.stays WHERE id = current_stays_id;
RAISE WARNING '-> process_logbook_queue_fn delete invalid stays [%]', current_stays_id;
-- TODO should we subtract (-1) moorages ref count or reprocess it?!?
RAISE WARNING '-> process_logbook_valid_fn delete invalid stays [%]', current_stays_id;
RETURN;
END IF;
IF (logbook_rec.notes IS NULL) THEN -- run one time only
-- If duration is over 24h or number of entry is over 400, check for stays and potential multiple logs with stationary location
IF (logbook_rec._to_time::TIMESTAMPTZ - logbook_rec._from_time::TIMESTAMPTZ) > INTERVAL '24 hours'
OR avg_rec.count_metric > 400 THEN
timebucket := public.logbook_metrics_timebucket_fn('15 minutes'::TEXT, logbook_rec.id, logbook_rec._from_time::TIMESTAMPTZ, logbook_rec._to_time::TIMESTAMPTZ);
-- If true exit current process as the current logbook need to be re-process.
IF timebucket IS True THEN
RETURN;
END IF;
ELSE
timebucket := public.logbook_metrics_timebucket_fn('5 minutes'::TEXT, logbook_rec.id, logbook_rec._from_time::TIMESTAMPTZ, logbook_rec._to_time::TIMESTAMPTZ);
-- If true exit current process as the current logbook need to be re-process.
IF timebucket IS True THEN
RETURN;
END IF;
END IF;
END IF;
-- Add logbook entry to process queue for later processing
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('new_logbook', logbook_rec.id, NOW(), current_setting('vessel.id', true));
END;
$process_logbook_valid$ LANGUAGE plpgsql;
-- Description
@@ -1466,10 +1416,9 @@ CREATE OR REPLACE FUNCTION process_lat_lon_fn(IN lon NUMERIC, IN lat NUMERIC,
geo jsonb;
overpass jsonb;
BEGIN
RAISE NOTICE 'process_lat_lon_fn';
-- If _id is valid, not NULL
RAISE NOTICE '-> process_lat_lon_fn';
IF lon IS NULL OR lat IS NULL THEN
RAISE WARNING '-> process_lat_lon_fn invalid input lon,lat %', _id;
RAISE WARNING '-> process_lat_lon_fn invalid input lon %, lat %', lon, lat;
--return NULL;
END IF;
@@ -1564,6 +1513,207 @@ COMMENT ON FUNCTION
public.process_lat_lon_fn
IS 'Add or Update moorage base on lat/lon';
CREATE OR REPLACE FUNCTION public.logbook_metrics_timebucket_fn(
IN bucket_interval TEXT,
IN _id INTEGER,
IN _start TIMESTAMPTZ,
IN _end TIMESTAMPTZ,
OUT timebucket boolean) AS $logbook_metrics_timebucket$
DECLARE
time_rec record;
stay_rec record;
log_rec record;
geo_rec record;
ref_time timestamptz;
stay_id integer;
stay_lat DOUBLE PRECISION;
stay_lng DOUBLE PRECISION;
stay_arv timestamptz;
in_interval boolean := False;
log_id integer;
log_lat DOUBLE PRECISION;
log_lng DOUBLE PRECISION;
log_start timestamptz;
in_log boolean := False;
BEGIN
timebucket := False;
-- Agg metrics over a bucket_interval
RAISE NOTICE '-> logbook_metrics_timebucket_fn Starting loop by [%], _start[%], _end[%]', bucket_interval, _start, _end;
for time_rec in
WITH tbl_bucket AS (
SELECT time_bucket(bucket_interval::INTERVAL, time) AS time_bucket,
avg(speedoverground) AS speed,
last(latitude, time) AS lat,
last(longitude, time) AS lng,
st_makepoint(avg(longitude),avg(latitude)) AS geo_point
FROM api.metrics m
WHERE
m.latitude IS NOT NULL
AND m.longitude IS NOT NULL
AND m.time >= _start::TIMESTAMPTZ
AND m.time <= _end::TIMESTAMPTZ
AND m.vessel_id = current_setting('vessel.id', false)
GROUP BY time_bucket
ORDER BY time_bucket asc
),
tbl_bucket2 AS (
SELECT time_bucket,
speed,
geo_point,lat,lng,
LEAD(time_bucket,1) OVER (
ORDER BY time_bucket asc
) time_interval,
LEAD(geo_point,1) OVER (
ORDER BY time_bucket asc
) geo_interval
FROM tbl_bucket
WHERE speed <= 0.5
)
SELECT time_bucket,
speed,
geo_point,lat,lng,
time_interval,
bucket_interval,
(bucket_interval::interval * 2) AS min_interval,
(time_bucket - time_interval) AS diff_interval,
(time_bucket - time_interval)::INTERVAL < (bucket_interval::interval * 2)::INTERVAL AS to_be_process
FROM tbl_bucket2
WHERE (time_bucket - time_interval)::INTERVAL < (bucket_interval::interval * 2)::INTERVAL
loop
RAISE NOTICE '-> logbook_metrics_timebucket_fn ref_time [%] interval [%] bucket_interval[%]', ref_time, time_rec.time_bucket, bucket_interval;
select ref_time + bucket_interval::interval * 1 >= time_rec.time_bucket into in_interval;
RAISE NOTICE '-> logbook_metrics_timebucket_fn ref_time+inverval[%] interval [%], in_interval [%]', ref_time + bucket_interval::interval * 1, time_rec.time_bucket, in_interval;
if ST_DWithin(Geography(ST_MakePoint(stay_lng, stay_lat)), Geography(ST_MakePoint(time_rec.lng, time_rec.lat)), 50) IS True then
in_interval := True;
end if;
if ST_DWithin(Geography(ST_MakePoint(log_lng, log_lat)), Geography(ST_MakePoint(time_rec.lng, time_rec.lat)), 50) IS False then
in_interval := False;
end if;
if in_interval is true then
ref_time := time_rec.time_bucket;
end if;
RAISE NOTICE '-> logbook_metrics_timebucket_fn ref_time is stay within of next point %', ST_DWithin(Geography(ST_MakePoint(stay_lng, stay_lat)), Geography(ST_MakePoint(time_rec.lng, time_rec.lat)), 50);
RAISE NOTICE '-> logbook_metrics_timebucket_fn ref_time is NOT log within of next point %', ST_DWithin(Geography(ST_MakePoint(log_lng, log_lat)), Geography(ST_MakePoint(time_rec.lng, time_rec.lat)), 50);
if time_rec.time_bucket::TIMESTAMPTZ < _start::TIMESTAMPTZ + bucket_interval::interval * 1 then
in_interval := True;
end if;
RAISE NOTICE '-> logbook_metrics_timebucket_fn ref_time is NOT before start[%] or +interval[%]', (time_rec.time_bucket::TIMESTAMPTZ < _start::TIMESTAMPTZ), (time_rec.time_bucket::TIMESTAMPTZ < _start::TIMESTAMPTZ + bucket_interval::interval * 1);
continue when in_interval is True;
RAISE NOTICE '-> logbook_metrics_timebucket_fn after continue stay_id[%], in_log[%]', stay_id, in_log;
if stay_id is null THEN
RAISE NOTICE '-> Close current logbook logbook_id ref_time [%] time_rec.time_bucket [%]', ref_time, time_rec.time_bucket;
-- Close current logbook
geo_rec := logbook_update_geom_distance_fn(_id, _start::TEXT, time_rec.time_bucket::TEXT);
UPDATE api.logbook
SET
active = false,
_to_time = time_rec.time_bucket,
_to_lat = time_rec.lat,
_to_lng = time_rec.lng,
track_geom = geo_rec._track_geom,
notes = 'updated time_bucket'
WHERE id = _id;
-- Add logbook entry to process queue for later processing
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('pre_logbook', _id, NOW(), current_setting('vessel.id', true));
RAISE WARNING '-> Updated existing logbook logbook_id [%] [%] and add to process_queue', _id, time_rec.time_bucket;
-- Add new stay
INSERT INTO api.stays
(vessel_id, active, arrived, latitude, longitude, notes)
VALUES (current_setting('vessel.id', false), false, time_rec.time_bucket, time_rec.lat, time_rec.lng, 'autogenerated time_bucket')
RETURNING id, latitude, longitude, arrived INTO stay_id, stay_lat, stay_lng, stay_arv;
RAISE WARNING '-> Add new stay stay_id [%] [%]', stay_id, time_rec.time_bucket;
timebucket := True;
elsif in_log is false THEN
-- Close current stays
UPDATE api.stays
SET
active = false,
departed = ref_time,
notes = 'autogenerated time_bucket'
WHERE id = stay_id;
-- Add stay entry to process queue for further processing
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('new_stay', stay_id, now(), current_setting('vessel.id', true));
RAISE WARNING '-> Updated existing stays stay_id [%] departed [%] and add to process_queue', stay_id, ref_time;
-- Add new logbook
INSERT INTO api.logbook
(vessel_id, active, _from_time, _from_lat, _from_lng, notes)
VALUES (current_setting('vessel.id', false), false, ref_time, stay_lat, stay_lng, 'autogenerated time_bucket')
RETURNING id, _from_lat, _from_lng, _from_time INTO log_id, log_lat, log_lng, log_start;
RAISE WARNING '-> Add new logbook, logbook_id [%] [%]', log_id, ref_time;
in_log := true;
stay_id := 0;
stay_lat := null;
stay_lng := null;
timebucket := True;
elsif in_log is true THEN
RAISE NOTICE '-> Close current logbook logbook_id [%], ref_time [%], time_rec.time_bucket [%]', log_id, ref_time, time_rec.time_bucket;
-- Close current logbook
geo_rec := logbook_update_geom_distance_fn(_id, log_start::TEXT, time_rec.time_bucket::TEXT);
UPDATE api.logbook
SET
active = false,
_to_time = time_rec.time_bucket,
_to_lat = time_rec.lat,
_to_lng = time_rec.lng,
track_geom = geo_rec._track_geom,
notes = 'autogenerated time_bucket'
WHERE id = log_id;
-- Add logbook entry to process queue for later processing
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('pre_logbook', log_id, NOW(), current_setting('vessel.id', true));
RAISE WARNING '-> Update Existing logbook logbook_id [%] [%] and add to process_queue', log_id, time_rec.time_bucket;
-- Add new stay
INSERT INTO api.stays
(vessel_id, active, arrived, latitude, longitude, notes)
VALUES (current_setting('vessel.id', false), false, time_rec.time_bucket, time_rec.lat, time_rec.lng, 'autogenerated time_bucket')
RETURNING id, latitude, longitude, arrived INTO stay_id, stay_lat, stay_lng, stay_arv;
RAISE WARNING '-> Add new stay stay_id [%] [%]', stay_id, time_rec.time_bucket;
in_log := false;
log_id := null;
log_lat := null;
log_lng := null;
timebucket := True;
end if;
RAISE WARNING '-> Update new ref_time [%]', ref_time;
ref_time := time_rec.time_bucket;
end loop;
RAISE NOTICE '-> logbook_metrics_timebucket_fn Ending loop stay_id[%], in_log[%]', stay_id, in_log;
if in_log is true then
RAISE NOTICE '-> Ending log ref_time [%] interval [%]', ref_time, time_rec.time_bucket;
end if;
if stay_id > 0 then
RAISE NOTICE '-> Ending stay ref_time [%] interval [%]', ref_time, time_rec.time_bucket;
select * into stay_rec from api.stays s where arrived = _end;
-- Close current stays
UPDATE api.stays
SET
active = false,
arrived = stay_arv,
notes = 'updated time_bucket'
WHERE id = stay_rec.id;
-- Add stay entry to process queue for further processing
INSERT INTO process_queue (channel, payload, stored, ref_id)
VALUES ('new_stay', stay_rec.id, now(), current_setting('vessel.id', true));
RAISE WARNING '-> Ending Update Existing stays stay_id [%] arrived [%] and add to process_queue', stay_rec.id, stay_arv;
delete from api.stays where id = stay_id;
RAISE WARNING '-> Ending Delete Existing stays stay_id [%]', stay_id;
stay_arv := null;
stay_id := null;
stay_lat := null;
stay_lng := null;
timebucket := True;
end if;
END;
$logbook_metrics_timebucket$ LANGUAGE plpgsql;
-- Description
COMMENT ON FUNCTION
public.logbook_metrics_timebucket_fn
IS 'Check if all entries for a logbook are in stationary movement per time bucket of 15 or 5 min, speed < 0.6knot, d_within 50m of the stay point';
---------------------------------------------------------------------------
-- TODO add alert monitoring for Battery