Changeset View
Changeset View
Standalone View
Standalone View
app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt
package it.reyboz.bustorino.backend.mato | package it.reyboz.bustorino.backend.mato | ||||
import android.app.Notification | |||||
import android.app.NotificationManager | |||||
import android.content.Context | import android.content.Context | ||||
import android.os.Build | |||||
import android.util.Log | import android.util.Log | ||||
import androidx.lifecycle.LifecycleOwner | import androidx.lifecycle.LifecycleOwner | ||||
import info.mqtt.android.service.Ack | import info.mqtt.android.service.Ack | ||||
import it.reyboz.bustorino.backend.gtfs.LivePositionUpdate | |||||
import org.eclipse.paho.client.mqttv3.* | |||||
import info.mqtt.android.service.MqttAndroidClient | import info.mqtt.android.service.MqttAndroidClient | ||||
import info.mqtt.android.service.QoS | import info.mqtt.android.service.QoS | ||||
import it.reyboz.bustorino.backend.Notifications | |||||
import it.reyboz.bustorino.backend.gtfs.LivePositionUpdate | |||||
import org.eclipse.paho.client.mqttv3.* | |||||
import org.json.JSONArray | import org.json.JSONArray | ||||
import org.json.JSONException | import org.json.JSONException | ||||
import java.lang.ref.WeakReference | import java.lang.ref.WeakReference | ||||
import java.util.ArrayList | import java.util.* | ||||
import java.util.Properties | |||||
typealias PositionsMap = HashMap<String, HashMap<String, LivePositionUpdate> > | typealias PositionsMap = HashMap<String, HashMap<String, LivePositionUpdate> > | ||||
class MQTTMatoClient private constructor(): MqttCallbackExtended{ | class MQTTMatoClient(): MqttCallbackExtended{ | ||||
private var isStarted = false | private var isStarted = false | ||||
private var subscribedToAll = false | private var subscribedToAll = false | ||||
private lateinit var client: MqttAndroidClient | private lateinit var client: MqttAndroidClient | ||||
//private var clientID = "" | //private var clientID = "" | ||||
private val respondersMap = HashMap<String, ArrayList<WeakReference<MQTTMatoListener>>>() | private val respondersMap = HashMap<String, ArrayList<WeakReference<MQTTMatoListener>>>() | ||||
private val currentPositions = PositionsMap() | private val currentPositions = PositionsMap() | ||||
private lateinit var lifecycle: LifecycleOwner | private lateinit var lifecycle: LifecycleOwner | ||||
//TODO: remove class reference to context (always require context in all methods) | |||||
private var context: Context?= null | private var context: Context?= null | ||||
private var connectionTrials = 0 | private var connectionTrials = 0 | ||||
private var notification: Notification? = null | |||||
//private lateinit var notification: Notification | |||||
private fun connect(context: Context, iMqttActionListener: IMqttActionListener?){ | private fun connect(context: Context, iMqttActionListener: IMqttActionListener?){ | ||||
val clientID = "mqtt-explorer-${getRandomString(8)}"//"mqttjs_${getRandomString(8)}" | val clientID = "mqtt-explorer-${getRandomString(8)}"//"mqttjs_${getRandomString(8)}" | ||||
//notification = Notifications.makeMQTTServiceNotification(context) | |||||
client = MqttAndroidClient(context,SERVER_ADDR,clientID,Ack.AUTO_ACK) | client = MqttAndroidClient(context,SERVER_ADDR,clientID,Ack.AUTO_ACK) | ||||
if(Build.VERSION.SDK_INT >= Build.VERSION_CODES.O){ | |||||
//we need a notification | |||||
val notific = Notifications.makeMQTTServiceNotification(context) | |||||
client.setForegroundService(notific) | |||||
notification=notific | |||||
} | |||||
val options = MqttConnectOptions() | val options = MqttConnectOptions() | ||||
//options.sslProperties = | //options.sslProperties = | ||||
options.isCleanSession = true | options.isCleanSession = true | ||||
val headersPars = Properties() | val headersPars = Properties() | ||||
headersPars.setProperty("Origin","https://mato.muoversiatorino.it") | headersPars.setProperty("Origin","https://mato.muoversiatorino.it") | ||||
headersPars.setProperty("Host","mapi.5t.torino.it") | headersPars.setProperty("Host","mapi.5t.torino.it") | ||||
options.customWebSocketHeaders = headersPars | options.customWebSocketHeaders = headersPars | ||||
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | fun startAndSubscribe(lineId: String, responder: MQTTMatoListener, context: Context): Boolean{ | ||||
respondersMap[lineId] = ArrayList() | respondersMap[lineId] = ArrayList() | ||||
respondersMap[lineId]!!.add(WeakReference(responder)) | respondersMap[lineId]!!.add(WeakReference(responder)) | ||||
Log.d(DEBUG_TAG, "Add MQTT Listener for line $lineId, topic $topic") | Log.d(DEBUG_TAG, "Add MQTT Listener for line $lineId, topic $topic") | ||||
} | } | ||||
return true | return true | ||||
} | } | ||||
fun desubscribe(responder: MQTTMatoListener){ | fun stopMatoRequests(responder: MQTTMatoListener){ | ||||
var removed = false | var removed = false | ||||
for ((line,v)in respondersMap.entries){ | for ((line,v)in respondersMap.entries){ | ||||
var done = false | var done = false | ||||
for (el in v){ | for (el in v){ | ||||
if (el.get()==null){ | if (el.get()==null){ | ||||
v.remove(el) | v.remove(el) | ||||
} else if(el.get() == responder){ | } else if(el.get() == responder){ | ||||
v.remove(el) | v.remove(el) | ||||
done = true | done = true | ||||
} | } | ||||
if (done) | if (done) | ||||
break | break | ||||
} | } | ||||
if(done) Log.d(DEBUG_TAG, "Removed one listener for line $line, listeners: $v") | if(done) Log.d(DEBUG_TAG, "Removed one listener for line $line, listeners: $v") | ||||
//if (done) break | //if (done) break | ||||
if (v.isEmpty()){ | if (v.isEmpty()){ | ||||
//actually unsubscribe | //actually unsubscribe | ||||
client.unsubscribe( mapTopic(line)) | client.unsubscribe( mapTopic(line)) | ||||
} | } | ||||
removed = done || removed | removed = done || removed | ||||
} | } | ||||
// remove lines that have no responders | // check responders map, remove lines that have no responders | ||||
for(line in respondersMap.keys){ | for(line in respondersMap.keys){ | ||||
if(respondersMap[line]?.isEmpty() == true){ | if(respondersMap[line]?.isEmpty() == true){ | ||||
respondersMap.remove(line) | respondersMap.remove(line) | ||||
} | } | ||||
} | } | ||||
Log.d(DEBUG_TAG, "Removed: $removed, respondersMap: $respondersMap") | Log.d(DEBUG_TAG, "Removed: $removed, respondersMap: $respondersMap") | ||||
} | } | ||||
fun getPositions(): PositionsMap{ | fun getPositions(): PositionsMap{ | ||||
return currentPositions | return currentPositions | ||||
} | } | ||||
/** | |||||
* Cancel the notification | |||||
*/ | |||||
fun removeNotification(context: Context){ | |||||
val notifManager = context.applicationContext.getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager | |||||
notifManager.cancel(MQTT_NOTIFICATION_ID) | |||||
} | |||||
private fun sendUpdateToResponders(responders: ArrayList<WeakReference<MQTTMatoListener>>): Int{ | private fun sendUpdateToResponders(responders: ArrayList<WeakReference<MQTTMatoListener>>): Int{ | ||||
//var sent = false | //var sent = false | ||||
var count = 0 | var count = 0 | ||||
for (wrD in responders) { | for (wrD in responders) { | ||||
if (wrD.get() == null) { | if (wrD.get() == null) { | ||||
Log.d(DEBUG_TAG, "Removing weak reference") | Log.d(DEBUG_TAG, "Removing weak reference") | ||||
responders.remove(wrD) | responders.remove(wrD) | ||||
} | } | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | private fun parseMessageAndAddToList(topic: String, message: MqttMessage){ | ||||
val vehicleId = vals[2] | val vehicleId = vals[2] | ||||
val timestamp = (System.currentTimeMillis() / 1000 ) as Long | val timestamp = (System.currentTimeMillis() / 1000 ) as Long | ||||
val messString = String(message.payload) | val messString = String(message.payload) | ||||
try { | try { | ||||
val jsonList = JSONArray(messString) | val jsonList = JSONArray(messString) | ||||
//val full = if(jsonList.length()>7) { | |||||
// if (jsonList.get(7).equals(null)) null else jsonList.getInt(7) | |||||
//}else null | |||||
/*val posUpdate = MQTTPositionUpdate(lineId+"U", vehicleId, | /*val posUpdate = MQTTPositionUpdate(lineId+"U", vehicleId, | ||||
jsonList.getDouble(0), | jsonList.getDouble(0), | ||||
jsonList.getDouble(1), | jsonList.getDouble(1), | ||||
if(jsonList.get(2).equals(null)) null else jsonList.getInt(2), | if(jsonList.get(2).equals(null)) null else jsonList.getInt(2), | ||||
if(jsonList.get(3).equals(null)) null else jsonList.getInt(3), | if(jsonList.get(3).equals(null)) null else jsonList.getInt(3), | ||||
if(jsonList.get(4).equals(null)) null else jsonList.getString(4)+"U", | if(jsonList.get(4).equals(null)) null else jsonList.getString(4)+"U", | ||||
if(jsonList.get(5).equals(null)) null else jsonList.getInt(5), | if(jsonList.get(5).equals(null)) null else jsonList.getInt(5), | ||||
if(jsonList.get(6).equals(null)) null else jsonList.getInt(6), | if(jsonList.get(6).equals(null)) null else jsonList.getInt(6), | ||||
//full | //full | ||||
) | ) | ||||
*/ | */ | ||||
if(jsonList.get(4)==null){ | if(jsonList.get(4)==null){ | ||||
Log.d(DEBUG_TAG, "We have null tripId: line $lineId veh $vehicleId: $jsonList") | Log.d(DEBUG_TAG, "We have null tripId: line $lineId veh $vehicleId: $jsonList") | ||||
return | return | ||||
} | } | ||||
val posUpdate = LivePositionUpdate( | val posUpdate = LivePositionUpdate( | ||||
jsonList.getString(4)+"U", | jsonList.getString(4)+"U", | ||||
null, | null, | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | private fun parseMessageAndAddToList(topic: String, message: MqttMessage){ | ||||
} | } | ||||
} | } | ||||
override fun deliveryComplete(token: IMqttDeliveryToken?) { | override fun deliveryComplete(token: IMqttDeliveryToken?) { | ||||
//NOT USED (we're not sending any messages) | //NOT USED (we're not sending any messages) | ||||
} | } | ||||
/** | |||||
* Stop the service forever. Client has not to be used again!! | |||||
*/ | |||||
fun closeClientForever(){ | |||||
client.close() | |||||
} | |||||
companion object{ | companion object{ | ||||
const val SERVER_ADDR="wss://mapi.5t.torino.it:443/scre" | const val SERVER_ADDR="wss://mapi.5t.torino.it:443/scre" | ||||
const val LINES_ALL="ALL" | const val LINES_ALL="ALL" | ||||
private const val DEBUG_TAG="BusTO-MatoMQTT" | private const val DEBUG_TAG="BusTO-MatoMQTT" | ||||
@Volatile | //this has to match the value in MQTT library (MQTTAndroidClient) | ||||
private var instance: MQTTMatoClient? = null | const val MQTT_NOTIFICATION_ID: Int = 77 | ||||
fun getInstance() = instance?: synchronized(this){ | |||||
instance?: MQTTMatoClient().also { instance= it } | |||||
} | |||||
@JvmStatic | @JvmStatic | ||||
fun mapTopic(lineId: String): String{ | fun mapTopic(lineId: String): String{ | ||||
return if(lineId== LINES_ALL || lineId == "#") | return if(lineId== LINES_ALL || lineId == "#") | ||||
"#" | "#" | ||||
else{ | else{ | ||||
"/${lineId}/#" | "/${lineId}/#" | ||||
} | } | ||||
Show All 21 Lines | data class MQTTPositionUpdate( | ||||
val longitude: Double, | val longitude: Double, | ||||
val heading: Int?, | val heading: Int?, | ||||
val speed: Int?, | val speed: Int?, | ||||
val tripId: String?, | val tripId: String?, | ||||
val direct: Int?, | val direct: Int?, | ||||
val nextStop: Int?, | val nextStop: Int?, | ||||
//val full: Int? | //val full: Int? | ||||
) | ) | ||||
No newline at end of file | No newline at end of file |
Public contents are in Creative Commons Attribution-ShareAlike 4.0 (CC-BY-SA) or GNU Free Documentation License (at your option) unless otherwise noted. · Contact / Register