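opc ua是一种应用层协议,基于tcp之上,其url通常形如 `opc.tcp://<host>:<port>/<path>`,例如 `opc.tcp://milo.digitalpetri.com:62541/milo`。

## 两种模式

opc ua支持c/s模式,同时也支持类似mqtt的发布订阅模式,通常各种设备作为opc ua的服务端提供各种服务。

## 信息模型

opc ua采用面向对象的设计思路,使用了对象(objects)作为过程系统表示数据和活动的基础。对象包含了变量、事件和方法,它们之间通过引用(reference)互相连接。

OPC UA 信息模型是节点的网络(Network of Nodes),或者称为结构化图(graph),由节点(node)和引用(reference)组成。

注意⚠️:opc ua中所说的节点是指一个opc ua服务器内部的节点,不要理解为一个服务器对应一个node。

## 节点

opc ua定义了8种类型的节点:对象(Object)、变量(Variable)、方法(Method)、对象类型(ObjectType)、变量类型(VariableType)、引用类型(ReferenceType)、数据类型(DataType)、视图(View)。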
每种节点都包含一些公共属性,如下:
除了公共属性之外,不同类型的节点还有各自特有的属性。

## 引用

引用描述了两个节点之间的关系,用来连接多个节点。OPC UA预定义了多种引用,常见的引用有:
完整引用如下

## 服务

服务可以看成是OPC UA服务器提供的API集合,OPC UA预定义了37个标准服务,常用的服务有:
opc ua编程Sdk
客户端
模拟设备

- 可利用sdk自己开发,见下面的python demo

## golang Demo

### 读取服务器数据

```go
package main

import (
	"context"
	"log"

	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)

// main connects to the demo server without message security, reads one node
// and logs its value.
func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	ctx := context.Background()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	id, err := ua.ParseNodeID(nodeID)
	if err != nil {
		log.Fatalf("invalid node id: %v", err)
	}

	req := &ua.ReadRequest{
		MaxAge:             2000,
		NodesToRead:        []*ua.ReadValueID{{NodeID: id}},
		TimestampsToReturn: ua.TimestampsToReturnBoth,
	}

	resp, err := c.Read(req)
	if err != nil {
		log.Fatalf("Read failed: %s", err)
	}
	// Guard the index below: one result is expected per requested node.
	if len(resp.Results) == 0 {
		log.Fatal("Read returned no results")
	}
	if resp.Results[0].Status != ua.StatusOK {
		log.Fatalf("Status not OK: %v", resp.Results[0].Status)
	}
	log.Printf("%#v", resp.Results[0].Value.Value())
}
```

### 向服务器写数据

```go
package main

import (
	"context"
	"log"

	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)

// main connects to the demo server without message security, writes the
// value 10.0 to the Value attribute of one node and logs the returned
// status code.
func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	ctx := context.Background()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	id, err := ua.ParseNodeID(nodeID)
	if err != nil {
		log.Fatalf("invalid node id: %v", err)
	}

	v, err := ua.NewVariant(10.0)
	if err != nil {
		log.Fatalf("invalid value: %v", err)
	}

	req := &ua.WriteRequest{
		NodesToWrite: []*ua.WriteValue{
			{
				NodeID:      id,
				AttributeID: ua.AttributeIDValue,
				Value: &ua.DataValue{
					EncodingMask: ua.DataValueValue,
					Value:        v,
				},
			},
		},
	}

	resp, err := c.Write(req)
	if err != nil {
		log.Fatalf("Write failed: %s", err)
	}
	// Guard the index below: one status code is expected per written node.
	if len(resp.Results) == 0 {
		log.Fatal("Write returned no results")
	}
	log.Printf("%v", resp.Results[0])
}
```

### 监听服务器数据变化

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/monitor"
	"github.com/gopcua/opcua/ua"
)

// cleanup logs the subscription counters, unsubscribes and marks the
// goroutine as finished on wg.
func cleanup(sub *monitor.Subscription, wg *sync.WaitGroup) {
	log.Printf("stats: sub=%d delivered=%d dropped=%d", sub.SubscriptionID(), sub.Delivered(), sub.Dropped())
	sub.Unsubscribe()
	wg.Done()
}

// startCallbackSub subscribes to nodes with a callback that logs every data
// change (or its error), then blocks until ctx is cancelled. The callback
// sleeps for lag after each message.
func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
	sub, err := m.Subscribe(
		ctx,
		&opcua.SubscriptionParameters{
			Interval: interval,
		},
		func(s *monitor.Subscription, msg *monitor.DataChangeMessage) {
			if msg.Error != nil {
				log.Printf("[callback] error=%s", msg.Error)
			} else {
				log.Printf("[callback] node=%s value=%v", msg.NodeID, msg.Value.Value())
			}
			time.Sleep(lag)
		},
		nodes...)
	if err != nil {
		log.Fatal(err)
	}

	defer cleanup(sub, wg)

	<-ctx.Done()
}

// main connects to the demo server, starts one callback-based subscription
// and runs until the process receives an interrupt signal.
func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel the context on the first interrupt so the subscription
	// goroutine can clean up.
	go func() {
		<-signalCh
		println()
		cancel()
	}()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	m, err := monitor.NewNodeMonitor(c)
	if err != nil {
		log.Fatal(err)
	}

	m.SetErrorHandler(func(_ *opcua.Client, sub *monitor.Subscription, err error) {
		log.Printf("error: sub=%d err=%s", sub.SubscriptionID(), err.Error())
	})

	wg := &sync.WaitGroup{}

	// start callback-based subscription
	wg.Add(1)
	go startCallbackSub(ctx, m, time.Second, 0, wg, nodeID)

	<-ctx.Done()
	wg.Wait()
}
```

## python opcua server demo

```python
#!/usr/bin/env python3
from threading import Event, Thread
import random
import time

from opcua import ua, uamethod, Server


@uamethod
def set_temperature(parent, variant):
    """OPC UA method: write the received value to the temperature variable."""
    print(f"set_temperature {variant.Value}")
    temperature_thread.temperature.set_value(variant.Value)


@uamethod
def set_onoff(parent, variant):
    """OPC UA method: write the received value to the onoff variable."""
    print(f"set_onoff {variant.Value}")
    temperature_thread.onoff.set_value(variant.Value)


class Temperature(Thread):
    """Background thread that periodically sets random values and fires an event."""

    def __init__(self, temperature, onoff):
        Thread.__init__(self)
        # Do not name this attribute `_stop`: some Python 3 versions define
        # an internal Thread._stop() method that such an attribute would shadow.
        self._stop_event = Event()
        self.temperature = temperature
        self.onoff = onoff

    def stop(self):
        """Ask the loop in run() to exit; also interrupts the wait between rounds."""
        self._stop_event.set()

    def run(self):
        count = 1
        while not self._stop_event.is_set():
            value = random.randint(-20, 100)
            self.temperature.set_value(value)
            print(f"random set temperature {value}")

            value = bool(random.randint(0, 1))
            self.onoff.set_value(value)
            print(f"random set onoff {value}")

            led_event.event.Message = ua.LocalizedText("high_temperature %d" % count)
            led_event.event.Severity = count
            #led_event.event.temperature = random.randint(60, 100)
            led_event.event.onoff = bool(random.randint(0, 1))
            led_event.trigger()

            count += 1
            # Wait up to 10 seconds, returning early once stop() is called.
            self._stop_event.wait(10)


if __name__ == "__main__":
    # now setup our server
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:40840/tuyaopcua/server/")
    server.set_server_name("TuyaOpcUa Driver Demo Device")

    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
        ua.SecurityPolicyType.Basic128Rsa15_Sign,
        ua.SecurityPolicyType.Basic256_SignAndEncrypt,
        ua.SecurityPolicyType.Basic256_Sign])

    # setup our own namespace
    uri = "http://"
    idx = server.register_namespace(uri)

    # add an `AirConditioner` object
    air_conditioner = server.nodes.objects.add_object(idx, "AirConditioner")

    temperature = air_conditioner.add_variable(idx, "temperature", 20)
    temperature.set_writable()

    onoff = air_conditioner.add_variable(idx, "onoff", True)
    onoff.set_writable()

    air_conditioner.add_method(idx, "set_temperature", set_temperature, [ua.VariantType.UInt32])
    air_conditioner.add_method(idx, "set_onoff", set_onoff, [ua.VariantType.Boolean])

    # creating a default event object, the event object automatically will have members for all events properties
    led_event_type = server.create_custom_event_type(
        idx,
        'high_temperature',
        ua.ObjectIds.BaseEventType,
        [('temperature', ua.VariantType.UInt32), ('onoff', ua.VariantType.Boolean)])

    led_event = server.get_event_generator(led_event_type, air_conditioner)
    led_event.event.Severity = 300

    # start opcua server
    server.start()
    print("Start opcua server...")

    temperature_thread = Temperature(temperature, onoff)
    temperature_thread.start()

    try:
        led_event.trigger(message="This is BaseEvent")
        while True:
            time.sleep(5)
    finally:
        print("Exit opcua server...")
        temperature_thread.stop()
        # Let the worker finish before the server it writes to is stopped.
        temperature_thread.join()
        server.stop()
```

|
|