Changeset View
Changeset View
Standalone View
Standalone View
app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt
Show All 18 Lines | |||||
typealias PositionsMap = HashMap<String, HashMap<String, LivePositionUpdate> > | typealias PositionsMap = HashMap<String, HashMap<String, LivePositionUpdate> > | ||||
class MQTTMatoClient(): MqttCallbackExtended{ | class MQTTMatoClient(): MqttCallbackExtended{ | ||||
private var isStarted = false | private var isStarted = false | ||||
private var subscribedToAll = false | private var subscribedToAll = false | ||||
private lateinit var client: MqttAndroidClient | private var client: MqttAndroidClient? = null | ||||
//private var clientID = "" | //private var clientID = "" | ||||
private val respondersMap = HashMap<String, ArrayList<WeakReference<MQTTMatoListener>>>() | private val respondersMap = HashMap<String, ArrayList<WeakReference<MQTTMatoListener>>>() | ||||
private val currentPositions = PositionsMap() | private val currentPositions = PositionsMap() | ||||
private lateinit var lifecycle: LifecycleOwner | private lateinit var lifecycle: LifecycleOwner | ||||
//TODO: remove class reference to context (always require context in all methods) | //TODO: remove class reference to context (always require context in all methods) | ||||
Show All 25 Lines | private fun connect(context: Context, iMqttActionListener: IMqttActionListener?){ | ||||
options.isCleanSession = true | options.isCleanSession = true | ||||
val headersPars = Properties() | val headersPars = Properties() | ||||
headersPars.setProperty("Origin","https://mato.muoversiatorino.it") | headersPars.setProperty("Origin","https://mato.muoversiatorino.it") | ||||
headersPars.setProperty("Host","mapi.5t.torino.it") | headersPars.setProperty("Host","mapi.5t.torino.it") | ||||
options.customWebSocketHeaders = headersPars | options.customWebSocketHeaders = headersPars | ||||
Log.d(DEBUG_TAG,"client name: $clientID") | Log.d(DEBUG_TAG,"client name: $clientID") | ||||
//actually connect | //actually connect | ||||
client.connect(options,null, iMqttActionListener) | client!!.connect(options,null, iMqttActionListener) | ||||
isStarted = true | isStarted = true | ||||
client.setCallback(this) | client!!.setCallback(this) | ||||
if (this.context ==null) | if (this.context ==null) | ||||
this.context = context.applicationContext | this.context = context.applicationContext | ||||
} | } | ||||
override fun connectComplete(reconnect: Boolean, serverURI: String?) { | override fun connectComplete(reconnect: Boolean, serverURI: String?) { | ||||
Log.d(DEBUG_TAG, "Connected to server, reconnect: $reconnect") | Log.d(DEBUG_TAG, "Connected to server, reconnect: $reconnect") | ||||
Log.d(DEBUG_TAG, "Have listeners: $respondersMap") | Log.d(DEBUG_TAG, "Have listeners: $respondersMap") | ||||
} | } | ||||
private fun connectTopic(topic: String){ | private fun connectTopic(topic: String){ | ||||
if(context==null){ | if(context==null){ | ||||
Log.e(DEBUG_TAG, "Trying to connect but context is null") | Log.e(DEBUG_TAG, "Trying to connect but context is null") | ||||
return | return | ||||
} | } | ||||
connectionTrials += 1 | connectionTrials += 1 | ||||
connect(context!!, object : IMqttActionListener{ | connect(context!!, object : IMqttActionListener{ | ||||
override fun onSuccess(asyncActionToken: IMqttToken?) { | override fun onSuccess(asyncActionToken: IMqttToken?) { | ||||
val disconnectedBufferOptions = DisconnectedBufferOptions() | val disconnectedBufferOptions = DisconnectedBufferOptions() | ||||
disconnectedBufferOptions.isBufferEnabled = true | disconnectedBufferOptions.isBufferEnabled = true | ||||
disconnectedBufferOptions.bufferSize = 100 | disconnectedBufferOptions.bufferSize = 100 | ||||
disconnectedBufferOptions.isPersistBuffer = false | disconnectedBufferOptions.isPersistBuffer = false | ||||
disconnectedBufferOptions.isDeleteOldestMessages = false | disconnectedBufferOptions.isDeleteOldestMessages = false | ||||
client.setBufferOpts(disconnectedBufferOptions) | client!!.setBufferOpts(disconnectedBufferOptions) | ||||
client.subscribe(topic, QoS.AtMostOnce.value) | client!!.subscribe(topic, QoS.AtMostOnce.value) | ||||
isStarted = true | isStarted = true | ||||
} | } | ||||
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) { | override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) { | ||||
Log.e(DEBUG_TAG, "FAILED To connect to the server",exception) | Log.e(DEBUG_TAG, "FAILED To connect to the server",exception) | ||||
if (connectionTrials < 10) { | if (connectionTrials < 10) { | ||||
Log.d(DEBUG_TAG, "Reconnecting") | Log.d(DEBUG_TAG, "Reconnecting") | ||||
connectTopic(topic) | connectTopic(topic) | ||||
Show All 12 Lines | fun startAndSubscribe(lineId: String, responder: MQTTMatoListener, context: Context): Boolean{ | ||||
val topic = mapTopic(lineId) | val topic = mapTopic(lineId) | ||||
this.context = context.applicationContext | this.context = context.applicationContext | ||||
synchronized(this) { | synchronized(this) { | ||||
if(!isStarted){ | if(!isStarted){ | ||||
connectTopic(topic) | connectTopic(topic) | ||||
//wait for connection | //wait for connection | ||||
} else { | } else { | ||||
client.subscribe(topic, QoS.AtMostOnce.value) | client!!.subscribe(topic, QoS.AtMostOnce.value) | ||||
} | } | ||||
} | } | ||||
synchronized(this){ | synchronized(this){ | ||||
if (!respondersMap.contains(lineId)) | if (!respondersMap.contains(lineId)) | ||||
respondersMap[lineId] = ArrayList() | respondersMap[lineId] = ArrayList() | ||||
Show All 17 Lines | fun stopMatoRequests(responder: MQTTMatoListener){ | ||||
} | } | ||||
if (done) | if (done) | ||||
break | break | ||||
} | } | ||||
if(done) Log.d(DEBUG_TAG, "Removed one listener for line $line, listeners: $v") | if(done) Log.d(DEBUG_TAG, "Removed one listener for line $line, listeners: $v") | ||||
//if (done) break | //if (done) break | ||||
if (v.isEmpty()){ | if (v.isEmpty()){ | ||||
//actually unsubscribe | //actually unsubscribe | ||||
client.unsubscribe( mapTopic(line)) | client!!.unsubscribe( mapTopic(line)) | ||||
} | } | ||||
removed = done || removed | removed = done || removed | ||||
} | } | ||||
// check responders map, remove lines that have no responders | // check responders map, remove lines that have no responders | ||||
for(line in respondersMap.keys){ | for(line in respondersMap.keys){ | ||||
if(respondersMap[line]?.isEmpty() == true){ | if(respondersMap[line]?.isEmpty() == true){ | ||||
respondersMap.remove(line) | respondersMap.remove(line) | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | override fun connectionLost(cause: Throwable?) { | ||||
connect(context!!, object: IMqttActionListener{ | connect(context!!, object: IMqttActionListener{ | ||||
override fun onSuccess(asyncActionToken: IMqttToken?) { | override fun onSuccess(asyncActionToken: IMqttToken?) { | ||||
//relisten to messages | //relisten to messages | ||||
for ((line,elms) in respondersMap.entries){ | for ((line,elms) in respondersMap.entries){ | ||||
val topic = mapTopic(line) | val topic = mapTopic(line) | ||||
if(elms.isEmpty()) | if(elms.isEmpty()) | ||||
respondersMap.remove(line) | respondersMap.remove(line) | ||||
else { | else { | ||||
client.subscribe(topic, QoS.AtMostOnce.value, null, null) | client!!.subscribe(topic, QoS.AtMostOnce.value, null, null) | ||||
Log.d(DEBUG_TAG, "Resubscribed with topic $topic") | Log.d(DEBUG_TAG, "Resubscribed with topic $topic") | ||||
} | } | ||||
} | } | ||||
Log.d(DEBUG_TAG, "Reconnected to MQTT Mato Client") | Log.d(DEBUG_TAG, "Reconnected to MQTT Mato Client") | ||||
} | } | ||||
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) { | override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) { | ||||
▲ Show 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | private fun parseMessageAndAddToList(topic: String, message: MqttMessage){ | ||||
if(!en.isEmpty()){ | if(!en.isEmpty()){ | ||||
emptyResp=false | emptyResp=false | ||||
break | break | ||||
} | } | ||||
} | } | ||||
//try unsubscribing to all | //try unsubscribing to all | ||||
if(emptyResp) { | if(emptyResp) { | ||||
Log.d(DEBUG_TAG, "Unsubscribe all") | Log.d(DEBUG_TAG, "Unsubscribe all") | ||||
client.unsubscribe(LINES_ALL) | client!!.unsubscribe(LINES_ALL) | ||||
} | } | ||||
} | } | ||||
//Log.d(DEBUG_TAG, "We have update on line $lineId, vehicle $vehicleId") | //Log.d(DEBUG_TAG, "We have update on line $lineId, vehicle $vehicleId") | ||||
} catch (e: JSONException){ | } catch (e: JSONException){ | ||||
Log.w(DEBUG_TAG,"Cannot decipher message on topic $topic, line $lineId, veh $vehicleId (bad JSON)") | Log.w(DEBUG_TAG,"Cannot decipher message on topic $topic, line $lineId, veh $vehicleId (bad JSON)") | ||||
} catch (e: Exception){ | } catch (e: Exception){ | ||||
Log.e(DEBUG_TAG, "Exception occurred", e) | Log.e(DEBUG_TAG, "Exception occurred", e) | ||||
Show All 9 Lines | class MQTTMatoClient(): MqttCallbackExtended{ | ||||
* Stop the service forever. Client has not to be used again!! | * Stop the service forever. Client has not to be used again!! | ||||
*/ | */ | ||||
fun closeClientForever(){ | fun closeClientForever(){ | ||||
client.disconnect() | client.disconnect() | ||||
client.close() | client.close() | ||||
}*/ | }*/ | ||||
fun disconnect(){ | fun disconnect(){ | ||||
client.disconnect() | client?.disconnect() | ||||
} | } | ||||
companion object{ | companion object{ | ||||
const val SERVER_ADDR="wss://mapi.5t.torino.it:443/scre" | const val SERVER_ADDR="wss://mapi.5t.torino.it:443/scre" | ||||
const val LINES_ALL="ALL" | const val LINES_ALL="ALL" | ||||
private const val DEBUG_TAG="BusTO-MatoMQTT" | private const val DEBUG_TAG="BusTO-MatoMQTT" | ||||
Show All 32 Lines | data class MQTTPositionUpdate( | ||||
val longitude: Double, | val longitude: Double, | ||||
val heading: Int?, | val heading: Int?, | ||||
val speed: Int?, | val speed: Int?, | ||||
val tripId: String?, | val tripId: String?, | ||||
val direct: Int?, | val direct: Int?, | ||||
val nextStop: Int?, | val nextStop: Int?, | ||||
//val full: Int? | //val full: Int? | ||||
) | ) | ||||
No newline at end of file | No newline at end of file |
Public contents are in Creative Commons Attribution-ShareAlike 4.0 (CC-BY-SA) or GNU Free Documentation License (at your option) unless otherwise noted. · Contact / Register