Mailing List Archive

Multithreading? How?
I need to develop a Python application that is a sort of gateway between
to networks: a "serial bus" network (think of a serial port or a USB
connection) and a MQTT connection (in my case, it's AWS IoT service).

On the bus, a proprietary protocol is implemented. From the bus, the app
knows the status of the system (think of a simple ON/OFF status).
The user can retrieve the status of the system through MQTT: it sends a
message to read/status MQTT topic and receive back a message with the
current status on read/status/reply MQTT topic. Of course, they are just
examples.

On the contrary, when the app detects a status change reported from the
serial bus (think of a transition from ON to OFF), it sends a MQTT message.

I'm thinking to split the application in three classes: Bus, App and
IoT. Bus and IoT are also threads.
The thread of Bus manages the serial protocol, while the thread of IoT
manages MQTT connection with the broker (the server).

However I don't know if it could be a good architecture. Suppone Bus
thread receives a new status from the system. In the context of
ThreadBus, the object Bus could call a method of App object:

app.set_status(new_status_from_the_bus)

In the App I have:

class App():
..
def set_status(new_status): # Could be called from ThreadBus
if new_status != self.new_status:
self.new_status = new_status
# Do some actions on status change

def get_status(): # Could be called from ThreadIoT
return self.status

Of course, IoT object needs to know the current status of the system
when a message is received from MQTT. So ThreadIoT could call
app.get_status().

I think this architecture has some problems with race conditions or
threads synchronization. What happens if IoT calls get_status() exactly
when set_status() called by ThreadBus is executing? If status is a big
data structure, set_status() could be interrupted by get_status() that
could get a completely corrupted status, because it was only partly
updated by set_status().

I know I can use locks or semaphores in get_status() and set_status(),
but I don't know if this is a good approach. Consider that the system is
complex, it isn't composed by a simple single status. It has many many
parameters that are collected from the serial bus. Should I use a lock
for every [get|set]_status(), [get|set]_dimensions(),
[get|set]_battery_level(), [get|set]_mains_present(), and so on?


Another possibility is to use a Queue for Bus and a Queue for IoT. So
the set_status(new_status) called from Bus object will be transformed in
a put in the queue:

app_queue.put({"type": "new_status", "data": ...})

However how could be transformed the get_status() from IoT? How the
return value (the current status) is real retrieved?

class IoT():
..
def status_request_from_MQTT():
app_queue.put({"type": "get_status"})
# How to get the status to return?
return current_status

Should the app put the status on the same queue and should IoT waits for
a new message in the Queue?

def status_request_from_MQTT():
app_queue.put({"type": "get_status"})
try:
current_status = app_queue.get(timeout=10)
except Empty:
# What to do?
return current_status


Again another approach is to avoid multi-threading at all and create a
single "main loop" function that waits at the same time for incoming
events on the serial bus and MQTT (how?). I don't know if this could be
done in my case, because I'm using awscrt Python module and it works
through callbacks that I think is called from another thread.


Any suggestions on this architecture?
--
https://mail.python.org/mailman/listinfo/python-list
Re: Multithreading? How? [ In reply to ]
Hi there,

I hope this e-mail is still on time for you. I have implemented this
architecture a few times, and they all work fine nowadays. However, your
question made me review it and create a small gist.

I suggest you create a thread for every output and input connection. This
makes it easier to focus on reading or writing inside a given object. For
example, in your project, I would separate it into IotReader, IotWriter,
MqttReader, and MqttWriter. Another thing I do to avoid manipulating
multiple locks, semaphores, and so on is to create a central event loop.
For every event that comes from IotReader or MqttReader, I would pack it
into an event and send it to a central thread. This is the main gateway
thread, and I would call it Gateway.

I don't know if you have ever programmed in Android. But the Android
framework uses a similar approach to processing data. Whenever you need to
process something, you start a new 'Thread', and when you need to present
the result in the interface you dispatch events until the main thread is
notified and updates the corresponding Views. The point here is: never do
any extensive processing in the main thread as it is going delay. You will
probably not do it now, but if you ever need it, make a pool of workers to
process this and keep the Gateway free. Replace the Threads with
multiprocessing.Process, as well, as Python lacks true multithreading.

Regarding thread/process communication, I like to implement this using
Queues. The Gateway class would have a main_queue to receive events from
IotReader and MqttReader. IotWriter and MqttWriter have a particular queue
as well. Whenever the Gateway needs to send something to either of them, it
just needs to reference their respective queues, which I wrap inside a
method, for simplicity.

Another benefit of this architecture is the ability to scale to more
connections easily. In the past, I have used this strategy to schedule
tasks for up to about 20 devices (each with an input and output thread). I
believe it could go higher, but I haven't needed to. There are fully
distributed architectures more suitable for hundreds and thousands of
connections, but this is likely not what you need now.

The following is a possible implementation for the IotReader. You need to
replace the AnySerialReader class and its read method with the
initialization of your own Bus wrapper. The read method must have a timeout
parameter if you want to cancel the operation properly. The terminate
method is used to terminate the program properly.


































*class IotReader(Thread): def __init__(self, queue_master,
name='IotReader'): super().__init__()
self.queue_master = queue_master self.queue = Queue()
self.done = False self.name <http://self.name> = name
self.start() def terminate(self): self.done = True
def run(self): log.info <http://log.info>(f"Starting thread for
{self.name <http://self.name>}") serial_reader =
AnySerialReader('Serial' + self.name <http://self.name>) log.info
<http://log.info>(f"Serial reader for {self.name <http://self.name>}
initialized") while not self.done: try:
data = serial_reader.read(timeout=1)
if data is None: continue
self.queue_master.put(('on_iot_event', data)) except:
traceback.print_exc(file=sys.stdout)
log.warning("Terminating IotReader") serial_reader.terminate()*


The following is a possible implementation for IotWriter. It adds a method
named send that adds new tasks to the queue. The main loop, running inside
the thread, waits for these events and calls write in AnySerialWriter. This
may be a slow operation, the connection may be down, and we need to
reconnect, etc. This is why we need a thread for the output message as well.









































*class IotWriter(Thread): def __init__(self, name='IotWriter'):
super().__init__() self.queue = Queue() self.done
= False self.name <http://self.name> = name
self.start() def terminate(self): self.done = True
self.queue.put( ('terminate', None) ) def send(self, data):
self.queue.put( ('write_message', data) ) def run(self):
log.info <http://log.info>(f"Starting thread for {self.name
<http://self.name>}") serial_writer = AnySerialWriter('Serial' +
self.name <http://self.name>) log.info <http://log.info>(f"Serial
writer for {self.name <http://self.name>} initialized")
while not self.done: try: action, data =
self.queue.get() if action == 'terminate':
break elif action ==
'write_message': serial_writer.write(data)
else: log.error(f'Unknown action for
IotWriter - action={action}, data={data}') except:
traceback.print_exc(file=sys.stdout)
log.warning("Terminating IotWriter") serial_writer.terminate()*


I do not include the source for a MqttReceiver and MqttWriter as they are
very similar, in structure, to IotWriter and MqttWriter. The code below is
for the Gateway class. It initializes all readers and writers. Following
that, it will wait for input messages and process them adequately. You
could add as many events as you need. I used two to illustrate.





































































*class Gateway(Thread): def __init__(self): super().__init__()
self.queue_master = Queue() self.done = False
self.start() def run(self): log.info
<http://log.info>("Starting Gateway") while not self.done:
try: self.iot_reader =
IotReader(self.queue_master) self.iot_writer = IotWriter()
self.mqtt_reader =
MqttReader(self.queue_master) self.mqtt_writer =
MqttWriter() log.info <http://log.info>(f"Starting
{self.__class__.__name__}") while not
self.done: try: action, data =
self.queue_master.get() if
action in 'on_mqtt_event':
self.on_mqtt_event(data)
elif action == 'on_iot_event':
self.on_iot_event(data) elif
action == 'terminate': break
else:
log.error(f'Unknown action, action={action}, data={data}')
except: log.error("Error
during message parsing")
traceback.print_exc(file=sys.stdout) except:
log.error("Error during gateway configuration")
traceback.print_exc(file=sys.stdout)
self.iot_reader.terminate() self.iot_writer.terminate()
self.mqtt_reader.terminate() self.mqtt_writer.terminate()
self.iot_reader.join() self.iot_writer.join()
self.mqtt_reader.join() self.mqtt_writer.join()
log.warning('Terminating Gateway') def terminate(self):
self.done = True self.queue_master.put(('terminate', None))
def on_iot_event(self, data): log.info <http://log.info>(f'Event
from iot device, forwarding to mqtt, data={data}')
self.mqtt_writer.send(data) def on_mqtt_event(self, data):
log.info <http://log.info>(f'Event from iot device, forwarding to iot,
data={data}') self.iot_writer.send(data)*


I started the Gateway using the code below. It calls the method terminate
when I press Ctrl+C. This event could also come from the MQTT server or
anywhere else.




















*gateway = Gateway()# Your main thread is free here. You could start a
webserver and display # a dashboard. Or wait, like below.try:
gateway.join()except KeyboardInterrupt: log.info
<http://log.info>("Sending terminate command...")
gateway.terminate()try: gateway.join()except KeyboardInterrupt:
log.info <http://log.info>("Killing the app...") sys.exit(0)
passlog.info <http://log.info>("Bye!")*


If you want to check the full code, a small gist in the link below:
*https://gist.github.com/diegofps/87945a0c3e800c747f3af07833ff6b7e
<https://gist.github.com/diegofps/87945a0c3e800c747f3af07833ff6b7e>*


You also mentioned discovering the device status and sending it back
through MQTT. I can see two approaches to this. The first approach is to
cache the status emitted from the device. This is fine if the data is small
enough to keep in the gateway memory. Then, I would send it back through
MQTT immediately. The second approach is to forward the request to the
device. The device will later respond to your query with the original
question and response. You likely need the question as you need to remember
what you need to do with it now. It is stateless. You could also mix these
two approaches and cache the state for a certain amount of time. After
that, it would expire, and you would ask the device again.

This is an overview of how I implement it nowadays. I am sure other people
may have different strategies and ideas to improve it.

Best,
Diego




On Fri, Apr 28, 2023 at 1:10?PM pozz <pozzugno@gmail.com> wrote:

> I need to develop a Python application that is a sort of gateway between
> to networks: a "serial bus" network (think of a serial port or a USB
> connection) and a MQTT connection (in my case, it's AWS IoT service).
>
> On the bus, a proprietary protocol is implemented. From the bus, the app
> knows the status of the system (think of a simple ON/OFF status).
> The user can retrieve the status of the system through MQTT: it sends a
> message to read/status MQTT topic and receive back a message with the
> current status on read/status/reply MQTT topic. Of course, they are just
> examples.
>
> On the contrary, when the app detects a status change reported from the
> serial bus (think of a transition from ON to OFF), it sends a MQTT message.
>
> I'm thinking to split the application in three classes: Bus, App and
> IoT. Bus and IoT are also threads.
> The thread of Bus manages the serial protocol, while the thread of IoT
> manages MQTT connection with the broker (the server).
>
> However I don't know if it could be a good architecture. Suppone Bus
> thread receives a new status from the system. In the context of
> ThreadBus, the object Bus could call a method of App object:
>
> app.set_status(new_status_from_the_bus)
>
> In the App I have:
>
> class App():
> ..
> def set_status(new_status): # Could be called from ThreadBus
> if new_status != self.new_status:
> self.new_status = new_status
> # Do some actions on status change
>
> def get_status(): # Could be called from ThreadIoT
> return self.status
>
> Of course, IoT object needs to know the current status of the system
> when a message is received from MQTT. So ThreadIoT could call
> app.get_status().
>
> I think this architecture has some problems with race conditions or
> threads synchronization. What happens if IoT calls get_status() exactly
> when set_status() called by ThreadBus is executing? If status is a big
> data structure, set_status() could be interrupted by get_status() that
> could get a completely corrupted status, because it was only partly
> updated by set_status().
>
> I know I can use locks or semaphores in get_status() and set_status(),
> but I don't know if this is a good approach. Consider that the system is
> complex, it isn't composed by a simple single status. It has many many
> parameters that are collected from the serial bus. Should I use a lock
> for every [get|set]_status(), [get|set]_dimensions(),
> [get|set]_battery_level(), [get|set]_mains_present(), and so on?
>
>
> Another possibility is to use a Queue for Bus and a Queue for IoT. So
> the set_status(new_status) called from Bus object will be transformed in
> a put in the queue:
>
> app_queue.put({"type": "new_status", "data": ...})
>
> However how could be transformed the get_status() from IoT? How the
> return value (the current status) is real retrieved?
>
> class IoT():
> ..
> def status_request_from_MQTT():
> app_queue.put({"type": "get_status"})
> # How to get the status to return?
> return current_status
>
> Should the app put the status on the same queue and should IoT waits for
> a new message in the Queue?
>
> def status_request_from_MQTT():
> app_queue.put({"type": "get_status"})
> try:
> current_status = app_queue.get(timeout=10)
> except Empty:
> # What to do?
> return current_status
>
>
> Again another approach is to avoid multi-threading at all and create a
> single "main loop" function that waits at the same time for incoming
> events on the serial bus and MQTT (how?). I don't know if this could be
> done in my case, because I'm using awscrt Python module and it works
> through callbacks that I think is called from another thread.
>
>
> Any suggestions on this architecture?
> --
> https://mail.python.org/mailman/listinfo/python-list
>


--
Diego Souza
Wespa Intelligent Systems
Rio de Janeiro - Brasil
--
https://mail.python.org/mailman/listinfo/python-list