esp32的参考文档:01studio科技
tcp协议以及mqtt协议的原理不做介绍,有兴趣的老哥自行csdn搜索。
巴法云官网:https://cloud.bemfa.com/
(巴法云怎么操作可以多看看官网的接入文档)
下面附上 代码片
。
import network,usocket,time
from machine import I2C,Pin,Timer
from ssd1306 import SSD1306_I2C #oled库
#初始化相关模块
i2c = I2C(sda=Pin(4), scl=Pin(0))#oled显示相关配置
oled = SSD1306_I2C(128, 64, i2c, addr=0x3c)
# WIFI 连接函数
def WIFI_Connect():
WIFI_LED=Pin(2, Pin.OUT) #初始化 WIFI 指示灯
wlan = network.WLAN(network.STA_IF) #STA 模式
wlan.active(True) #激活接口
start_time=time.time() #记录时间做超时判断
if not wlan.isconnected():
print('connecting to network...')
wlan.connect('WIFI名称', 'WIFI密码') #输入 WIFI 账号密码
while not wlan.isconnected():
#LED 闪烁提示
WIFI_LED.value(1)
time.sleep_ms(300)
WIFI_LED.value(0)
time.sleep_ms(300)
#超时判断,15 秒没连接成功判定为超时
if time.time()-start_time > 15 :
print('WIFI Connected Timeout!')
break
if wlan.isconnected():
#LED 点亮
WIFI_LED.value(1)
print('network information:', wlan.ifconfig())
#OLED 数据显示
oled.fill(0) #清屏背景黑色
oled.text('IP/Subnet/GW:',0,0)
oled.text(wlan.ifconfig()[0], 0, 20)
oled.text(wlan.ifconfig()[1],0,38)
oled.text(wlan.ifconfig()[2],0,56)
oled.show()
return True
else:
return False
if WIFI_Connect():
s=usocket.socket()#UDP协议的话把下面内容放括号里就ok
#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
addr=('192.168.1.11',10000) #服务器 IP 和端口
#这个是自己定的,根据收消息的设备的ip地址来定
s.connect(addr)
s.send('Welcome!')
#连接巴法云代码如下
s1=usocket.socket()
addr1=("bemfa.com",8340) #服务器 IP 和端口
s1.connect(addr1)
s1.send("cmd=1&uid=xxxxxxx&topic=tttttt\r\n")
#xxxxx为自己在巴法云的私钥tttttt为创建的主题名称
MQTT会用到下面的simple.py文件,下面是获取方式:
simple.py文件开源地址:https://github.com/micropython/micropython-lib/blob/master/micropython/umqtt.simple/umqtt/simple.py
此文件也可从vscode中的micropython库中直接获得:
附上完整 代码片
:
import gc
import _thread
import time,network,usocket
from machine import UART
from machine import I2C,Pin,ADC,Timer #从 machine 模块导入 I2C、Pin 子模块
from ssd1306 import SSD1306_I2C #从 ssd1306 模块中导入 SSD1306_I2C 子模块
exec(open('./socket.py').read(),globals())
i2c = I2C(sda=Pin(4), scl=Pin(0)) #I2C 初始化:sda-->4, scl -->0
#OLED 显示屏初始化:128*64 分辨率,OLED 的 I2C 地址是 0x3c
oled = SSD1306_I2C(128, 64, i2c, addr=0x3c)
adc = ADC(Pin(32))
def oledThread():
while 1:
oled.fill(0) # 清屏显示黑色背景
oled.text('self ip/servo ip', 0, 0)
oled.text('198.168.1.1', 0, 15)
oled.text('198.168.1.1', 0, 30)
#获取ADC数值
oled.text(str(adc.read()),0,45)
oled.text('(4095)',40,45)
oled.show()
time.sleep_ms(200)
def tcpThread():
# s=usocket.socket()#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
# addr=('192.168.1.1',10000) #服务器 IP 和端口
# s.connect(addr)
# s.send('Welcome!')
while True :
s.send(('adc value is:'+str(adc.read())).encode('utf-8'))
time.sleep_ms(300)
def mqttThread():
# s1=usocket.socket()#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
# addr1=("bemfa.com",8340) #服务器 IP 和端口
# s1.connect(addr1)
# s1.send("cmd=1&uid=xxxxxx&topic=tttttt\r\n")
while True :
# data = s.recv(1024)
# if data == ' ':
# pass
# else:
# print(data.decode('utf-8'))
message_adc=(str(adc.read()).encode('utf-8'))
print(message_adc)
s1.send("cmd=2&uid=xxxxxx&topic=tttttt&msg={}\r\n".format(message_adc))
time.sleep_ms(300)
if WIFI_Connect():
s=usocket.socket()#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
addr=('192.168.1.1',10000) #服务器 IP 和端口
s.connect(addr)
s.send('Welcome!')
s1=usocket.socket()#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
addr1=("bemfa.com",8340) #服务器 IP 和端口
s1.connect(addr1)
s1.send("cmd=1&uid=xxxxxx&topic=tttttt\r\n")
_thread.start_new_thread(oledThread, ())
_thread.start_new_thread(tcpThread, ())
_thread.start_new_thread(mqttThread, ())
版权声明:本文为CSDN博主「金牌厨师长」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/jojo1_csdn/article/details/122156018
esp32的参考文档:01studio科技
tcp协议以及mqtt协议的原理不做介绍,有兴趣的老哥自行csdn搜索。
巴法云官网:https://cloud.bemfa.com/
(巴法云怎么操作可以多看看官网的接入文档)
下面附上 代码片
。
import network,usocket,time
from machine import I2C,Pin,Timer
from ssd1306 import SSD1306_I2C #oled库
#初始化相关模块
i2c = I2C(sda=Pin(4), scl=Pin(0))#oled显示相关配置
oled = SSD1306_I2C(128, 64, i2c, addr=0x3c)
# WIFI 连接函数
def WIFI_Connect():
WIFI_LED=Pin(2, Pin.OUT) #初始化 WIFI 指示灯
wlan = network.WLAN(network.STA_IF) #STA 模式
wlan.active(True) #激活接口
start_time=time.time() #记录时间做超时判断
if not wlan.isconnected():
print('connecting to network...')
wlan.connect('WIFI名称', 'WIFI密码') #输入 WIFI 账号密码
while not wlan.isconnected():
#LED 闪烁提示
WIFI_LED.value(1)
time.sleep_ms(300)
WIFI_LED.value(0)
time.sleep_ms(300)
#超时判断,15 秒没连接成功判定为超时
if time.time()-start_time > 15 :
print('WIFI Connected Timeout!')
break
if wlan.isconnected():
#LED 点亮
WIFI_LED.value(1)
print('network information:', wlan.ifconfig())
#OLED 数据显示
oled.fill(0) #清屏背景黑色
oled.text('IP/Subnet/GW:',0,0)
oled.text(wlan.ifconfig()[0], 0, 20)
oled.text(wlan.ifconfig()[1],0,38)
oled.text(wlan.ifconfig()[2],0,56)
oled.show()
return True
else:
return False
if WIFI_Connect():
s=usocket.socket()#UDP协议的话把下面内容放括号里就ok
#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
addr=('192.168.1.11',10000) #服务器 IP 和端口
#这个是自己定的,根据收消息的设备的ip地址来定
s.connect(addr)
s.send('Welcome!')
#连接巴法云代码如下
s1=usocket.socket()
addr1=("bemfa.com",8340) #服务器 IP 和端口
s1.connect(addr1)
s1.send("cmd=1&uid=xxxxxxx&topic=tttttt\r\n")
#xxxxx为自己在巴法云的私钥tttttt为创建的主题名称
MQTT会用到下面的simple.py文件,下面是获取方式:
simple.py文件开源地址:https://github.com/micropython/micropython-lib/blob/master/micropython/umqtt.simple/umqtt/simple.py
此文件也可从vscode中的micropython库中直接获得:
附上完整 代码片
:
import gc
import _thread
import time,network,usocket
from machine import UART
from machine import I2C,Pin,ADC,Timer #从 machine 模块导入 I2C、Pin 子模块
from ssd1306 import SSD1306_I2C #从 ssd1306 模块中导入 SSD1306_I2C 子模块
exec(open('./socket.py').read(),globals())
i2c = I2C(sda=Pin(4), scl=Pin(0)) #I2C 初始化:sda-->4, scl -->0
#OLED 显示屏初始化:128*64 分辨率,OLED 的 I2C 地址是 0x3c
oled = SSD1306_I2C(128, 64, i2c, addr=0x3c)
adc = ADC(Pin(32))
def oledThread():
while 1:
oled.fill(0) # 清屏显示黑色背景
oled.text('self ip/servo ip', 0, 0)
oled.text('198.168.1.1', 0, 15)
oled.text('198.168.1.1', 0, 30)
#获取ADC数值
oled.text(str(adc.read()),0,45)
oled.text('(4095)',40,45)
oled.show()
time.sleep_ms(200)
def tcpThread():
# s=usocket.socket()#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
# addr=('192.168.1.1',10000) #服务器 IP 和端口
# s.connect(addr)
# s.send('Welcome!')
while True :
s.send(('adc value is:'+str(adc.read())).encode('utf-8'))
time.sleep_ms(300)
def mqttThread():
# s1=usocket.socket()#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
# addr1=("bemfa.com",8340) #服务器 IP 和端口
# s1.connect(addr1)
# s1.send("cmd=1&uid=xxxxxx&topic=tttttt\r\n")
while True :
# data = s.recv(1024)
# if data == ' ':
# pass
# else:
# print(data.decode('utf-8'))
message_adc=(str(adc.read()).encode('utf-8'))
print(message_adc)
s1.send("cmd=2&uid=xxxxxx&topic=tttttt&msg={}\r\n".format(message_adc))
time.sleep_ms(300)
if WIFI_Connect():
s=usocket.socket()#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
addr=('192.168.1.1',10000) #服务器 IP 和端口
s.connect(addr)
s.send('Welcome!')
s1=usocket.socket()#usocket.AF_INET,usocket.SOCK_DGRAM,usocket.IPPROTO_UDP
addr1=("bemfa.com",8340) #服务器 IP 和端口
s1.connect(addr1)
s1.send("cmd=1&uid=xxxxxx&topic=tttttt\r\n")
_thread.start_new_thread(oledThread, ())
_thread.start_new_thread(tcpThread, ())
_thread.start_new_thread(mqttThread, ())
版权声明:本文为CSDN博主「金牌厨师长」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/jojo1_csdn/article/details/122156018
暂无评论