Feedback
Ok. Another example in MicroPython. Almost a complete one at that. There is almost too much to mention.
#!/usr/bin/env pybricks-micropython
from pybricks import ev3brick as brick
from pybricks.ev3devices import Motor, UltrasonicSensor, ColorSensor, GyroSensor, InfraredSensor
from pybricks.parameters import Port,Color
from pybricks.robotics import DriveBase
from pybricks.tools import print, wait, StopWatch
from time import sleep
import threading
import sys
import random
import socket
# YOU WILL NEED TO PUT THE ID ADDRESS of the platform this template is running on here!
hostIPA = "192.168.1.1"
port = random.randint(50000,50999)
left = Motor(Port.C)
right = Motor(Port.B)
# 56 is the diameter of the wheels in mm
# 114 is the distance between them in mm
robot = DriveBase(left, right, 56, 114)
cl = ColorSensor(Port.S1)
us = UltrasonicSensor(Port.S3)
ir = InfraredSensor(Port.S4)
gy = GyroSensor(Port.S2)
print("host ",hostIPA)
print("port",port)
online = True
renamed = False
transmit = True
peerMode = False
speed = 100
currentPitch = 0
currentRoll = 0
lastPitch = 0
lastRoll = 0
def configure(client):
global renamed
if renamed == True:
return
try:
# configure titles on buttons/ports
ports = ["title:1P:color","title:2P:gyro","title:3P:ultra","title:4P:infra","title:DP:speed","hide:AP:spare","title:BP:motorA","title:CP:motorB"]
for port in ports:
print("title",port)
data = bytes(port + "\n","utf-8")
client.send(data)
sleep(0.2)
sleep(0.2)
# configure buttons on buttons/ports
buttons = ["title:2S:forward","title:8S:back","title:4S:left","title:6S:right","title:1S:on","title:3S:off","title:7S:faster","title:9S:slower","title:5S:exit"]
for button in buttons:
print("title",button)
data = bytes(button + "\n","utf-8")
client.send(data)
sleep(0.2)
sleep(0.2)
renamed = True
# change the tags returns on buttons
#tags = ["tag:1S:1Q\n","tag:2S:2Q\n","tag:3S:3Q"]
#for tag in tags:
# print("tag",tag)
# data = bytes(tag,"utf-8")
# client.send(data)
# sleep(0.2)
# configure the colors of buttons black, blue, brown, cyan, green, magenta, orange, purpule, red, yellow, white, clear
bcolors = ["bcolor:1S:red","bcolor:2S:red","bcolor:3S:red","bcolor:4S:red","bcolor:5S:red","bcolor:6S:red","bcolor:7S:red","bcolor:8S:red","bcolor:9S:red"]
for color in bcolors:
print("color",color)
data = bytes(color + "\n","utf-8")
client.send(data)
sleep(0.2) # longer sleep needed else too quick and fails
# configure the colors of font on buttons black, blue, brown, cyan, green, magenta, orange, purpule, red, yellow, white, clear
fcolors = ["fcolor:1S:white","fcolor:2S:white","fcolor:3S:white","fcolor:4S:white","fcolor:5S:white","fcolor:6S:white","fcolor:7S:white","fcolor:8S:white","fcolor:9S:white"]
sleep(0.2)
for color in fcolors:
print("color",color)
data = bytes(color + "\n","utf-8")
client.send(data)
sleep(0.2) # longer sleep needed else too quick and fails
return(True)
feed = True
except AssertionError as error:
print("Closing config socket",error)
client.close()
s.close()
exit()
def colorFeed():
print("color")
global transmit
while transmit:
try:
# MUST feed strings ONLY port 1
color = cl.color()
if color == Color.BLUE:
colorFeed = "1P:blue\n"
elif color == Color.RED:
colorFeed = "1P:red\n"
elif color == Color.YELLOW:
colorFeed = "1P:yellow\n"
elif color == Color.ORANGE:
colorFeed = "1P:orange\n"
elif color == Color.GREEN:
colorFeed = "1P:green\n"
elif color == Color.PURPLE:
colorFeed = "1P:purple\n"
elif color == Color.BROWN:
colorFeed = "1P:brown\n"
elif color == Color.WHITE:
colorFeed = "1P:white\n"
elif color == Color.BLACK:
colorFeed = "1P:black\n"
elif color == None:
colorFeed = "1P:None\n"
data2 = bytes(colorFeed, 'utf-8')
client_s.send(data2)
sleep(0.5)
except AssertionError as error:
print("Closing color socket",error)
client_s.close()
s.close()
def ultraSonicFeed():
global transmit
while transmit:
try:
# MUST feed strings ONLY port 3
distance = us.distance()
ultraSonicFeed = "3P:" + str(distance)+ "\n"
data3 = bytes(ultraSonicFeed, 'utf-8')
client_s.send(data3)
sleep(0.5)
except AssertionError as error:
print("Closing ultra socket",error)
client_s.close()
s.close()
def gyroFeed():
global transmit
while transmit:
try:
# MUST feed strings ONLY port 2
gyro = gy.angle()
gyroFeed = "2P:" + str(gyro)+ "\n"
data1 = bytes(gyroFeed, 'utf-8')
client_s.send(data1)
sleep(0.5)
except AssertionError as error:
print("Closing gyro socket",error)
client_s.close()
s.close()
def infraredFeed():
global transmit
while transmit:
# MUST feed strings ONLY
# all strings MUST be teminated with a return
try:
distance = ir.distance()
feedme = "4P:" + str(distance) + "\n"
data2 = bytes(feedme, "utf-8")
client_s.send(data2)
sleep(1)
except AssertionError as error:
print("portFeed",error)
online = False
client_s.close()
def superTap(data):
global speed
if data == "1S":
robot.drive(speed,0)
elif data == "2S":
pass
elif data == "3S":
robot.stop()
elif data == "4S":
robot.drive(speed/2,25)
elif data == "5S":
sys.exit()
elif data == "6S":
robot.drive(speed/2,-25)
elif data == "7S":
if speed > 10:
speed = speed - 10
print("speed",speed)
elif data == "8S":
robot.drive(-speed,0)
elif data == "9S":
if speed < 100:
speed = speed + 10
print("speed",speed)
def actionTrigger(data, client):
global transmit
if data[:5] == "#:end":
stopMotors()
brick.sound.beep()
peerMode = False
if data[:6] == "#:peer":
peerMode = True
if data[:7] == "#:begin":
pass
if data[:8] == "#:keypad":
configure(client)
transmit = True
if data[:10] == "#:touchpad":
pass
if data[:8] == "#:motion":
pass
if data[:5] == "#:con": # connected
brick.sound.beep()
doFeeds()
if data[:5] == "#:dis": # disconnect
brick.sound.beep()
sleep(1)
brick.sound.beep()
exit()
if data[:8] == "#:short":
stopMotors()
brick.sound.beep()
if data[:6] == "#:long":
stopMotors()
brick.sound.beep()
if data[:6] == "#:long":
stopMotors()
brick.sound.beep()
def stopMotors():
print("STOP STOP STOP ")
robot.stop()
def joystick(pitch, roll):
global lastPitch
global lastRoll
calcPitch = int(round(pitch / 180 * 100,0)) # needs to be an integer
calcRoll = int(round(roll / 720 * 100,0)) # turns need to be slow and deliberate, this reduces roll by 4
if lastPitch != calcPitch or lastRoll != calcRoll:
print("calc",calcPitch,calcRoll)
robot.drive(calcPitch,calcRoll)
lastPitch = calcPitch
lastRoll = calcRoll
def nextMove(pitch, roll):
global currentPitch
global currentRoll
global lastPitch
global lastRoll
if currentPitch == 0:
currentPitch = round(pitch / 18 * 100,0)
currentRoll = round(roll / 72 * 100,0) # turns need to be slow and deliberate, this reduces roll by 4
return
# convert values into percentages and make gradual change
newPitch = round((((pitch / 180 * 100) + currentPitch)/2),0)
newRoll = round((((roll / 720 * 100) + currentRoll)/2),0)
currentPitch = newPitch
currentRoll = newRoll
try:
if lastPitch != currentPitch or lastRoll != currentRoll:
#joy_pair.on(currentRoll,currentPitch,radius=100)
#joystick(currentPitch, currentRoll)
robot.drive(currentPitch,currentRoll)
lastPitch = currentPitch
lastRoll = currentRoll
except AssertionError as error:
print("oh foo bar",error)
pass
def doFeeds():
print("dofeeds")
t1.start()
t2.start()
t3.start()
t4.start()
t1 = threading.Thread(target=infraredFeed)
t2 = threading.Thread(target=colorFeed)
t3 = threading.Thread(target=ultraSonicFeed)
t4 = threading.Thread(target=gyroFeed)
ai = socket.getaddrinfo(hostIPA,port)
addr = ai[0][-1]
backlog = 5
size = 1024
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(backlog)
try:
res = s.accept()
while online:
client_s = res[0]
#client_addr = res[1]
req =client_s.recv(1024)
data=req.decode('utf-8')
if data[:2] == "#:":
actionTrigger(data, client_s)
elif data[:2] == "=:":
superTap(data[-2:])
elif data[:2] == "@:":
mCommands = data.split("\n")
for mCommand in mCommands:
if len(mCommand) != 0:
cords = mCommand[2:]
cord = cords.split(":")
try:
roll = float(cord[0])
pitch = float(cord[1])
joystick(pitch,roll)
except: # AssertionError as error:
#print("error",error)
pass
elif data[:2] == "&:": # analog or motion co-ordinate stream peer
mCommands = data.split("\n")
for mCommand in mCommands:
if len(mCommand) != 0:
cord = mCommand.split(":")
try:
pitch = float(cord[3]) # max 157?
roll = float(cord[2]) # max 314?
# left or right or both
if cord[1] == "10":
if abs(roll) > abs(pitch):
nextMove(currentPitch,roll)
# up or down or both
if cord[1] == "5":
if abs(pitch) > abs(roll):
nextMove(pitch,roll)
# up, down, left and right
if (cord[1] == "15"):
nextMove(pitch, roll)
# left only restict up/down
if cord[1] == "2":
if abs(pitch) < abs(roll):
# current pitch may be zero if you have never moved
if currentPitch == 0:
currentPitch = pitch
nextMove(currentPitch, roll)
# right only restict up/down
if cord[1] == "8":
if abs(pitch) < abs(roll):
# current pitch may be zero if you have never moved
if currentPitch == 0:
currentPitch = pitch
nextMove(currentPitch, roll)
# up only, restrict left/right
if cord[1] == "1":
# only go forward if left/right tilt is less than pitch
if abs(pitch) > abs(roll):
nextMove(pitch, roll)
# down only, restrict left/right
if cord[1] == "4":
print("this one 4",pitch,roll)
# only go backwards if left/right tilt is less than pitch
if pitch > abs(roll):
print("move ahole")
nextMove(pitch, roll)
# left + up
if cord[1] == "9":
if roll < 0 and pitch > 0:
nextMove(pitch, roll)
# right + down
if cord[1] == "6" and roll > 0 and pitch < 0:
nextMove(pitch, roll)
# right + up
if cord[1] == "3" and roll > 0 and pitch > 0:
nextMove(pitch, roll)
# left + down
if cord[1] == "12" and roll < 0 and pitch < 0:
nextMove(pitch, roll)
except: # AssertionError as error:
# we skip any errors cause we know we will have too much data anyway
pass
except AssertionError as error:
print("Closing socket",error)
client_s.close()
s.close()