multisocket added

This commit is contained in:
tixi
2018-12-15 15:43:15 +01:00
parent e33229af35
commit b751731096
5 changed files with 599 additions and 128 deletions

View File

@@ -1,14 +1,13 @@
# Domoticz-Tuya-SmartPlug-Plugin
A Domoticz plugin to manage Tuya Smart Plug
A Domoticz plugin to manage Tuya Smart Plug (single and multi socket device)
## ONLY TESTED FOR Raspberry Pi
With Python version 3.5 & Domoticz version 4.9700 (stable) and 4.9999 (beta)
With Python version 3.5 & Domoticz version 4.9700 (stable)
## Prerequisites
This plugin is based on the latest pytuya Python library. For the installation of this library,
This plugin is based on the pytuya Python library. For the installation of this library,
follow the Installation guide below.
See [`https://github.com/clach04/python-tuya/`](https://github.com/clach04/python-tuya/) for more information.
@@ -33,7 +32,9 @@ sudo /etc/init.d/domoticz.sh restart
```
In the web UI, navigate to the Hardware page. In the hardware dropdown there will be an entry called "Tuya SmartPlug".
## Known issue
## Known issues
1/ python environment
Domoticz may not have the path to the pycrypto library in its python environment.
In this case you will observe something starting like that in the log:
@@ -53,6 +54,14 @@ cd ~/domoticz/plugins/Domoticz-Tuya-SmartPlug-Plugin
ln -s /home/pi/.local/lib/python3.5/site-packages/Crypto Crypto
```
2/ Tuya app
The tuya app must be close. This limitation is due to the tuya device itself that support only one connection.
3/ Alternative crypto libraries
PyCryptodome or pyaes can be used instead of pycrypto.
## Updating
Like other plugins, in the Domoticz-Tuya-SmartPlug-Plugin directory:
@@ -68,14 +77,30 @@ sudo /etc/init.d/domoticz.sh restart
| **IP address** | IP of the Smart Plug eg. 192.168.1.231 |
| **DevID** | devID of the Smart Plug |
| **Local Key** | Local Key of the Smart Plug |
| **Debug** | default is False |
| **DPS** | 1 for single socket device and a list of dps separated by ';' for multisocket device eg. 1;2;3;7
| **DPS group** | None for single socket device and a list of list of dps separated by ':' for multisocket device eg. 1;2 : 3;7
| **DPS always ON** | None for single socket device and a list of dps separated by ; for multisocket device eg. 1;2
| **Debug** | default is 0 |
**DPS** should only includes values that correspond to plug's dps id. Be careful some devices also have timers in the dps state.
**DPS group** can be used to group multiple sockets in one Domoticz switch.
**DPS always ON** can be used to force some sockets to be always on (usb for instance).
Helper scripts get_dps.py turnON.py and turnOFF.py can help:
* to determine the dps list
* to check that the needed information are valid (i.e. devID and Local Key) before using the plugin.
## DevID & Local Key Extraction
Recommanded method:
[`https://github.com/codetheweb/tuyapi/blob/master/docs/SETUP.md`](https://github.com/codetheweb/tuyapi/blob/master/docs/SETUP.md)
All the information can be found here:
[`https://github.com/clach04/python-tuya/`](https://github.com/clach04/python-tuya/)
## Acknowledgements
* Special thanks for all the hard work of [clach04](https://github.com/clach04), [codetheweb](https://github.com/codetheweb/) and all the other contributers on [python-tuya](https://github.com/clach04/python-tuya) and [tuyapi](https://github.com/codetheweb/tuyapi) who have made communicating to Tuya devices possible with open source code.
* Special thanks for all the hard work of [clach04](https://github.com/clach04), [codetheweb](https://github.com/codetheweb/) and all the other contributers on [python-tuya](https://github.com/clach04/python-tuya) and [tuyapi](https://github.com/codetheweb/tuyapi) who have made communicating to Tuya devices possible with open source code.
* Domoticz team

66
get_dps.py Executable file
View File

@@ -0,0 +1,66 @@
#!/usr/bin/python3
########################################################################################
# Domoticz Tuya Smart Plug Python Plugin #
# #
# MIT License #
# #
# Copyright (c) 2018 tixi #
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy #
# of this software and associated documentation files (the "Software"), to deal #
# in the Software without restriction, including without limitation the rights #
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell #
# copies of the Software, and to permit persons to whom the Software is #
# furnished to do so, subject to the following conditions: #
# #
# The above copyright notice and this permission notice shall be included in all #
# copies or substantial portions of the Software. #
# #
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR #
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, #
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE #
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER #
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, #
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE #
# SOFTWARE. #
# #
########################################################################################
import sys
import pytuya
import socket #needed for socket.timeout exception
if(len(sys.argv)!=3):
print("usage: " + sys.argv[0] + " <IP> <DevID>")
exit(1)
ip = sys.argv[1]
devid = sys.argv[2]
device = pytuya.OutletDevice(devid,ip,"")
data = 0 #stub for the try except
try:
data = device.status()
except (ConnectionResetError, socket.timeout, OSError) as e:
print("A problem occur please retry...")
exit(1)
print("\nPlug State Information:")
print(data)
print("\nPlug DPS List:")
dps_list = ""
first=True
for key in data['dps'].keys():
if(type (data['dps'][key]) is bool):
if(not first):
dps_list += ";"
dps_list += str(int(key))
first=False
print(dps_list)

500
plugin.py
View File

@@ -25,93 +25,181 @@
# #
########################################################################################
"""
<plugin key="tixi_tuya_smartplug_plugin" name="Tuya SmartPlug" author="tixi" version="2.0.2" externallink=" https://github.com/tixi/Domoticz-Tuya-SmartPlug-Plugin">
<plugin key="tixi_tuya_smartplug_plugin" name="Tuya SmartPlug" author="tixi" version="3.0.0" externallink=" https://github.com/tixi/Domoticz-Tuya-SmartPlug-Plugin">
<params>
<param field="Address" label="IP address" width="200px" required="true"/>
<param field="Mode1" label="DevID" width="200px" required="true"/>
<param field="Mode2" label="Local Key" width="200px" required="true"/>
<param field="Mode3" label="DPS" width="200px" required="true" default="1"/>
<param field="Mode4" label="DPS group" width="200px" required="true" default="None"/>
<param field="Mode5" label="DPS always ON" width="200px" required="true" default="None"/>
<param field="Mode6" label="Debug" width="75px">
<options>
<option label="True" value="Debug"/>
<option label="False" value="Normal" default="true"/>
<option label="0" value="0" default="true"/>
<option label="1" value="1"/>
<option label="2" value="2"/>
<option label="4" value="4"/>
<option label="8" value="8"/>
<option label="16" value="16"/>
<option label="32" value="32"/>
<option label="64" value="64"/>
<option label="128" value="128"/>
</options>
</param>
</params>
</plugin>
"""
# https://wiki.domoticz.com/wiki/Developing_a_Python_plugin
# Debugging
# Value Meaning
# 0 None. All Python and framework debugging is disabled.
# 1 All. Very verbose log from plugin framework and plugin debug messages.
# 2 Mask value. Shows messages from Plugin Domoticz.Debug() calls only.
# 4 Mask Value. Shows high level framework messages only about major the plugin.
# 8 Mask Value. Shows plugin framework debug messages related to Devices objects.
# 16 Mask Value. Shows plugin framework debug messages related to Connections objects.
# 32 Mask Value. Shows plugin framework debug messages related to Images objects.
# 64 Mask Value. Dumps contents of inbound and outbound data from Connection objects.
# 128 Mask Value. Shows plugin framework debug messages related to the message queue.
import Domoticz
import pytuya
import json
########################################################################################
#
# plug object (represents a socket of the Tuya device)
#
########################################################################################
class Plug:
#######################################################################
#
# constructor
#
#######################################################################
def __init__(self,unit):
self.__dps_id = unit # dps id
self.__command = None # command ('On'/'Off'/None)
self.__alwaysON = False # True if the socket should be always ON, False otherwise
return
#######################################################################
# update_state function
# update the domoticz device
# and checks if the last command is equal to the current state
#
# parameters:
# state: True <=> On ; False <=> Off
#
# returns:
# True in case of an error (the state does not correspond to the command)
# False otherwise
#######################################################################
def update_state(self,state): #state: True <=> On ; False <=> Off
if(state):
UpdateDevice(self.__dps_id, 1, "On")
if(self.__command == 'Off'):
return True
else:
self.__command = None
elif(self.__alwaysON): #if not state: need to change the state for always_on devices
self.__command = 'On'
return True
else:
UpdateDevice(self.__dps_id, 0, "Off")
if(self.__command == 'On'):
return True
else:
self.__command = None
return False
#######################################################################
#
# set_command function
# set the command for the next request
#
#######################################################################
def set_command(self,cmd):
if(self.__alwaysON):
self.__command = 'On'
else:
self.__command = cmd
#######################################################################
#
# set_alwaysON function
# set __alwaysON to True
#
#######################################################################
def set_alwaysON(self):
self.__alwaysON = True
self.__command = 'On'
#######################################################################
#
# put_payload function
# add to dict_payload the command to be sent to the device
#
#######################################################################
def put_payload(self,dict_payload):
if(self.__command == None):
return
if(self.__command =="On"):
dict_payload[str(self.__dps_id)] = True
else:
dict_payload[str(self.__dps_id)] = False
########################################################################################
########################################################################################
#
# plugin object
#
########################################################################################
class BasePlugin:
__UNIT = 1
__HB_BASE_FREQ = 2
__VALID_CMD = ('status','On','Off')
#######################################################################
#
# constant definition
#
#######################################################################
__HB_BASE_FREQ = 2 #heartbeat frequency (val x 10 seconds)
__VALID_CMD = ('On','Off') #list of valid command
def __init__(self):
self.__address = None #IP address of the smartplug
self.__devID = None #devID of the smartplug
self.__localKey = None #localKey of the smartplug
self.__device = None #pytuya object of the smartplug
self.__runAgain = self.__HB_BASE_FREQ #heartbeat frequency (20 seconds)
self.__connection = None #connection to the tuya plug
self.__last_cmd = None #last command (None/'On'/'Off'/'status')
#######################################################################
#
# private functions definition
# __extract_status
# __is_encoded
# __command_to_execute
#
#######################################################################
return
#onStart Domoticz function
def onStart(self):
# Debug mode
if(Parameters["Mode6"] == "Debug"):
Domoticz.Debugging(1)
Domoticz.Debug("onStart called")
else:
Domoticz.Debugging(0)
#get parameters
self.__address = Parameters["Address"]
self.__devID = Parameters["Mode1"]
self.__localKey = Parameters["Mode2"]
#initialize the defined device in Domoticz
if (len(Devices) == 0):
Domoticz.Device(Name="Tuya SmartPlug", Unit=self.__UNIT, TypeName="Switch").Create()
Domoticz.Log("Tuya SmartPlug Device created.")
#create the pytuya object
self.__device = pytuya.OutletDevice(self.__devID, self.__address, self.__localKey)
#start the connection
self.__last_cmd = 'status'
self.__connection = Domoticz.Connection(Name="Tuya", Transport="TCP/IP", Address=self.__address, Port="6668")
self.__connection.Connect()
def onConnect(self, Connection, Status, Description):
if (Connection == self.__connection):
if (Status == 0):
Domoticz.Debug("Connected successfully to: "+Connection.Address+":"+Connection.Port)
if(self.__last_cmd != None):
self.__command_to_execute(self.__last_cmd)
else:
Domoticz.Debug("OnConnect Error Status: " + str(Status))
if(Status==113):#no route to host error (skip to avoid intempestive connect call)
return
if(self.__connection.Connected()):
self.__connection.Disconnect()
if(not self.__connection.Connecting()):
self.__connection.Connect()
#######################################################################
#
# __extract_status
#
# Parameter
# Data: a received payload from the tuya smart plug
#
# Returns a tuple (bool,dict)
# first: set to True if an error occur and False otherwise
# second: dict of the dps (irrelevant if first is True )
#
#######################################################################
def __extract_status(self, Data):
""" Returns a tuple (bool,bool)
first: set to True if an error occur and False otherwise
second: set to True if the device is on and to False if the device is off
second is irrelevant if first is True
"""
start=Data.find(b'{"devId')
if(start==-1):
@@ -131,90 +219,276 @@ class BasePlugin:
try:
result = json.loads(result)
return (False,result['dps']['1'])
return (False,result['dps'])
except (JSONError, KeyError) as e:
return (True,"")
#######################################################################
#
# __is_encoded
#
# Parameter
# Data: a received payload from the tuya smart plug
#
# Returns
# True if Data is encoded
# False otherwise
#
# Remark: for debugging purpose
#
#######################################################################
#~ def __is_encoded(self, Data):
#~ tmp = Data[20:-8] # hard coded offsets
#~ if(tmp.startswith(b'3.1')):#PROTOCOL_VERSION_BYTES
#~ return True
#~ else:
#~ return False
#######################################################################
#
# __command_to_execute
# send a command (set or status) to the tuya device
#
#
#######################################################################
def __command_to_execute(self):
self.__runAgain = self.__HB_BASE_FREQ
if(self.__connection.Connected()):
dict_payload = {}
for key in self.__plugs:
self.__plugs[key].put_payload(dict_payload)
if(len(dict_payload) != 0):
self.__state_machine = 1
payload = self.__device.generate_payload('set', dict_payload)
self.__connection.Send(payload)
else:
self.__state_machine = 2
payload=self.__device.generate_payload('status')
self.__connection.Send(payload)
else:
if(not self.__connection.Connecting()):
self.__connection.Connect()
#######################################################################
#
# constructor
#
#######################################################################
def __init__(self):
self.__address = None #IP address of the smartplug
self.__devID = None #devID of the smartplug
self.__localKey = None #localKey of the smartplug
self.__device = None #pytuya object of the smartplug
self.__runAgain = self.__HB_BASE_FREQ #heartbeat frequency
self.__connection = None #connection to the tuya plug
self.__unit2dps_id_list = None #mapping between Unit and list of dps id
self.__plugs = None #mapping between dps id and a plug object
self.__state_machine = 0 #state_machine: 0 -> no waiting msg ; 1 -> set command sent ; 2 -> status command sent
return
#######################################################################
#
# onStart Domoticz function
#
#######################################################################
def onStart(self):
# Debug mode
Domoticz.Debugging(int(Parameters["Mode6"]))
Domoticz.Debug("onStart called")
#get parameters
self.__address = Parameters["Address"]
self.__devID = Parameters["Mode1"]
self.__localKey = Parameters["Mode2"]
#set the next heartbeat
self.__runAgain = self.__HB_BASE_FREQ
#build internal maps (__unit2dps_id_list and __plugs)
self.__unit2dps_id_list = {}
self.__plugs = {}
max_unit = 0
max_dps = 0
for val in sorted(Parameters["Mode3"].split(";")):
self.__unit2dps_id_list[int(val)]=[int(val),]
self.__plugs[int(val)]=Plug(int(val))
if(int(val)>max_unit):
max_unit=int(val)
max_dps = max_unit
#groups management: #syntax: 1;2 : 3;4
max_unit = max_unit + 1
if(Parameters["Mode4"]!="None"):
groups = Parameters["Mode4"].split(":")
for group in groups:
self.__unit2dps_id_list[max_unit]=[]
for val in sorted(group.split(";")):
self.__unit2dps_id_list[max_unit].append(int(val))
max_unit = max_unit + 1
#create domoticz devices
if(len(Devices) == 0):
for val in self.__unit2dps_id_list:
if(val <= max_dps): #single socket dps
Domoticz.Device(Name="Tuya SmartPlug #" + str(val), Unit=val, TypeName="Switch").Create()
Domoticz.Log("Tuya SmartPlug Device #" + str(val) +" created.")
else: #group: selector switch
Options = {"LevelActions": "|",
"LevelNames": "Off|On",
"LevelOffHidden": "false",
"SelectorStyle": "0"}
Domoticz.Device(Name="Tuya SmartPlug #" + str(val), Unit=val, TypeName="Selector Switch", Options=Options).Create()
Domoticz.Log("Tuya SmartPlug Device #" + str(val) +" created.")
#manage always on
if(Parameters["Mode5"]!="None"):
for val in sorted(Parameters["Mode5"].split(";")):
self.__plugs[int(val)].set_alwaysON()
#create the pytuya object
self.__device = pytuya.OutletDevice(self.__devID, self.__address, self.__localKey)
#state machine
self.__state_machine = 0
#start the connection
self.__connection = Domoticz.Connection(Name="Tuya", Transport="TCP/IP", Address=self.__address, Port="6668")
self.__connection.Connect()
#######################################################################
#
# onConnect Domoticz function
#
#######################################################################
def onConnect(self, Connection, Status, Description):
if (Connection == self.__connection):
if (Status == 0):
Domoticz.Debug("Connected successfully to: "+Connection.Address+":"+Connection.Port)
self.__command_to_execute()
else:
Domoticz.Debug("OnConnect Error Status: " + str(Status))
if(Status==113):#no route to host error (skip to avoid intempestive connect call)
return
if(self.__connection.Connected()):
self.__connection.Disconnect()
if(not self.__connection.Connecting()):
self.__connection.Connect()
#######################################################################
#
# onMessage Domoticz function
#
#######################################################################
def onMessage(self, Connection, Data):
Domoticz.Debug("onMessage called: " + Connection.Address + ":" + Connection.Port +" "+ str(Data))
if (Connection == self.__connection):
if(self.__last_cmd == None):#skip nothing was waiting
if(self.__state_machine == 0):#skip nothing was waiting
return
(error,is_on) = self.__extract_status(Data)
if(self.__state_machine == 1):#after a set command: need to ask the status
self.__state_machine = 2
payload=self.__device.generate_payload('status')
self.__connection.Send(payload)#TODO active connection check (it should be because we just get a message)
return
#now self.__state_machine == 2
self.__state_machine = 0
(error,state) = self.__extract_status(Data)
if(error):
self.__command_to_execute(self.__last_cmd)
self.__command_to_execute()
return
if(self.__last_cmd == 'status'):
self.__last_cmd = None
error = False
for key in self.__plugs:
error = error or self.__plugs[key].update_state(state[str(key)])
if(is_on):
UpdateDevice(self.__UNIT, 1, "On")
if(self.__last_cmd == 'On'):
self.__last_cmd = None
if(error):
self.__command_to_execute()
#######################################################################
#
# onCommand Domoticz function
#
#######################################################################
def onCommand(self, Unit, Command, Level, Hue):
Domoticz.Debug("onCommand called for Unit " + str(Unit) + ": Parameter '" + str(Command) + " Level: " + str(Level))
if(Command=="Set Level"): #group (selector switch): convert level to command (0 <=> 'Off' ; 10 <=> 'On')
if(Level==0):
Command = 'Off'
elif(Level==10):
Command = 'On'
else:
UpdateDevice(self.__UNIT, 0, "Off")
if(self.__last_cmd == 'Off'):
self.__last_cmd = None
if(self.__last_cmd != None):
self.__command_to_execute(self.__last_cmd)
def __command_to_execute(self,Command):
Domoticz.Error("Undefined Level: " + str(Level))
return
if(Command not in self.__VALID_CMD):
Domoticz.Error("Undefined command: " + Command)
return
if(Command == 'status'):
if(self.__last_cmd == None):
self.__last_cmd = Command
else:#On/Off
self.__last_cmd = Command
if(self.__connection.Connected()):
if(Command == 'On'):
payload = self.__device.generate_payload('set', {'1':True})
self.__connection.Send(payload)
status_request = True
elif(Command == 'Off'):
payload = self.__device.generate_payload('set', {'1':False})
self.__connection.Send(payload)
status_request = True
else: #(Command == 'status')
status_request = True
if(status_request):
payload=self.__device.generate_payload('status')
self.__connection.Send(payload)
else:
self.__connection.Connect()
def onCommand(self, Unit, Command, Level, Hue):
Domoticz.Debug("onCommand called for Unit " + str(Unit) + ": Parameter '" + str(Command))
self.__command_to_execute(Command)
for val in self.__unit2dps_id_list[Unit]:
self.__plugs[val].set_command(Command)
self.__command_to_execute()
#######################################################################
#
# onDisconnect Domoticz function
#
#######################################################################
def onDisconnect(self, Connection):
Domoticz.Debug("Disconnected from: "+Connection.Address+":"+Connection.Port)
#######################################################################
#
# onHeartbeat Domoticz function
#
#######################################################################
def onHeartbeat(self):
self.__runAgain -= 1
if(self.__runAgain == 0):
self.__runAgain = self.__HB_BASE_FREQ
self.__command_to_execute('status')
self.__command_to_execute()
#onStop Domoticz function
#######################################################################
#
# onStop Domoticz function
#
#######################################################################
def onStop(self):
self.__device = None
self.__last_cmd = None
if(self.__connection.Connected()):
self.__plugs = None
self.__unit2dps_id_list = None
if(self.__connection.Connected() or self.__connection.Connecting()):
self.__connection.Disconnect()
self.__connection = None
self.__state_machine = 0
########################################################################################
#
# Domoticz plugin management
#
########################################################################################
global _plugin
_plugin = BasePlugin()

53
turnOFF.py Executable file
View File

@@ -0,0 +1,53 @@
#!/usr/bin/python3
########################################################################################
# Domoticz Tuya Smart Plug Python Plugin #
# #
# MIT License #
# #
# Copyright (c) 2018 tixi #
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy #
# of this software and associated documentation files (the "Software"), to deal #
# in the Software without restriction, including without limitation the rights #
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell #
# copies of the Software, and to permit persons to whom the Software is #
# furnished to do so, subject to the following conditions: #
# #
# The above copyright notice and this permission notice shall be included in all #
# copies or substantial portions of the Software. #
# #
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR #
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, #
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE #
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER #
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, #
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE #
# SOFTWARE. #
# #
########################################################################################
import sys
import pytuya
import socket #needed for socket.timeout exception
if(len(sys.argv)!=5):
print("usage: " + sys.argv[0] + " <IP> <DevID> <Local key> <DPS value>")
exit(1)
ip = sys.argv[1]
devid = sys.argv[2]
localkey = sys.argv[3]
dps_value = sys.argv[4]
device = pytuya.OutletDevice(devid,ip,localkey)
try:
payload = device.generate_payload('set', {str(dps_value):False})
device._send_receive(payload)
except (ConnectionResetError, socket.timeout, OSError) as e:
print("A problem occur please retry...")
exit(1)

53
turnON.py Executable file
View File

@@ -0,0 +1,53 @@
#!/usr/bin/python3
########################################################################################
# Domoticz Tuya Smart Plug Python Plugin #
# #
# MIT License #
# #
# Copyright (c) 2018 tixi #
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy #
# of this software and associated documentation files (the "Software"), to deal #
# in the Software without restriction, including without limitation the rights #
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell #
# copies of the Software, and to permit persons to whom the Software is #
# furnished to do so, subject to the following conditions: #
# #
# The above copyright notice and this permission notice shall be included in all #
# copies or substantial portions of the Software. #
# #
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR #
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, #
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE #
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER #
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, #
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE #
# SOFTWARE. #
# #
########################################################################################
import sys
import pytuya
import socket #needed for socket.timeout exception
if(len(sys.argv)!=5):
print("usage: " + sys.argv[0] + " <IP> <DevID> <Local key> <DPS value>")
exit(1)
ip = sys.argv[1]
devid = sys.argv[2]
localkey = sys.argv[3]
dps_value = sys.argv[4]
device = pytuya.OutletDevice(devid,ip,localkey)
try:
payload = device.generate_payload('set', {str(dps_value):True})
device._send_receive(payload)
except (ConnectionResetError, socket.timeout, OSError) as e:
print("A problem occur please retry...")
exit(1)