Skip to content

Commit dd63ba7

Browse files
committed
Begin publish/subscribe framework
Adding support for application capabilities Discard incomplete messages after a while
1 parent d901327 commit dd63ba7

File tree

6 files changed

+227
-90
lines changed

6 files changed

+227
-90
lines changed

MeshBase.cpp

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@
1313
#define TO_ADDRESS(x) (0xAA00000000LL + x)
1414

1515
#define PEER_DISCOVERY_TIME 4000
16-
#define PEER_CHECK_TIME 4000
16+
#define CHECK_TIME 4000
1717
#define PEER_TIMEOUT 3
18+
#define ASSEMBLY_TIMEOUT 2
1819

1920
MeshBase::MeshBase(uint8_t ce, uint8_t cs)
2021
: radio(ce, cs)
2122
, address(0)
2223
, last_broadcast_time(0)
23-
, last_peer_check_time(0)
24+
, last_check_time(0)
25+
, application_capabilities(0)
2426
{}
2527

2628
void MeshBase::Begin()
@@ -56,23 +58,47 @@ void MeshBase::Update()
5658
} while (!done);
5759
}
5860

59-
// Update peers
60-
if (millis() - last_peer_check_time > PEER_CHECK_TIME)
61+
// Do periodic checks
62+
if (millis() - last_check_time > CHECK_TIME)
6163
{
62-
LinkedList<Peer>::Node* current = peers.first;
63-
while(current != NULL)
64+
// Check for expired peers
6465
{
65-
current->item->time += 1;
66-
if (current->item->time >= PEER_TIMEOUT)
66+
LinkedList<Peer>::Node* current = peers.first;
67+
while(current != NULL)
6768
{
68-
Serial.print("Lost Peer: ");
69-
Serial.println(current->item->address, DEC);
70-
current = peers.Remove(current);
71-
} else {
72-
current = current->next;
69+
current->item->time += 1;
70+
if (current->item->time >= PEER_TIMEOUT)
71+
{
72+
Serial.print("Lost Peer: ");
73+
Serial.println(current->item->address, DEC);
74+
current = peers.Remove(current);
75+
} else {
76+
current = current->next;
77+
}
7378
}
7479
}
75-
last_peer_check_time = millis();
80+
81+
// Check for expired packets
82+
{
83+
Message* current = assembly_list.first;
84+
while(current != NULL)
85+
{
86+
current->age += 1;
87+
if (current->age >= ASSEMBLY_TIMEOUT)
88+
{
89+
Serial.print("Dropped partial message. address=");
90+
Serial.print(current->header.address_from, DEC);
91+
Serial.print(" msg_id=");
92+
Serial.print(current->header.msg_id);
93+
Serial.print(" blocks_recieved=");
94+
Serial.println(current->blocks_recieved);
95+
current = assembly_list.Remove(current);
96+
} else {
97+
current = current->next;
98+
}
99+
}
100+
}
101+
last_check_time = millis();
76102
}
77103
}
78104

@@ -101,6 +127,7 @@ void MeshBase::Message::AddPart(const void* payload, uint8_t len, uint8_t part_n
101127
header.split_more = false;
102128
header.split_part = part_num;
103129
}
130+
age = 0;
104131
}
105132

106133
bool MeshBase::Message::IsDone() const
@@ -110,16 +137,8 @@ bool MeshBase::Message::IsDone() const
110137
// So if split_more is false, and we have the right number of blocks_recieved
111138
// we are good to go.
112139
if (!header.split_more && blocks_recieved > header.split_part) {
113-
if (blocks_recieved > 1) {
114-
Serial.print(" R IsDone() : id=");
115-
Serial.print(header.msg_id);
116-
Serial.println(" - True **");
117-
}
118140
return true;
119141
}
120-
Serial.print(" R IsDone() : id=");
121-
Serial.print(header.msg_id);
122-
Serial.println(" - False");
123142
return false;
124143
}
125144

@@ -175,11 +194,12 @@ void MeshBase::HandlePeerDiscovery(const MeshBase::MessageHeader* msg, const voi
175194
Serial.print(" num_peers=");
176195
Serial.println(pd->num_peers, DEC);
177196
Peer* p = new Peer(msg->address_from);
197+
p->Update(pd);
178198
peers.Add(p);
179199
OnNewPeer(p);
180200
} else {
181201
// Existing peer, reset timer
182-
peer->time = 0;
202+
peer->Update(pd);
183203
}
184204
}
185205

@@ -189,7 +209,7 @@ void MeshBase::SendPeerDiscovery()
189209
MeshBase::PeerDiscoveryMessage payload;
190210
payload.protocol_version = 1;
191211
payload.network_capabilities = 0;
192-
payload.application_capabilities = 0;
212+
payload.application_capabilities = application_capabilities;
193213
payload.num_peers = peers.length;
194214
payload.uptime = millis() / 1000;
195215
SendMessage(PEER_DISCOVERY, type_peer_discovery, &payload, sizeof(payload), true);
@@ -273,3 +293,9 @@ MeshBase::Peer* MeshBase::GetPeer(uint32_t a)
273293
return NULL;
274294
}
275295

296+
void MeshBase::Peer::Update(const PeerDiscoveryMessage* msg)
297+
{
298+
application_capabilities = msg->application_capabilities;
299+
time = 0;
300+
}
301+

MeshBase.h

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,63 +7,82 @@
77

88
#define PACKED __attribute__ ((packed))
99

10+
typedef uint32_t address_t;
11+
1012
class MeshBase
1113
{
1214
public:
1315
MeshBase(uint8_t ce, uint8_t cs);
1416

15-
struct Peer {
16-
uint32_t address;
17-
uint16_t time;
18-
Peer(uint32_t address) : address(address), time(0) {}
19-
};
20-
2117
struct MessageHeader {
2218
uint8_t protocol_version : 4;
2319
uint8_t ttl : 4;
2420
uint8_t msg_id;
2521
bool split_more : 1;
2622
uint8_t split_part : 7;
2723
uint8_t type;
28-
uint32_t address_from;
24+
address_t address_from;
2925
} PACKED;
3026

3127
struct Message {
32-
Message(const MessageHeader& a) : header(a), data(NULL), data_used(0), blocks_recieved(0), next(0), prev(0) {}
28+
Message(const MessageHeader& a) : header(a), data(NULL), data_used(0), blocks_recieved(0), next(NULL), prev(NULL), age(0) {}
3329
~Message();
3430
MessageHeader header;
3531
void* data;
3632
uint8_t data_used;
3733
uint8_t blocks_recieved;
3834
Message* next;
3935
Message* prev;
36+
uint8_t age;
4037

4138
void AddPart(const void* data, uint8_t len, uint8_t part_num, bool more_parts);
4239
bool IsDone() const;
4340
};
4441

4542
// -- Message types --
46-
enum message_type {
43+
enum MessageType {
4744
type_peer_discovery,
4845
type_peer_list,
4946
type_user,
5047
};
5148

49+
enum ApplicationCapabilities {
50+
capability_publish_events = 1 >> 0,
51+
};
52+
5253
void Begin();
5354
void Update();
54-
void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length);
55-
void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast);
56-
uint32_t GetAddress() const { return address; }
55+
void SendMessage(address_t address, uint8_t type, const void* data, uint8_t length);
56+
void SendMessage(address_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast);
57+
address_t GetAddress() const { return address; }
5758
bool IsReady() const { return address != 0; }
5859
protected:
60+
struct PeerDiscoveryMessage
61+
{
62+
uint8_t protocol_version;
63+
uint8_t network_capabilities; // What routing/networking can I do for the network
64+
uint8_t application_capabilities; // What type of data do I expose
65+
uint16_t num_peers; // Number of direct peers
66+
uint32_t uptime; // Seconds since boot
67+
} PACKED;
68+
69+
struct Peer {
70+
Peer(uint32_t address) : address(address), time(0), application_capabilities(0) {}
71+
uint32_t address;
72+
uint8_t time;
73+
uint8_t application_capabilities;
74+
void Update(const PeerDiscoveryMessage* msg);
75+
};
76+
5977
virtual void OnMessage(const MessageHeader* meta, const void* data, uint8_t length) = 0;
6078
virtual void OnNewPeer(Peer*) {}
6179
virtual void OnLostPeer(Peer*) {}
80+
uint8_t application_capabilities;
6281
private:
6382
uint32_t address;
6483
RF24 radio;
6584
unsigned long last_broadcast_time;
66-
unsigned long last_peer_check_time;
85+
unsigned long last_check_time;
6786

6887
void SendPeerDiscovery();
6988
void HandlePeerDiscovery(const MessageHeader* msg, const void* buff, uint8_t length);
@@ -75,15 +94,6 @@ class MeshBase
7594

7695
Peer* GetPeer(uint32_t address);
7796

78-
struct PeerDiscoveryMessage
79-
{
80-
uint8_t protocol_version;
81-
uint8_t network_capabilities; // What routing/networking can I do for the network
82-
uint8_t application_capabilities; // What type of data do I expose
83-
uint16_t num_peers; // Number of direct peers
84-
uint32_t uptime; // Seconds since boot
85-
} PACKED;
86-
8797
};
8898

8999
#endif

Publisher.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#ifndef PUBLISHER_H
2+
#define PUBLISHER_H
3+
4+
#include <Arduino.h>
5+
#include "MeshBase.h"
6+
7+
class PublishApp : public MeshBase
8+
{
9+
public:
10+
PublishApp() : MeshBase(9, 10)
11+
{
12+
application_capabilities |= MeshBase::capability_publish_events;
13+
}
14+
15+
void OnEvent(uint8_t event_data)
16+
{
17+
const Target* current = targets.first;
18+
while (current != NULL)
19+
{
20+
SendMessage(current->address, type_on_event, &event_data, sizeof(event_data));
21+
current = current->next;
22+
}
23+
}
24+
25+
enum PublishMessageType {
26+
type_on_event = MeshBase::type_user,
27+
type_subscribe,
28+
};
29+
protected:
30+
virtual void OnMessage(const MeshBase::MessageHeader* meta, const void* data, uint8_t length)
31+
{
32+
if (meta->type == type_subscribe)
33+
{
34+
targets.Add(new Target(meta->address_from));
35+
}
36+
}
37+
private:
38+
struct Target
39+
{
40+
Target(address_t target) : address(target), prev(NULL), next(NULL) {}
41+
address_t address;
42+
Target* prev;
43+
Target* next;
44+
};
45+
LinkedList2<Target> targets;
46+
};
47+
48+
#endif // PUBLISHER_H

RF_test.ino

Lines changed: 0 additions & 44 deletions
This file was deleted.

examples/Publisher/Publisher.ino

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#include <SPI.h>
2+
#include "RF24.h"
3+
#include "MeshBase.h"
4+
#include "Publisher.h"
5+
#include "LinkedList.h"
6+
7+
PublishApp app;
8+
9+
unsigned long last_time;
10+
uint8_t sequence;
11+
12+
void setup()
13+
{
14+
Serial.begin(19200);
15+
Serial.println("Starting...");
16+
randomSeed(analogRead(0));
17+
app.Begin();
18+
last_time = millis();
19+
sequence = 0;
20+
}
21+
22+
void loop()
23+
{
24+
app.Update();
25+
delay(100);
26+
if (millis() - last_time > 10000)
27+
{
28+
app.OnEvent(sequence);
29+
++sequence;
30+
last_time = millis();
31+
}
32+
}
33+

0 commit comments

Comments
 (0)