一个超级简单的见证人监控报警装置
对于一个见证人来讲,最害怕三件事情是什么?答案是丢块、丢块还是丢块!
(图源 :pixabay)
如果一个见证人节点长时间、持续性地大量丢块,没有出块奖励是小事情,每次丢块都会使我们的链延迟三秒。
虽然下个见证人会打包链延迟期间的交易,但是经常轮到自己出块却不干活,终归会让人觉得这个见证人技术能力稍微欠缺了一点点,可能会导致别人大量地撤票,见证人排名下滑。
但是就好比前些天CloudFlare出现全球性大故障,见证人节点或者放机房、或者放家里总会有可能遇到电力故障、网络故障、主机故障等问题,偶发的丢块在所难免,如何在丢块发生时,及时妥当地处理才是我们要解决的问题。
最优雅的方案呢,就是有至少两台以上见证人节点,运行监控脚本,当主节点丢块时,马上通过更新见证人参数中的出块公钥,将出块节点切换到备份节点上去。
另外一种方案,假设我们只有一个节点,运行监控脚本,当主节点丢块时,马上通过更新见证人参数中的出块公钥,使见证人临时离线,然后排查解决好故障后,再重新上线。这样可以避免持续不断地丢块。
对于TOP20节点,或者排名靠前的节点,因为出块的频率很高(TOP20几乎每分钟轮到一次出块机会),所以应该选择上述两种方案。但对于排名靠后的节点,也可以使用简单的监控报警装置,及时发现丢块,然后手工处理就好。
下边是一个简单的监控报警脚本,供参考:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import requests
import time
import logging
import RPi.GPIO as GPIO
# ----------------------
# 配置区域
# ----------------------
RPC_URL = "https://api.hive.blog"
WITNESS = "oflyhigh"
PRE_MISSED = 40
BUZZ_PIN = 11
BUZZ_FREQ = 6000
BEEP_DURATION = 180 # 秒
RETRY = 3 # 网络重试次数
TIMEOUT = 5 # 请求超时秒数
# ----------------------
# 日志
# ----------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
# ----------------------
# 蜂鸣器控制
# ----------------------
GPIO.setmode(GPIO.BOARD)
GPIO.setup(BUZZ_PIN, GPIO.OUT)
Buzz = GPIO.PWM(BUZZ_PIN, BUZZ_FREQ)
def beep(seconds):
"""蜂鸣提醒(避免阻塞主线程太久,可分段 beep)"""
logging.warning(f"🚨 Witness missed block! Beeping for {seconds} seconds.")
Buzz.start(50)
end = time.time() + seconds
while time.time() < end:
Buzz.ChangeFrequency(BUZZ_FREQ)
time.sleep(0.5)
Buzz.stop()
# ----------------------
# RPC 调用
# ----------------------
def fetch_witness_missed(session, witness):
"""查询 witness 并返回 missed 值"""
payload = {
"jsonrpc": "2.0",
"method": "condenser_api.get_witness_by_account",
"params": [witness],
"id": 1
}
headers = {
"accept": "application/json",
"content-type": "application/json"
}
for attempt in range(1, RETRY + 1):
try:
resp = session.post(RPC_URL, json=payload, headers=headers, timeout=TIMEOUT)
if resp.status_code != 200:
logging.error(f"RPC status {resp.status_code}: {resp.text}")
continue
data = resp.json()
if "result" not in data or data["result"] is None:
logging.error(f"Invalid RPC response: {data}")
continue
missed = data["result"].get("total_missed", None)
if missed is None:
logging.error("RPC result missing 'total_missed'")
continue
return missed
except Exception as e:
logging.error(f"RPC request failed (attempt {attempt}/{RETRY}): {e}")
time.sleep(1)
return None # 多次失败返回 None
# ----------------------
# 主流程
# ----------------------
def main():
session = requests.Session()
logging.info(f"Checking witness '{WITNESS}'...")
missed = fetch_witness_missed(session, WITNESS)
if missed is None:
logging.error("❌ Failed to get witness info.")
return
logging.info(f"Current missed: {missed}, Previous: {PRE_MISSED}")
if missed > PRE_MISSED:
beep(BEEP_DURATION)
else:
logging.info("👍 No new missed blocks.")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("Program interrupted by user.")
finally:
GPIO.cleanup()
logging.info("GPIO cleanup complete.")
这里我用的是之前香蕉派上的一个蜂鸣器。这个蜂鸣器报警器已经运行了五六年,为我立下了汗马功劳!
如果你用树莓派,也可以改成树莓派上的蜂鸣器(有源蜂鸣器模块),详情可以参考我的这个帖子:
当然,还可以改造成使用音箱播放报警音乐,但是考虑到万一半夜报警了,会吵到媳妇和孩子,有可能挨打,还是不要这么做的好。
见证人丢块时,除了通过程序让报警器发出蜂鸣声响,我们也可以让程序给我们的Telegram或者Discord账号发送通知。
或者我们还可以通过网络服务商或者SMS模块(或者USB连接手机)实现短信通知功能,这样就不担心错过通知啦。
(图源 :pixabay)
当然了,这里要再次强调一下,上述脚本仅适合排名不高的见证人哦,而且最后有人24小时都能注意到报警器的报警(嗯,比较适合我这种宅家的)。
更优雅的处理方式还是自动切换主备节点,我准备在这个脚本的基础上进一步完善,实现切换(或者自动离线功能),应该也挺容易的。
另外说一句,有些优秀的见证人分享了各种各样的优秀监控工具,为啥我还要重复地造轮子呢?我是觉得这类工具越多越好,而且这种简单的也适合懒人部署不是嘛?哈哈。对,我就是懒人之一。
相关链接
Leave 一个超级简单的见证人监控报警装置 to:
Read more #cn posts
Best Posts From oflyhigh
We have not curated any of oflyhigh's posts yet. But you can encourage our curation team to review posts by visiting them regularly and by referring other readers. Because we give priority to frequently read content.