RxJava2 Android

Reactive sensor monitoring - Java vs Kotlin

Introduction

Most smartphones have built-in sensors, like an accelerometer, gyroscope, or magnetometer. Accessing, storing and processing their data in real time can be a complicated and a highly error-prone task. This blog post illustrates how can we leverage Kotlin and RxJava to make this process concise and effective.

Traditional way

The first approach is the simplest and most common way to react to data changes or new events. Your Activity or Service needs to implement the SensorEventListener interface, with the following two functions to be overridden. onAccuracyChanged is called when the accuracy of the sensors has changed, and onSensorChanged is called when the data is updated the signature contains the type of the sensor, the timestamp, the accuracy value, and the measured values.
Also, you’ll need to register (and unregister) the listeners with the sensor service, with a specified delay between the measurements.

public class MainActivity extends AppCompatActivity implements SensorEventListener {
.
.
  @Override
  protected void onCreate(Bundle savedInstanceState) {
  .
  .
    manager = (SensorManager) getSystemService(SENSOR_SERVICE);
  }

  @Override
  protected void onResume() {
    super.onResume();
    manager.registerListener(this,
    manager.getDefaultSensor(Sensor.TYPE_ACCELEROMETER), 100);
  }

  @Override
  protected void onPause() {
    super.onPause();
    manager.unregisterListener(this);
  }

  @Override
  public void onSensorChanged(SensorEvent sensorEvent) {
    sensorTypeText.setText(sensorEvent.sensor.toString());
    sensorValueText.setText(Arrays.toString(sensorEvent.values));
  }

  @Override
  public void onAccuracyChanged(Sensor sensor, int i) {
  }
}

Issues

This solution is super simple, and can be used in lots of use cases, but has its downsides:

  • Inconvenient and difficult correction of hardware-based issues
  • Data collection and data processing runs in parallel

Introducing reactive programming

Since we're dealing with async events, RxJava and RxAndroid come to mind as an option to make our code more convenient. Let's see how can we improve our code with them!

First, we wrap the SensorEventListener callbacks into an Observable. Now we can access the data through this Observable with its .subscribe() method.

private BehaviorSubject<SensorEvent> proxy = BehaviorSubject.create();

public void onSensorChanged(SensorEvent sensorEvent) {
    proxy.onNext(sensorEvent);
}

proxy.subscribe(new DefaultObserver<SensorEvent>() {
      @Override
      public void onNext(@NonNull SensorEvent sensorEvent) {
        sensorTypeText.setText(sensorEvent.sensor.getName());
        sensorValueText.setText(Arrays.toString(sensorEvent.values));
      }

      @Override
      public void onError(@NonNull Throwable e) {
      }

      @Override
      public void onComplete() {
      }
    });

The Subject is a special type that can act both as an Observer and an Observable. In this case, we use it to push the new sensor values to every suubscriber.

Frequency correction

This issue derives from that fact that our approach uses a pushed typed event handling methodology. If precise sample rate is a requirement, this approach could cause us some problems. High memory usage, and hardware outage are just two of the possible reasons behind the delays in the data flow. Changing the direction of the data forces our system to return a value in every millisecond or second. And voila - we successfully migrated from a push-based to a pull-based approach.

Observable.interval(200, TimeUnit.MILLISECONDS)
      .map(new Function<Long, SensorEvent>() {
        @Override
        public SensorEvent apply(@NonNull Long aLong) throws Exception {
          return proxy.getValue();
        }
      })
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new DefaultObserver<SensorEvent>() {
      @Override
      public void onNext(@NonNull SensorEvent sensorEvent) {
        sensorTypeText.setText(sensorEvent.sensor.getName());
        sensorValueText.setText(Arrays.toString(sensorEvent.values));
      }

      @Override
      public void onError(@NonNull Throwable e) {
      }

      @Override
      public void onComplete() {
      }
    });

The Observable.interval() gives us an accurate timer and ensures the consistent time frame between the values. The map function contains the actual sensor querying, passing the data along. Important - this solution does not force the hardware to present a new value, it just pushes the latest one available. And thus we solved our frequency problem.

Data processing

Imagine a scenario where you need to batch the data into an x-second (let's say x=13) long sampling window, persist them, separate values based on the sensor type and return the average of each batch. Doing this the traditional Android way is a lengthy and burdensome task, however, in the reactive world, the features for solving these problems are already in our hands - we just need to put the pieces together.

For mathematical functions, like average calculation, we'll use RxJava2 extensions, an extension library developed by the maintainer of RxJava.

Observable.interval(2000, TimeUnit.MILLISECONDS)
  .map(new Function<Long, SensorEvent>() {
    @Override
    public SensorEvent apply(@NonNull Long aLong) throws Exception {
      return proxy.getValue();
    }
  })
  .window(13, TimeUnit.SECONDS)
  //.customSavingFunction()
  .flatMap(new Function<Observable<SensorEvent>, Observable<Map<String, Collection<SensorEvent>>>>() {
    @Override
    public Observable<Map<String, Collection<SensorEvent>>> apply(Observable<SensorEvent> inner) throws Exception {
      return inner.toMultimap(new Function<SensorEvent, String>() {
        @Override
        public String apply(SensorEvent event) throws Exception {
          return event.sensor.getName();
        }
      }).toObservable();
    }
  })
  .map(new Function<Map<String, Collection<SensorEvent>>, Map<String, Float>>() {
    @Override
    public Map<String, Float> apply(Map<String, Collection<SensorEvent>> map) throws Exception {
      Map<String, Float> results = new HashMap<>();
      for (Map.Entry<String, Collection<SensorEvent>> entry : map.entrySet()) {
        Float value = MathObservable.averageFloat(
          Observable.fromIterable(entry.getValue()).map(new Function<SensorEvent, Float>() {
            @Override
            public Float apply(SensorEvent event) throws Exception {
              return event.values[0];
            }
          })).blockingFirst();
        results.put(entry.getKey(), value);
      }
      return results;
    }
  })
  .subscribe(new Consumer<Map<String, Float>>() {
    @Override
    public void accept(Map<String, Float> map) throws Exception {
      for (Map.Entry<String, Float> entry : map.entrySet()) {
        System.out.println(entry.getKey() + ": " + entry.getValue());
      }
    }
  });

The problem can be split into some smaller, more exact subtasks. The window function collects data in batches according to its parameter. Then the flatMap sorts the SensorEvents according to their type, and puts them into a regular Map where the key is the Sensor type, and the the corresponding values are wrapped into a List. The last step is calculating the average every list. This is achieved in a map function, that converts the SensorEvent type to Double and gets the first value from the values array, which in this case is the X axis data. After simple average calculation, we put the result into a map, and the task is completed.

Hello Kotlin

I'm sure you noticed this solution is everything but not clear and easy to read. Kotlin to the rescue - let's make our code style great again!🚀 We concentrate on the ugliest part of the application: processing sensor data.

Observable.interval(2000, TimeUnit.MILLISECONDS)
  .map { proxy.value }
  .window(13, TimeUnit.SECONDS)
  .flatMap { list -> list.toMultimap { event -> event.sensor.name }.toObservable() }
  .map { map -> map.mapValues { it.value.map { it.values[0] }.average() } }
  .subscribe { map ->
    for ((key, value) in map) {
      println(key + ": " + value)
    }
  }

44 lines reduced to 10.😍 With lambdas, the anonymous classes are gone, and thanks to Kotlin collections, map transformations with calculating an average is as simple as it looks like.

Takeaway

By the end of the day, our application collects sensor data with a consistent frequency rate, and processes these values during in the meantime. Sounds like a difficult and time-consuming task, but thanks to Kotlin and Rx, all only takes a couple of tens of code.