/*
  PubSubClient.cpp - A simple client for MQTT.
  Nick O'Leary
  http://knolleary.net
*/

#include "PubSubClient.h"
#include "Arduino.h"

// ---------------------------------------------------------------------------
// Constructors
//
// Every constructor starts in MQTT_DISCONNECTED and leaves no pointer member
// that loop()/connect() test (callback, domain, stream) uninitialised.
// ---------------------------------------------------------------------------

/** Creates a client with no network Client, server, stream or callback set. */
PubSubClient::PubSubClient() {
    this->_state = MQTT_DISCONNECTED;
    this->_client = NULL;
    this->stream = NULL;
    this->domain = NULL;
    setCallback(NULL);
}

/** Creates a client bound to `client`; the server must be set later via setServer(). */
PubSubClient::PubSubClient(Client& client) {
    this->_state = MQTT_DISCONNECTED;
    this->domain = NULL;
    setCallback(NULL);
    setClient(client);
    this->stream = NULL;
}

PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr, port);
    setCallback(NULL);
    setClient(client);
    this->stream = NULL;
}

PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr,port);
    setCallback(NULL);
    setClient(client);
    setStream(stream);
}

PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr, port);
    setCallback(callback);
    setClient(client);
    this->stream = NULL;
}

PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr,port);
    setCallback(callback);
    setClient(client);
    setStream(stream);
}

PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip, port);
    setCallback(NULL);
    setClient(client);
    this->stream = NULL;
}

PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip,port);
    setCallback(NULL);
    setClient(client);
    setStream(stream);
}

PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip, port);
    setCallback(callback);
    setClient(client);
    this->stream = NULL;
}

PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip,port);
    setCallback(callback);
    setClient(client);
    setStream(stream);
}

PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setCallback(NULL);
    setClient(client);
    this->stream = NULL;
}

PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setCallback(NULL);
    setClient(client);
    setStream(stream);
}

PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setCallback(callback);
    setClient(client);
    this->stream = NULL;
}

PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setCallback(callback);
    setClient(client);
    setStream(stream);
}

// ---------------------------------------------------------------------------
// Connecting
// ---------------------------------------------------------------------------

/** Connects with a client id only (no credentials, no will). */
boolean PubSubClient::connect(const char *id) {
    return connect(id,NULL,NULL,0,0,0,0);
}

/** Connects with a client id and credentials (no will). */
boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
    return connect(id,user,pass,0,0,0,0);
}

/** Connects with a client id and a will message (no credentials). */
boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
    return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
}

/**
 * Opens the network connection and performs the MQTT CONNECT handshake.
 *
 * @param id          client identifier; must not be NULL (it is copied with
 *                    writeString(), which dereferences it unconditionally).
 * @param user        user name, or NULL for none.
 * @param pass        password, or NULL for none; only sent when `user` is set.
 * @param willTopic   will topic, or NULL for no will.
 * @param willQos     will QoS, placed in the connect flags.
 * @param willRetain  will retain flag, placed in the connect flags.
 * @param willMessage will payload; must not be NULL when `willTopic` is set.
 * @return true if already connected or the server accepted the connection;
 *         false otherwise, with the reason available from state().
 */
boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
    if (connected()) {
        return true;
    }

    // A default-constructed instance has no Client until setClient() is called.
    if (_client == NULL) {
        _state = MQTT_CONNECT_FAILED;
        return false;
    }

    // Protocol name and level that open the CONNECT variable header.
#if MQTT_VERSION == MQTT_VERSION_3_1
    uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
#define MQTT_HEADER_VERSION_LENGTH 9
#elif MQTT_VERSION == MQTT_VERSION_3_1_1
    uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
#define MQTT_HEADER_VERSION_LENGTH 7
#endif

    // Refuse to build a CONNECT packet that would overrun `buffer`:
    // 5 reserved bytes + protocol header + flags + keepalive + each
    // length-prefixed string.
    size_t needed = 5 + MQTT_HEADER_VERSION_LENGTH + 1 + 2 + 2 + strlen(id);
    if (willTopic) {
        needed += 2 + strlen(willTopic) + 2 + strlen(willMessage);
    }
    if (user != NULL) {
        needed += 2 + strlen(user);
        if (pass != NULL) {
            needed += 2 + strlen(pass);
        }
    }
    if (needed > MQTT_MAX_PACKET_SIZE) {
        _state = MQTT_CONNECT_FAILED;
        return false;
    }

    int result = 0;
    if (domain != NULL) {
        result = _client->connect(this->domain, this->port);
    } else {
        result = _client->connect(this->ip, this->port);
    }
    if (result != 1) {
        _state = MQTT_CONNECT_FAILED;
        return false;
    }

    nextMsgId = 1;
    // Leave room in the buffer for header and variable length field
    uint16_t length = 5;

    // NOTE(review): the span from this loop through the connect-flag
    // computation was garbled in the source; reconstructed from context
    // and the MQTT CONNECT flag layout - confirm.
    for (unsigned int j = 0; j < MQTT_HEADER_VERSION_LENGTH; j++) {
        buffer[length++] = d[j];
    }

    // Connect flags: bit 1 is always set; will flag/QoS/retain when a will
    // is supplied; bit 7 for user name and bit 6 for password.
    uint8_t v;
    if (willTopic) {
        v = 0x06|(willQos<<3)|(willRetain<<5);
    } else {
        v = 0x02;
    }
    if (user != NULL) {
        v = v|0x80;
        if (pass != NULL) {
            v = v|(0x80>>1);
        }
    }
    buffer[length++] = v;

    buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
    buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
    length = writeString(id,buffer,length);
    if (willTopic) {
        length = writeString(willTopic,buffer,length);
        length = writeString(willMessage,buffer,length);
    }
    if (user != NULL) {
        length = writeString(user,buffer,length);
        if (pass != NULL) {
            length = writeString(pass,buffer,length);
        }
    }

    write(MQTTCONNECT,buffer,length-5);

    lastInActivity = lastOutActivity = millis();

    // Wait for the first byte of the reply, giving up after the socket timeout.
    while (!_client->available()) {
        unsigned long t = millis();
        if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
            _state = MQTT_CONNECTION_TIMEOUT;
            _client->stop();
            return false;
        }
    }

    uint8_t llen;
    uint16_t len = readPacket(&llen);

    // The reply is accepted only as a 4-byte packet whose last byte is the
    // return code; 0 means the connection was accepted.
    if (len == 4) {
        if (buffer[3] == 0) {
            lastInActivity = millis();
            pingOutstanding = false;
            _state = MQTT_CONNECTED;
            return true;
        } else {
            _state = buffer[3];
        }
    }
    _client->stop();
    return false;
}

// ---------------------------------------------------------------------------
// Reading
// ---------------------------------------------------------------------------

/**
 * Reads one byte from the network client into *result.
 * @return false if no byte became available within MQTT_SOCKET_TIMEOUT seconds.
 */
boolean PubSubClient::readByte(uint8_t * result) {
    uint32_t previousMillis = millis();
    while (!_client->available()) {
        uint32_t currentMillis = millis();
        // Unsigned arithmetic keeps the comparison correct across millis() wrap.
        if (currentMillis - previousMillis >= ((uint32_t) MQTT_SOCKET_TIMEOUT * 1000UL)) {
            return false;
        }
    }
    *result = _client->read();
    return true;
}

/**
 * Reads one byte into result[*index] and increments *index on success.
 * @return false on timeout, in which case *index is left unchanged.
 */
boolean PubSubClient::readByte(uint8_t * result, uint16_t * index) {
    uint16_t current_index = *index;
    uint8_t * write_address = &(result[current_index]);
    if (readByte(write_address)) {
        *index = current_index + 1;
        return true;
    }
    return false;
}

/**
 * Reads one complete MQTT packet into `buffer`.
 *
 * For PUBLISH packets with a stream set, payload bytes are also written to
 * the stream as they arrive.
 *
 * @param lengthLength receives the number of bytes used by the
 *                     remaining-length field.
 * @return total number of bytes in the packet, or 0 on read timeout, on an
 *         invalid remaining-length encoding, or when the packet does not fit
 *         in `buffer` and no stream is set.
 */
uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
    uint16_t len = 0;
    if (!readByte(buffer, &len)) return 0;
    bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
    uint32_t multiplier = 1;
    uint16_t length = 0;
    uint8_t digit = 0;
    uint16_t skip = 0;
    uint8_t start = 0;

    // Decode the variable-length "remaining length" field (7 bits per byte,
    // high bit set while more bytes follow).
    do {
        if (len == 5) {
            // More than 4 length bytes is an invalid encoding; without this
            // limit a peer that keeps the continuation bit set would overrun
            // `buffer`. Drop the connection.
            _state = MQTT_DISCONNECTED;
            _client->stop();
            return 0;
        }
        if (!readByte(&digit)) return 0;
        buffer[len++] = digit;
        length += (digit & 127) * multiplier;
        multiplier *= 128;
    } while ((digit & 128) != 0);
    *lengthLength = len-1;

    if (isPublish) {
        // Read in topic length to calculate bytes to skip over for Stream writing
        if (!readByte(buffer, &len)) return 0;
        if (!readByte(buffer, &len)) return 0;
        skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
        start = 2;
        if (buffer[0]&MQTTQOS1) {
            // skip message id
            skip += 2;
        }
    }

    // NOTE(review): the head of this loop was garbled in the source;
    // reconstructed from context - confirm.
    for (uint16_t i = start; i < length; i++) {
        if (!readByte(&digit)) return 0;
        if (this->stream) {
            if (isPublish && len-*lengthLength-2>skip) {
                this->stream->write(digit);
            }
        }
        // Bytes beyond the buffer are counted but not stored.
        if (len < MQTT_MAX_PACKET_SIZE) {
            buffer[len] = digit;
        }
        len++;
    }

    if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
        len = 0; // This will cause the packet to be ignored.
    }

    return len;
}

/**
 * Services the connection: sends keepalive pings, and reads and dispatches
 * at most one incoming packet.
 *
 * @return false if the client is not connected or the keepalive timed out;
 *         true otherwise.
 */
boolean PubSubClient::loop() {
    if (!connected()) {
        return false;
    }

    unsigned long t = millis();
    if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
        if (pingOutstanding) {
            // The previous ping was never answered.
            this->_state = MQTT_CONNECTION_TIMEOUT;
            _client->stop();
            return false;
        } else {
            buffer[0] = MQTTPINGREQ;
            buffer[1] = 0;
            _client->write(buffer,2);
            lastOutActivity = t;
            lastInActivity = t;
            pingOutstanding = true;
        }
    }

    if (_client->available()) {
        uint8_t llen;
        uint16_t len = readPacket(&llen);
        uint16_t msgId = 0;
        uint8_t *payload;
        if (len > 0) {
            lastInActivity = t;
            uint8_t type = buffer[0]&0xF0;
            if (type == MQTTPUBLISH) {
                if (callback) {
                    uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
                    // msgId only present for QOS>0
                    boolean hasMsgId = (buffer[0]&0x06) == MQTTQOS1;
                    // Offset of the first payload byte within the packet.
                    uint32_t payloadStart = (uint32_t)llen + 3 + tl + (hasMsgId ? 2 : 0);

                    // The topic length comes from the wire; ignore the packet
                    // if it claims a topic that runs past the packet or the
                    // buffer, rather than reading/writing out of bounds.
                    if (payloadStart <= len && payloadStart <= MQTT_MAX_PACKET_SIZE) {
                        memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
                        buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
                        char *topic = (char*) buffer+llen+2;
                        payload = buffer+payloadStart;

                        if (hasMsgId) {
                            msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
                            callback(topic,payload,(unsigned int)(len-payloadStart));

                            // Acknowledge the QoS 1 message.
                            buffer[0] = MQTTPUBACK;
                            buffer[1] = 2;
                            buffer[2] = (msgId >> 8);
                            buffer[3] = (msgId & 0xFF);
                            _client->write(buffer,4);
                            lastOutActivity = t;
                        } else {
                            callback(topic,payload,(unsigned int)(len-payloadStart));
                        }
                    }
                }
            } else if (type == MQTTPINGREQ) {
                buffer[0] = MQTTPINGRESP;
                buffer[1] = 0;
                _client->write(buffer,2);
            } else if (type == MQTTPINGRESP) {
                pingOutstanding = false;
            }
        }
    }
    return true;
}

// ---------------------------------------------------------------------------
// Publishing
// ---------------------------------------------------------------------------

/** Publishes a NUL-terminated string payload, not retained. */
boolean PubSubClient::publish(const char* topic, const char* payload) {
    return publish(topic,(const uint8_t*)payload,strlen(payload),false);
}

/** Publishes a NUL-terminated string payload with the given retained flag. */
boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
    return publish(topic,(const uint8_t*)payload,strlen(payload),retained);
}

/** Publishes a binary payload of `plength` bytes, not retained. */
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
    return publish(topic, payload, plength, false);
}

/**
 * Publishes a binary payload of `plength` bytes.
 *
 * @return false if not connected, if the packet would not fit in the buffer,
 *         or if the write was short; true otherwise.
 */
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
    if (connected()) {
        if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) {
            // Too long
            return false;
        }
        // Leave room in the buffer for header and variable length field
        uint16_t length = 5;
        length = writeString(topic,buffer,length);

        // NOTE(review): from this loop to the end of the function the source
        // was garbled; reconstructed from context - confirm.
        uint16_t i;
        for (i = 0; i < plength; i++) {
            buffer[length++] = payload[i];
        }
        uint8_t header = MQTTPUBLISH;
        if (retained) {
            header |= 1;
        }
        return write(header,buffer,length-5);
    }
    return false;
}

/**
 * Publishes a payload that is read byte-by-byte with pgm_read_byte_near().
 *
 * The fixed header and topic are sent from `buffer`; the payload is then
 * written to the network client one byte at a time, so it need not fit in
 * the buffer.
 *
 * NOTE(review): the signature and the first half of this function were
 * garbled in the source; reconstructed from the surviving second half and
 * from publish() - confirm against the class declaration.
 *
 * @return false if not connected, if the header and topic would not fit in
 *         the buffer, or if fewer bytes than expected were written.
 */
boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
    uint8_t llen = 0;
    uint8_t digit;
    unsigned int rc = 0;
    uint16_t tlen;
    unsigned int pos = 0;
    unsigned int i;
    uint8_t header;
    unsigned int len;

    if (!connected()) {
        return false;
    }

    tlen = strlen(topic);

    header = MQTTPUBLISH;
    if (retained) {
        header |= 1;
    }
    buffer[pos++] = header;

    // Encode the remaining length (topic string + payload), 7 bits per byte.
    len = plength + 2 + tlen;
    do {
        digit = len % 128;
        len = len / 128;
        if (len > 0) {
            digit |= 0x80;
        }
        buffer[pos++] = digit;
        llen++;
    } while (len > 0);

    // The topic is copied into `buffer`; refuse one that would overrun it.
    if ((size_t)pos + 2 + tlen > MQTT_MAX_PACKET_SIZE) {
        return false;
    }
    pos = writeString(topic,buffer,pos);

    rc += _client->write(buffer,pos);

    for (i = 0; i < plength; i++) {
        rc += _client->write((char)pgm_read_byte_near(payload + i));
    }

    lastOutActivity = millis();

    // Header byte + length bytes + 2-byte topic length + topic + payload.
    // (Counting a fixed 4 overhead bytes was only right when llen == 1.)
    return rc == 1u + llen + 2u + tlen + plength;
}

/**
 * Sends a packet whose body already sits at buf[5..5+length).
 *
 * The fixed header byte and the encoded remaining length are written into
 * the reserved bytes immediately before the body, then the whole packet is
 * handed to the network client.
 *
 * @param header MQTT fixed-header byte.
 * @param buf    buffer with 5 reserved bytes followed by `length` body bytes.
 * @param length number of body bytes.
 * @return true if every byte was accepted by the network client.
 */
boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
    uint8_t lenBuf[4];
    uint8_t llen = 0;
    uint8_t digit;
    uint8_t pos = 0;
    uint16_t rc;
    uint16_t len = length;
    do {
        digit = len % 128;
        len = len / 128;
        if (len > 0) {
            digit |= 0x80;
        }
        lenBuf[pos++] = digit;
        llen++;
    } while (len > 0);

    // Right-align header + length bytes against the body that starts at buf[5].
    buf[4-llen] = header;
    // NOTE(review): this loop and the start of the MQTT_MAX_TRANSFER_SIZE
    // branch were garbled in the source; reconstructed from context - confirm.
    for (int i = 0; i < llen; i++) {
        buf[5-llen+i] = lenBuf[i];
    }

#ifdef MQTT_MAX_TRANSFER_SIZE
    // Send in chunks of at most MQTT_MAX_TRANSFER_SIZE bytes.
    uint8_t* writeBuf = buf+(4-llen);
    uint16_t bytesRemaining = length+1+llen; // match the type of `length`
    uint8_t bytesToWrite;
    boolean result = true;
    while ((bytesRemaining > 0) && result) {
        bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
        rc = _client->write(writeBuf,bytesToWrite);
        result = (rc == bytesToWrite);
        bytesRemaining -= rc;
        writeBuf += rc;
    }
    return result;
#else
    rc = _client->write(buf+(4-llen),length+1+llen);
    lastOutActivity = millis();
    return (rc == 1+llen+length);
#endif
}

// ---------------------------------------------------------------------------
// Subscriptions
// ---------------------------------------------------------------------------

/** Subscribes to `topic` at QoS 0. */
boolean PubSubClient::subscribe(const char* topic) {
    return subscribe(topic, 0);
}

/**
 * Subscribes to `topic` at the requested QoS (0 or 1).
 * @return false for an unsupported QoS, a topic too long for the buffer,
 *         when not connected, or when the write was short.
 */
boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
    // qos is unsigned, so only the upper bound needs checking.
    if (qos > 1) {
        return false;
    }
    if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
        // Too long
        return false;
    }
    if (connected()) {
        // Leave room in the buffer for header and variable length field
        uint16_t length = 5;
        nextMsgId++;
        if (nextMsgId == 0) {
            // 0 is skipped as a message id.
            nextMsgId = 1;
        }
        buffer[length++] = (nextMsgId >> 8);
        buffer[length++] = (nextMsgId & 0xFF);
        length = writeString((char*)topic, buffer,length);
        buffer[length++] = qos;
        return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
    }
    return false;
}

/**
 * Unsubscribes from `topic`.
 * @return false for a topic too long for the buffer, when not connected,
 *         or when the write was short.
 */
boolean PubSubClient::unsubscribe(const char* topic) {
    if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
        // Too long
        return false;
    }
    if (connected()) {
        uint16_t length = 5;
        nextMsgId++;
        if (nextMsgId == 0) {
            nextMsgId = 1;
        }
        buffer[length++] = (nextMsgId >> 8);
        buffer[length++] = (nextMsgId & 0xFF);
        length = writeString(topic, buffer,length);
        return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
    }
    return false;
}

/** Sends DISCONNECT (when a Client is set), closes the socket and resets state. */
void PubSubClient::disconnect() {
    // A default-constructed instance may have no Client yet.
    if (_client != NULL) {
        buffer[0] = MQTTDISCONNECT;
        buffer[1] = 0;
        _client->write(buffer,2);
        _client->stop();
    }
    _state = MQTT_DISCONNECTED;
    lastInActivity = lastOutActivity = millis();
}

/**
 * Writes `string` into buf at `pos` as a 2-byte big-endian length followed
 * by the characters (without the terminating NUL).
 *
 * The caller must ensure the string fits; no bounds check is done here.
 *
 * @return the position just past the last byte written.
 */
uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
    const char* idp = string;
    uint16_t i = 0;
    pos += 2; // reserve the length prefix, filled in below
    while (*idp) {
        buf[pos++] = *idp++;
        i++;
    }
    buf[pos-i-2] = (i >> 8);
    buf[pos-i-1] = (i & 0xFF);
    return pos;
}

/**
 * Reports whether the network client is connected.
 *
 * If the connection has dropped while the state was MQTT_CONNECTED, the state
 * becomes MQTT_CONNECTION_LOST and the network client is flushed and stopped.
 */
boolean PubSubClient::connected() {
    boolean rc;
    if (_client == NULL ) {
        rc = false;
    } else {
        rc = (int)_client->connected();
        if (!rc) {
            if (this->_state == MQTT_CONNECTED) {
                this->_state = MQTT_CONNECTION_LOST;
                _client->flush();
                _client->stop();
            }
        }
    }
    return rc;
}

// ---------------------------------------------------------------------------
// Configuration (each setter returns *this so calls can be chained)
// ---------------------------------------------------------------------------

/** Sets the server from a 4-byte IPv4 address array. */
PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
    IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
    return setServer(addr,port);
}

/** Sets the server by IP address and clears any previously set domain. */
PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
    this->ip = ip;
    this->port = port;
    this->domain = NULL;
    return *this;
}

/** Sets the server by host name; the pointer is stored, not copied. */
PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
    this->domain = domain;
    this->port = port;
    return *this;
}

/** Sets the function invoked by loop() for each received PUBLISH. */
PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
    this->callback = callback;
    return *this;
}

/** Sets the network client; its address is stored, so it must outlive this object. */
PubSubClient& PubSubClient::setClient(Client& client){
    this->_client = &client;
    return *this;
}

/** Sets the stream that receives incoming PUBLISH payload bytes. */
PubSubClient& PubSubClient::setStream(Stream& stream){
    this->stream = &stream;
    return *this;
}

/** Returns the current connection state code. */
int PubSubClient::state() {
    return this->_state;
}