There is only one led/buzzer at the moment.
I went your suggestion and after a few hours of development got it working.
Have an Active and Clear Script block that increments or decrements a counter tags for amber, red and buzzer. Red>Amber>Green
Active Script Block
# Alarm Pipeline Script Block (Active)
log = system.util.getLogger("led_buzzer_pipeline")
BASE = "[default]DV2103"
CNT_RED = BASE + "/alarm_pipeline/cntRed"
CNT_AMBER = BASE + "/alarm_pipeline/cntAmber"
BUZ_CNT = BASE + "/alarm_pipeline/buzzerCnt"
WORD_TAG = BASE + "/cfg/control_word" # single INT register
# ---------------- helpers ----------------
def clamp0(v):
try:
v = int(v or 0)
return v if v > 0 else 0
except:
return 0
def priority_name(ev):
try:
return str(ev.getPriority()).lower()
except:
try:
return str(ev["priority"]).lower()
except:
return ""
def bucket_for_priority(p):
if "critical" in p or "high" in p:
return "RED"
if "medium" in p or "low" in p:
return "AMBER"
return None
def buzzer_enabled(ev):
val = None
try:
val = ev.get("buzzer_enable")
except:
pass
if val is None:
return False
s = str(val).strip().lower()
return s in ("true", "1", "yes", "y", "on")
def decode_led_int_and_buzzer(control_word):
w = int(control_word or 0)
red = (w >> 0) & 1
amber = (w >> 1) & 1
green = (w >> 2) & 1
if red:
led_int = 1
elif amber:
led_int = 2
elif green:
led_int = 3
else:
led_int = 0
buz_on = ((w >> 8) & 1) == 1
return led_int, buz_on
def desired_led_int(cnt_red, cnt_amber):
if cnt_red > 0:
return 1
if cnt_amber > 0:
return 2
return 3
# ---------------- logic ----------------
p = priority_name(event)
bucket = bucket_for_priority(p)
buzz = buzzer_enabled(event)
vals = system.tag.readBlocking([CNT_RED, CNT_AMBER, BUZ_CNT, WORD_TAG])
cnt_red = clamp0(vals[0].value)
cnt_amber = clamp0(vals[1].value)
buz_cnt = clamp0(vals[2].value)
cur_word = vals[3].value
# increment for activating alarm
if bucket == "RED":
cnt_red += 1
elif bucket == "AMBER":
cnt_amber += 1
if buzz and bucket in ("RED", "AMBER"):
buz_cnt += 1
new_led = desired_led_int(cnt_red, cnt_amber)
new_buz = (buz_cnt > 0)
# persist counters
system.tag.writeBlocking([CNT_RED, CNT_AMBER, BUZ_CNT], [cnt_red, cnt_amber, buz_cnt])
# compare against real device state
cur_led, cur_buz = decode_led_int_and_buzzer(cur_word)
if new_led != cur_led or new_buz != cur_buz:
ifm.write_outputs_atomic(WORD_TAG, led_int=new_led, buzzer_on=new_buz)
Clear Script Block
# Alarm Pipeline Script Block (Clear / Inactive)
log = system.util.getLogger("led_buzzer_pipeline")
BASE = "[default]DV2103"
CNT_RED = BASE + "/alarm_pipeline/cntRed"
CNT_AMBER = BASE + "/alarm_pipeline/cntAmber"
BUZ_CNT = BASE + "/alarm_pipeline/buzzerCnt"
WORD_TAG = BASE + "/cfg/control_word" # single INT register
# ---------------- helpers ----------------
def clamp0(v):
try:
v = int(v or 0)
return v if v > 0 else 0
except:
return 0
def priority_name(ev):
try:
return str(ev.getPriority()).lower()
except:
try:
return str(ev["priority"]).lower()
except:
return ""
def bucket_for_priority(p):
# High/Critical -> Red, Medium/Low -> Amber, others ignored
if "critical" in p or "high" in p:
return "RED"
if "medium" in p or "low" in p:
return "AMBER"
return None
def buzzer_enabled(ev):
# custom alarm property: buzzer_enable (string "true"/"false")
val = None
try:
val = ev.get("buzzer_enable")
except:
pass
if val is None:
return False
s = str(val).strip().lower()
return s in ("true", "1", "yes", "y", "on")
def decode_led_int_and_buzzer(control_word):
"""Return (led_int, buz_on) from control word bits."""
w = int(control_word or 0)
red = (w >> 0) & 1
amber = (w >> 1) & 1
green = (w >> 2) & 1
# led_int mapping: 1=Red, 2=Amber, 3=Green (choose priority if multiple bits)
if red:
led_int = 1
elif amber:
led_int = 2
elif green:
led_int = 3
else:
led_int = 0 # none on
buz_on = ((w >> 8) & 1) == 1
return led_int, buz_on
def desired_led_int(cnt_red, cnt_amber):
# 1=Red, 2=Amber, 3=Green
if cnt_red > 0:
return 1
if cnt_amber > 0:
return 2
return 3
# ---------------- logic ----------------
p = priority_name(event)
bucket = bucket_for_priority(p)
buzz = buzzer_enabled(event)
vals = system.tag.readBlocking([CNT_RED, CNT_AMBER, BUZ_CNT, WORD_TAG])
cnt_red = clamp0(vals[0].value)
cnt_amber = clamp0(vals[1].value)
buz_cnt = clamp0(vals[2].value)
cur_word = vals[3].value
# decrement for clearing alarm
if bucket == "RED":
cnt_red = clamp0(cnt_red - 1)
elif bucket == "AMBER":
cnt_amber = clamp0(cnt_amber - 1)
if buzz and bucket in ("RED", "AMBER"):
buz_cnt = clamp0(buz_cnt - 1)
new_led = desired_led_int(cnt_red, cnt_amber)
new_buz = (buz_cnt > 0)
# persist counters
system.tag.writeBlocking([CNT_RED, CNT_AMBER, BUZ_CNT], [cnt_red, cnt_amber, buz_cnt])
# compare against real device state
cur_led, cur_buz = decode_led_int_and_buzzer(cur_word)
if new_led != cur_led or new_buz != cur_buz:
ifm.write_outputs_atomic(WORD_TAG, led_int=new_led, buzzer_on=new_buz)
The only annoying thing is that the ifm io master link does not support masked bit writes so I cannot write to multiple bits at the same time.