sipp11 5 years ago
parent
commit
cf30d2df47
  1. 29
      src/main.py
  2. 28
      src/utils.py

29
src/main.py

@ -7,6 +7,7 @@ time python src/_detector.py --input ~/Desktop/5min.mp4 -l
# import the necessary packages
import sys
import time
import logging
import argparse
import pprint
import numpy as np
@ -22,6 +23,7 @@ from utils import (
DONTCARE,
)
logging.basicConfig(filename="trace.log", filemode="w", level=logging.DEBUG)
pp = pprint.PrettyPrinter(indent=2)
# tracking
OPENCV_OBJECT_TRACKERS = {"csrt": cv2.TrackerCSRT_create}
@ -229,13 +231,15 @@ while True:
untracking.append(trackers[idx])
continue
_last_pos = trackers[idx]["curr_position"]
(_x, _y, _w, _h) = _last_pos
_last_pos = trackers[idx]["history"][0]
curr_distance = box_distance(box, _last_pos)
last_distance = trackers[idx]["distance"]
trackers[idx]["distance"] = curr_distance
trackers[idx]["curr_position"] = box # [int(v) for v in box]
trackers[idx]["history"].insert(0, box)
_last_idx = 2 if len(trackers[idx]["history"]) > 2 else 1
_x, _y, _w, _h = trackers[idx]["history"][_last_idx]
_heading = get_heading(_x, _y, x, y)
tk = trackers[idx]
@ -276,7 +280,7 @@ while True:
# TODO: assign id here! for anything that is -1 id
gid = None
if tk['id'] == -1 and 5 < h_count and h_count < 15:
if tk["id"] == -1 and 5 < h_count and h_count < 15:
_area_id = f"id_{tk['origin']['id']}"
if _area_id in W4A:
# check candidates which has "matched" opportunity too
@ -284,7 +288,7 @@ while True:
_po = [_ for _ in _po if _frame_count > _[2]] # opportunity
_po = sorted(_po, key=lambda kk: kk[2]) # first one first
if _po:
trackers[idx]['id'] = gid = _po[0][0]["id"]
trackers[idx]["id"] = gid = _po[0][0]["id"]
# remove this id out of next W4A
W4A[_area_id]["objects"] = [
_ for _ in W4A[_area_id]["objects"] if _[0]["id"] != gid
@ -293,18 +297,18 @@ while True:
# else:
# print(f" --- no candidate")
print(f" {gid} RE-ENTERS")
elif tk['id'] == -1 and h_count >= 15:
elif tk["id"] == -1 and h_count >= 15:
# assign an id
trackers[idx]['id'] = tracker_counter
trackers[idx]["id"] = tracker_counter
tracker_counter += 1
print(f" {trackers[idx]['id']} ENTERS (& ID assigned)")
for lvng_cnd in OBJ_LEAVING_COND:
_origin = tk["origin"]["id"]
cond = dict(lvng_cnd)
if cond["origin_id"] != _origin or avg_heading != cond["heading"]:
if cond["origin_id"] != _origin:
continue
if cond["heading"] != "" and avg_heading != cond["heading"]:
continue
REACH_condition = False
if cond["x"] != -1:
@ -317,6 +321,10 @@ while True:
_cond = cond["x"](y + h) if callable(cond["x"]) else cond["x"]
if x < _cond:
REACH_condition = True
elif cond["heading"] == "":
_cond = cond["x"](y) if callable(cond["x"]) else cond["x"]
if x < _cond:
REACH_condition = True
elif cond["y"] != -1:
_cond = cond["y"](x + h) if callable(cond["y"]) else cond["y"]
# Don't have one yet
@ -444,8 +452,9 @@ while True:
"heading": [],
"origin": found_at,
"distance": -1,
"last_position": bbox,
"first_position": bbox,
"still": 0,
"history": [bbox],
}
trackers.append(t)
print(f" total #{len(trackers)}")

28
src/utils.py

@ -91,10 +91,18 @@ OBJ_LEAVING_COND = [
# ('next_area', []),
# ('duration_to_next', )
# ],
[
('origin_id', 2),
('heading', ''),
('x', lambda y: (y - 112.6) / -0.2270),
('y', -1),
('next_area', [5, 7]),
('duration_to_next', 30 * 5.5)
],
[
("origin_id", 3),
("heading", "N"),
("x", lambda y: (y - 80.5) / 2.19),
("x", lambda y: (y + 1993) / 15),
("y", -1),
("next_area", [5]),
("duration_to_next", 30 * 4),
@ -115,6 +123,14 @@ OBJ_LEAVING_COND = [
("next_area", [11]),
("duration_to_next", 30 * 3),
],
[
("origin_id", 5),
("heading", "S"),
("x", lambda y: (y - 165.3) / 0.4143),
("y", -1),
("next_area", [4]),
("duration_to_next", 30 * 4),
],
[
("origin_id", 7),
("heading", "S"),
@ -141,9 +157,17 @@ DONTCARE = (
)
def get_linear_equation(pnt1, pnt2):
if not (isinstance(pnt1, tuple) or isinstance(pnt2, tuple)):
return None
a = (pnt2[1] - pnt1[1]) / (pnt2[0] - pnt1[0])
b = pnt1[1] - a * pnt1[0]
return f"y = {a:.4f}x + {b:.1f}"
def get_avg_heading(headings):
latest = headings[:15]
_h = "".join(latest).replace('W', '').replace('E', '')
_h = "".join(latest).replace("W", "").replace("E", "")
chars = collections.Counter(_h).most_common(10)
if chars:
return chars[0][0]

Loading…
Cancel
Save