diff --git a/app/build.gradle b/app/build.gradle --- a/app/build.gradle +++ b/app/build.gradle @@ -12,7 +12,7 @@ defaultConfig { applicationId "it.reyboz.bustorino" - minSdkVersion 21 + minSdkVersion 24 targetSdkVersion 35 buildToolsVersion = '35.0.1' versionCode 65 @@ -83,6 +83,8 @@ } } + packagingOptions { resources.excludes.add("META-INF/*") } + } protobuf { @@ -146,8 +148,10 @@ implementation 'com.google.protobuf:protobuf-javalite:3.22.3' // mqtt library - implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' - implementation 'com.github.hannesa2:paho.mqtt.android:4.4' + //implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' + //implementation 'com.github.hannesa2:paho.mqtt.android:4.4' + implementation("com.hivemq:hivemq-mqtt-client:1.3.10") + implementation(platform("com.hivemq:hivemq-mqtt-client-websocket:1.3.10")) // ViewModel implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version" // LiveData diff --git a/app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt b/app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt --- a/app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt +++ b/app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt @@ -1,30 +1,29 @@ package it.reyboz.bustorino.backend.mato -import android.app.Notification import android.app.NotificationManager import android.content.Context -import android.os.Build import android.util.Log import androidx.lifecycle.LifecycleOwner -import info.mqtt.android.service.Ack -import info.mqtt.android.service.MqttAndroidClient -import info.mqtt.android.service.QoS -import it.reyboz.bustorino.backend.Notifications +import com.hivemq.client.mqtt.MqttClient +import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient +import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish import it.reyboz.bustorino.backend.gtfs.LivePositionUpdate -import org.eclipse.paho.client.mqttv3.* import org.json.JSONArray import org.json.JSONException import java.lang.ref.WeakReference import java.util.* +import java.util.concurrent.TimeUnit typealias PositionsMap = HashMap > -class MQTTMatoClient(): MqttCallbackExtended{ +class MQTTMatoClient(){ private var isStarted = false - private var subscribedToAll = false + //private var subscribedToAll = false - private var client: MqttAndroidClient? = null + private var client: Mqtt3AsyncClient? = null //private var clientID = "" private val respondersMap = HashMap>>() @@ -34,116 +33,101 @@ private lateinit var lifecycle: LifecycleOwner //TODO: remove class reference to context (always require context in all methods) private var context: Context?= null - private var connectionTrials = 0 - private var notification: Notification? = null + //private var connectionTrials = 0 + //private var notification: Notification? = null - //private lateinit var notification: Notification - private fun connect(context: Context, iMqttActionListener: IMqttActionListener?){ - - val clientID = "mqtt-explorer-${getRandomString(8)}"//"mqttjs_${getRandomString(8)}" + private fun connect(context: Context, onConnect: OnConnect){ + //val clientID = "mqtt-explorer-${getRandomString(8)}"//"mqttjs_${getRandomString(8)}" + if (this.context ==null) + this.context = context.applicationContext //notification = Notifications.makeMQTTServiceNotification(context) - client = MqttAndroidClient(context,SERVER_ADDR,clientID,Ack.AUTO_ACK) - // WE DO NOT WANT A FOREGROUND SERVICE -> it's only more mayhem - // (and the positions need to be downloaded only when the app is shown) - // update, 2024-04: Google Play doesn't understand our needs, so we put back the notification - // and add a video of it working as Google wants - /*if(Build.VERSION.SDK_INT >= Build.VERSION_CODES.O){ - //we need a notification - Notifications.createLivePositionsChannel(context) - val notific = Notifications.makeMQTTServiceNotification(context) - client!!.setForegroundService(notific) - notification=notific - }*/ - - - val options = MqttConnectOptions() - //options.sslProperties = - options.isCleanSession = true - val headersPars = Properties() - headersPars.setProperty("Origin","https://mato.muoversiatorino.it") - headersPars.setProperty("Host","mapi.5t.torino.it") - options.customWebSocketHeaders = headersPars - - Log.d(DEBUG_TAG,"client name: $clientID") + val mclient = makeClientBuilder().buildAsync() + Log.d(DEBUG_TAG, "Connecting to MQTT server") //actually connect - isStarted = true - client!!.connect(options,null, iMqttActionListener) - client!!.setCallback(this) + val mess = mclient.connect().whenComplete { subAck, throwable -> + if(throwable!=null){ + Log.w(DEBUG_TAG,"Failed to connect to MQTT server:") + Log.w(DEBUG_TAG,throwable.toString()) + } + else{ + isStarted = true + Log.d(DEBUG_TAG, "Connected to MQTT server") + onConnect.onConnectSuccess() + } + } + client = mclient + /* if (mess.returnCode != Mqtt3ConnAckReturnCode.SUCCESS){ + Log.w(DEBUG_TAG,"Failed to connect to MQTT client") + return false + } else{ + client = blockingClient.toAsync() + isStarted = true + return true + } - if (this.context ==null) - this.context = context.applicationContext + */ } - override fun connectComplete(reconnect: Boolean, serverURI: String?) { + /*override fun connectComplete(reconnect: Boolean, serverURI: String?) { Log.d(DEBUG_TAG, "Connected to server, reconnect: $reconnect") Log.d(DEBUG_TAG, "Have listeners: $respondersMap") } - private fun connectTopic(topic: String){ + + */ + private fun subscribeTopic(topic: String){ if(context==null){ Log.e(DEBUG_TAG, "Trying to connect but context is null") return } - connectionTrials += 1 - connect(context!!, object : IMqttActionListener{ - override fun onSuccess(asyncActionToken: IMqttToken?) { - val disconnectedBufferOptions = DisconnectedBufferOptions() - disconnectedBufferOptions.isBufferEnabled = true - disconnectedBufferOptions.bufferSize = 100 - disconnectedBufferOptions.isPersistBuffer = false - disconnectedBufferOptions.isDeleteOldestMessages = false - client!!.setBufferOpts(disconnectedBufferOptions) - client!!.subscribe(topic, QoS.AtMostOnce.value) - isStarted = true - } - override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) { - Log.e(DEBUG_TAG, "FAILED To connect to the server",exception) - if (connectionTrials < 10) { - Log.d(DEBUG_TAG, "Reconnecting") - connectTopic(topic) + client!!.subscribeWith() + .topicFilter(topic).callback {publish -> + messageArrived(publish.topic.toString(), publish) + }.send().whenComplete { subAck, throwable-> + if(throwable != null){ + Log.e(DEBUG_TAG, "Error while subscribing to topic $topic", throwable) } - else { - //reset connection trials - connectionTrials = 0 + else{ + //add the responder to the list + Log.d(DEBUG_TAG, "Subscribed to topic $topic, responders ready: ${respondersMap[topic]}") } } - }) } fun startAndSubscribe(lineId: String, responder: MQTTMatoListener, context: Context): Boolean{ //start the client, and then subscribe to the topic val topic = mapTopic(lineId) this.context = context.applicationContext + synchronized(this) { if(!isStarted){ - connectTopic(topic) + connect(context.applicationContext) { + //when connection is done, run this + if (!respondersMap.contains(lineId)){ + respondersMap[lineId] = ArrayList() + } + respondersMap[lineId]!!.add(WeakReference(responder)) + subscribeTopic(topic) + + } //wait for connection - } else { - client!!.subscribe(topic, QoS.AtMostOnce.value) } + //recheck if it is started } - - synchronized(this){ - if (!respondersMap.contains(lineId)) - respondersMap[lineId] = ArrayList() - respondersMap[lineId]!!.add(WeakReference(responder)) - Log.d(DEBUG_TAG, "Add MQTT Listener for line $lineId, topic $topic") - } - return true } fun stopMatoRequests(responder: MQTTMatoListener){ var removed = false - for ((linekey,responderList)in respondersMap.entries){ + for ((lineTopic,responderList)in respondersMap.entries){ var done = false for (el in responderList){ if (el.get()==null){ @@ -155,12 +139,20 @@ if (done) break } - if(done) Log.d(DEBUG_TAG, "Removed one listener for line $linekey, listeners: $responderList") + if(done) Log.d(DEBUG_TAG, "Removed one listener for topic $lineTopic, listeners: $responderList") //if (done) break if (responderList.isEmpty()){ //actually unsubscribe try { - client?.unsubscribe(mapTopic(linekey)) + val topic = mapTopic(lineTopic) + client?.run{ + unsubscribeWith().addTopicFilter(topic).send().whenComplete { subAck, throwable -> + if (throwable!=null){ + //error occurred + Log.e(DEBUG_TAG, "Error while unsubscribing to topic $topic",throwable) + } + } + } } catch (e: Exception){ Log.e(DEBUG_TAG, "Tried unsubscribing but there was an error in the client library:\n$e") } @@ -207,7 +199,7 @@ return count } - override fun connectionLost(cause: Throwable?) { + /*override fun connectionLost(cause: Throwable?) { var doReconnect = false for ((line,elms) in respondersMap.entries){ if(!elms.isEmpty()){ @@ -252,38 +244,31 @@ } - override fun messageArrived(topic: String?, message: MqttMessage?) { - if (topic==null || message==null) return + */ + + fun messageArrived(topic: String?, message: Mqtt3Publish) { + if (topic==null) return //Log.d(DEBUG_TAG,"Arrived message on topic $topic, ${String(message.payload)}") parseMessageAndAddToList(topic, message) //GlobalScope.launch { } } - private fun parseMessageAndAddToList(topic: String, message: MqttMessage){ + private fun parseMessageAndAddToList(topic: String, message: Mqtt3Publish){ + //val mqttTopic = message.topic + //Log.d(DEBUG_TAG, "Topic of message received: $mqttTopic") val vals = topic.split("/") val lineId = vals[1] val vehicleId = vals[2] val timestamp = (System.currentTimeMillis() / 1000 ) as Long - val messString = String(message.payload) - + val messString = String(message.payloadAsBytes) + Log.d(DEBUG_TAG, "Received message on topic: $topic") try { val jsonList = JSONArray(messString) - /*val posUpdate = MQTTPositionUpdate(lineId+"U", vehicleId, - jsonList.getDouble(0), - jsonList.getDouble(1), - if(jsonList.get(2).equals(null)) null else jsonList.getInt(2), - if(jsonList.get(3).equals(null)) null else jsonList.getInt(3), - if(jsonList.get(4).equals(null)) null else jsonList.getString(4)+"U", - if(jsonList.get(5).equals(null)) null else jsonList.getInt(5), - if(jsonList.get(6).equals(null)) null else jsonList.getInt(6), - //full - ) - */ if(jsonList.get(4)==null){ Log.d(DEBUG_TAG, "We have null tripId: line $lineId veh $vehicleId: $jsonList") return @@ -334,7 +319,7 @@ //try unsubscribing to all if(emptyResp) { Log.d(DEBUG_TAG, "Unsubscribe all") - client!!.unsubscribe(LINES_ALL) + //client!!.unsubscribe(LINES_ALL) } } //Log.d(DEBUG_TAG, "We have update on line $lineId, vehicle $vehicleId") @@ -347,10 +332,6 @@ } - override fun deliveryComplete(token: IMqttDeliveryToken?) { - //NOT USED (we're not sending any messages) - } - /*/** * Stop the service forever. Client has not to be used again!! */ @@ -366,13 +347,15 @@ companion object{ - const val SERVER_ADDR="wss://mapi.5t.torino.it:443/scre" + const val SERVER_ADDR="mapi.5t.torino.it" + const val SERVER_PATH="/scre" const val LINES_ALL="ALL" private const val DEBUG_TAG="BusTO-MatoMQTT" //this has to match the value in MQTT library (MQTTAndroidClient) const val MQTT_NOTIFICATION_ID: Int = 77 + @JvmStatic fun mapTopic(lineId: String): String{ return if(lineId== LINES_ALL || lineId == "#") @@ -394,10 +377,42 @@ //positionsMap is a dict with line -> vehicle -> Update fun onUpdateReceived(posUpdates: PositionsMap) } + fun getHTTPHeaders(): HashMap { + val headers = HashMap() + headers["Origin"] = "https://mato.muoversiatorino.it" + headers["Host"] = "mapi.5t.torino.it" + + return headers + } + private fun makeClientBuilder() : Mqtt3ClientBuilder{ + val clientID = "mqtt-explorer-${getRandomString(8)}" + + val r = MqttClient.builder() + .useMqttVersion3() + .identifier(clientID) + .serverHost(SERVER_ADDR) + .serverPort(443) + .sslWithDefaultConfig() + //.automaticReconnect(MqttClientAutoReconnect.builder() + // .initialDelay(100, TimeUnit.MILLISECONDS) + // .maxDelay(60, TimeUnit.SECONDS).build()) + .webSocketConfig() + .httpHeaders(getHTTPHeaders()) + .serverPath("scre") + .applyWebSocketConfig() + //.webSocketWithDefaultConfig() + return r + } + + + } + + private fun interface OnConnect{ + fun onConnectSuccess() } } -data class MQTTPositionUpdate( +/*data class MQTTPositionUpdate( val lineId: String, val vehicleId: String, val latitude: Double, @@ -408,4 +423,4 @@ val direct: Int?, val nextStop: Int?, //val full: Int? -) \ No newline at end of file +)*/ \ No newline at end of file diff --git a/build.gradle b/build.gradle --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ ext.coroutines_version = "1.10.2" dependencies { classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.4' // or latest - classpath 'com.android.tools.build:gradle:8.6.0' + classpath 'com.android.tools.build:gradle:8.6.1' classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" } }