Skip to content
On this page

动态数据应用 · 用数据流概念重新理解数据转换


第 18 节 动态数据应用 · 用数据流概念重新理解数据转换

我们在前面的章节(如第 8 节和第 9 节)中学习了如何对我们不同的数据内容进行转换从而得到另外一种新的数据以满足我们的实际需求。

而在动态数据应用的开发中,数据的来源很有可能是多个数据源甚至来自不确定的数据源(统一数据接口的不同数据源),而这些数据源中很有可能包括流式数据源。流式数据源与传统统计学中的数据集有着非常大的差别,一般来说统计学所使用的数据源都来自于静态数据集,也就是说统计结果的时效性依赖于数据集的时效性,一旦数据集过时也就意味着统计结果的过时。而流式数据集的好处是数据内容可以不断地更新,比如股票数据、人口数据、天气数据等等,具有较强的时效性。

而在 JavaScript 的数据应用中,流式数据集的来源可以有很多种类,可以使来自服务端的实时数据(实时通讯或服务器推送等)也可以是来自用户的实时操作(数据录入等)。当然如何接受实时数据并不是我们这本小册所关注的内容,我们要学习的是当我们需要利用流式数据集时,如何更好地处理数据处理或者进行数据统计。

18.1 数据层面 · 不断产生的数据——流式数据

流式数据顾名思义就是以流的方式产生的数据,最显而易见的流式数据就是随着时间发展不断产生新元素的时序数据。

18.1.1 两种不同的流式数据

一般来说数据流是以块(chunk)的形式不断到达数据处理层的,比如时序数据中,数据流会不断地把每一个单位时间内数据块产生和传递到数据消费方。


dataSource.on('tick', function(time, chunkData) {
  console.log(time, chunkData)
})
//=> 1 {...}
//=> 2 {...}
//=> 3 {...}
//=> ...

数据块是数据流的基本形式,但有的时候数据提供方(如一些第三方服务的 SDK)会先行对实时数据进行处理,并通过如响应式(Reactive)的方式将变更后的完整实时数据集提供给数据消费方。 如果需要进行处理和统计的实时数据集形式为后者,那么作为消费方只需要关心如何响应新数据集即可,其余的与静态数据集并无差别。

setInterval(function() {
  console.log(dataSource.dataset.length)
}, 1e3) // run per second
//=> 0
//=> 1
//=> 2
//=> ...

18.1.2 流式数据的处理和计算

我们可以使用两个非常简单且熟悉的例子,来说明这两种不同的数据流的差别,分别为平均数和众数。对一组数据进行数学统计时,这两个数学特征值在中都是非常重要的指标。在第 5 节中我们就已经学习过这两个数学特征值的计算方法。

假设我们有一个实时数据源,它会随着时间不断地产生一个数值(实时温度、股票价格、股票交易量等等),而我们需要计算其在一个时间周期内的平均数和众数。

假若该实时数据源是以完整的数据集提供给我们的时候我们就可以直接使用前面的方法进行计算即可。(_.reduceByKey 方法请参考第 5 节)

function mode(array) {
  if (array.length <= 0) {
    return []
  }

  const countTuples = _.reduceByKey(
    array.map(function(item) {
      return [ item, 1 ]
    }),
    function(left, right) {
      return left + right
    }
  )
  
  return _.chain(countTuples)
    .map(function(tuple) {
      // Reverse the tuple
      return [ tuple[1], tuple[0] ]
    })
    .groupBy(0)
    .mapValues(function(tuples) {
      return tuples.map(function(tuple) {
        return tuple[1]
      })
    })
    .toPairs()
    .map(function(tuple) {
      return [ parseInt(tuple[0]), tuple[1] ]
    })
    .sort(function(leftTuple, rightTuple) {
      return rightTuple[0] - leftTuple[0]
    })
    .head()
    .value()[1]
    .map(function(item) {
      return parseFloat(item)
    })
}

setInterval(function() {
  const mean = _.mean(dataSource.dataset)
  const modeNums = mode(dataSource.dataset)
  
  console.log(dataSource.dataset, mean, modeNums)
}, 1e3)
//=> [] NaN []
//=> [ 1 ] 1 [ 1 ]
//=> [ 1, 2 ] 1.5 [ 1, 2 ]
//=> [ 1, 2, 2 ] 1.67 [ 2 ]
//=> ...

但若数据源是通过数据块的形式提供给消费方时,情况就有点不一样了,我们先来看看平均值的计算。

我们知道平均值的计算公式是数组的总和 um_{i=1}^n x_i 除以数组的长度 n,假设我们要给数组 x 的子集,前 n  1 个元素的数组进行求平均,就可以得到以下两条公式。

egin{gather}
m_n = rac{um_{i=1}^n x_i}{n} 
m_{n-1}= rac{um_{i=1}^{n-1} x_i}{n-1}
nd{gather}

我们通过对 2 式对 1 式进行变形就可以得到我们所需要的公式。

m_n = rac{{m_{n-1} imes (n-1) + x_n}}{n}

这条公式表示了长度为 n 的数组 x 的平均值等于数组 x 的前 n  1 个元素的平均值乘以 n-1 再加上第 n 个元素后除以 n。其中 x_n 就可以理解为数据流中的数据块,我们只需要维护上一个平均值和上一个数据集长度即可。以下例子假设数据集不断“吐出”数值 1,2,3,4 并以此类推。

let mean = 0
let n = 0

dataSource.on('tick', function(time, chunkData) {
  mean = ((mean * n) + chunkData) / (++n)
  console.log(mean)
})
//=> 1
//=> 1.5
//=> 2
//=> 2.5
// ...

但众数就不一样了,众数的每次计算都需要对数据集的整体进行计算,而没办法像平均值一样简单地通过增量式的计算方法进行统计。所以对于以数据块形式进行消费的数据集来说,如统计众数这种整体计算或同比环比等错位计算需求,消费方需要自行维护所接收到的所有数据块,并组合成一个完整的数据集,然后在对数据集进行统计。

当然只针对于统计众数来说,也可以通过维护一个元素和频次的的哈希表来减少计算的次数以满足增量计算的需求,但这也只是一个变相的维护完整数据集而已。当我们需要对数据集进行多种不同的处理和统计需求时,更稳妥的方式还是需要维护完整的数据集。

18.2 逻辑层面 · 流式处理数据 —— 函数串流

上面我们先从数据层面利用数据流的概念重新理解了数据集,而现在我们把目光往上移动,看看如何利用数据流的概念重新理解数据处理。

事实上我们在本小册的前面这么多章节中就已经接触了很多这样的例子了,就比如最常见的词频统计,我们将一个一维的字符串数据一步一步地进行拆分、转换处理,最后得到一个二维的数据集,中间经过了以下步骤:

  1. 分割单词:"foo foo bar"["foo", "foo", "bar"]
  2. 添加频次:["foo", "foo", "bar"][["foo", 1], ["foo", 1], ["bar", 1]]
  3. 合并同类项:[["foo", 1], ["foo", 1], ["bar", 1]][["foo", 2], ["bar", 1]]
  4. 数据形式转换:[["foo", 2], ["bar", 1]][{ word: "foo", count: 2 }, { word: "bar", count: 1 }]

如果我们把每一个步骤都单独以一个函数的形式编写,便可以得到以下处理函数。

// 分割单词
function splitWords(string) {
  return string.split(/\s+/g)
}

// 添加频次
function addCount(words) {
  return words.map(function(word) {
    return [ word, 1 ]
  })
}

// 合并同类项
function sumWordCount(tuples) {
  return _.reduceByKey(tuples, function(left, right) {
    return left + right
  })
}

// 数据形式转换
function convertTuplesToDataset(tuples) {
  return tuples.map(function(tuple) {
    return {
      word: tuple[0],
      count: tuple[1]
    }
  })
}

把这些处理函数组合起来,便完成了整个词频统计流程。

const rawText = "foo foo bar"
const dataset = convertTuplesToDataset(
  sumWordCount(
    addCount(
      splitWords(rawText)
    )
  )
)

console.log(dataset)
//=> [
//=>   { word: "foo", count: 2 },
//=>   { word: "bar", count: 1 }
//=> ]

以函数封装的方式将数据处理的的逻辑抽象出来,第一可以让代码逻辑变得比较简洁干净,二来可以避免代码中副作用(原数据被修改)的产生,减少数据上出现以外的情况。

18.2.1 虚拟实体 Getter

而且暂且抛开词频统计不说,假设我们将上面的四个步骤用 A、B、C、D 表示,可以表示为以下流程。

如果说我们有另外一个数据转换的流程可以复用其中的步骤 A 和 B,并在其后接着完成步骤 E 和 F。

而因为两个流程的起始点都是一样的,所以不仅仅可以复用前端重合的步骤,就连结果也是可以被复用的。这里就要介绍到一种编程语言中的概念 Getter。

Getter 指的是通过定义一个无传入参数函数,在函数中经过若干处理逻辑后返回一个值,而 Getter 的使用方并知道该对象是一个函数,使用的时候只像在调用一个变量。

const object = {
  name: 'iwillwen',
  
  // Getter
  get message() {
    return `Hello ${this.name}`
  }
}

console.log(object.message) //=> Hello iwillwen

object.name = 'juejin'
console.log(object.message) //=> Hello juejin

我们可以把前面的词频统计流程利用 Getter 整合起来。

const wordCountAnalyzer = {
  rawText: '',
  
  get splittedWords() {
    return splitWords(this.rawText)
  },
  
  get wordsWithOne() {
    return addCount(this.splittedWords)
  },
  
  get wordsWithCount() {
    return sumWordCount(this.wordsWithOne)
  },
  
  get wordCountDataset() {
    return convertTuplesToDataset(this.wordsWithCount)
  }
}

wordCountAnalyzer.rawText = 'hello world'
console.log(wordCountAnalyzer.wordCountDataset)
//=> [
//   { word: 'hello', count: 1 },
//   { word: 'world', count: 1 }
// ]

wordCountAnalyzer.rawText = 'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua'
console.log(wordCountAnalyzer.wordCountDataset)
//=> [
//   { word: 'Lorem', count: 1 },
//   { word: 'ipsum', count: 1 },
//   ...
// ]

我们可以发现当我们修改 wordCountAnalyzer.rawText 后,wordCountAnalyzer.wordCountDataset 也同时随着改变,而中间流程的 Getter 也会随着变化。

应用到我们前面假设的多处理流程中就会是这样的:

const object = {
  originalValue: '<something>',        // ─┐   数据源
                                       //  │
  get A() {                            // <┘
    return methodA(this.originalValue) // ─┐
  },                                   //  │
                                       //  │
  get B() {                            // <┘
    return methodB(this.A)             // ─┬─┐
  },                                   //  │ │
                                       //  │ │
  get C() {                            // <┘ │ 流程 1
    return methodC(this.B)             // ─┐ │
  },                                   //  │ │
                                       //  │ │
  get D() {                            // <┘ │
    return methodD(this.C)             //    │
  },                                   //    │
                                       //    │
  get E() {                            // <──┘ 流程 2
    return methodE(this.B)             // ─┐
  },                                   //  │
                                       //  │
  get F() {                            // <┘
    return methodF(this.E)
  }

}

18.2.2 Getter with Vue.js

而 Getter 的特性在一些开发框架中则通过其内部的响应逻辑完成,比如 Vue.js 中则提供 computed 的接口以完成对其 data 内容的转换。

后面的章节中将会使用 Vue.js 作为本小册的 UI 开发框架,不了解的同学可以自行通过官方文档、其他教程或小册进行学习。

const vm = new Vue({
  // ...
  
  data: {
    originalValue: '<something>'
  },
  
  computed: {
  
    A() {
      return methodA(this.originalValue)
    },
    
    B() {
      return methodB(this.A)
    }
    
  }
})

console.log(vm.B) //=> something processed by method B

18.3 动态地流式处理数据流

上面我们分别介绍了数据流的概念以及通过封装函数和应用 Getter 的方式对数据进行流式的处理。Getter 的好处是可以让数据处理的过程在数据的不断变化中自动化起来,而不断变化恰恰正是数据流的特点,那么将这两者组合起来便会将数据流的流动路径往后延长,让流式数据集也能享受全自动数据处理的优势。

但有的时候在我们的数据处理逻辑中,需要处理的对象并不只有原数据本身,有一些数据处理逻辑是需要引入参数的,比如筛选过滤需要引入一个或多个筛选条件,这样便跟上面 Getter 的“无参数函数”有所冲突了。更甚者,这些需要引入的参数本身也是动态的,无法写死在处理逻辑中。

对于这种情况,我们一般的做法便是将这些参数也看作是一个流式数据源,并将其一同引入到处理流程中,当然其切入的点可能并一定是从流程的最初始位置,而是在其被需要的位置直接引入。

这里我们直接使用 Vue.js 作为例子,我们通过模拟一个不断有新数据产生的实时数据集,然后我们需要通过一个过滤参数将过滤后的数据展示出来。

<!-- Vue.js App -->
<div id="app">
  <label for="type">Type Filter: </label>
  <select name="type" id="type" v-model="typeFilter">
    <option value="none">None</option>
    <option>----------</option>
    <option v-for="type in typesSet" :value="type" :key="type">{{type}}</option>
  </select>
  
  <ul>
    <li v-for="item in filteredDataset" :key="item.timestamp">Timestamp: {{item.timestamp}} - Type: {{item.type}} - Value: {{item.value}}</li>
  </ul>
</div>
// app.js
const vm = new Vue({
  
  el: '#app',
  
  data: {
    dataset: [],
    typeFilter: 'none',
    
    typesSet: [ 'foo', 'bar', 'test' ]
  },
  
  computed: {
    filteredDataset() {
      if (this.typeFilter === 'none') {
        return this.dataset
      }
      
      return this.dataset.filter(item => item.type === this.typeFilter)
    }
  },
  
  mounted() {
    // 模拟流式数据集
    setInterval(() => {
      const randomType = this.typesSet[Math.round(Math.random() * (this.typesSet.length - 1))]

      this.dataset.push({
        type: randomType,
        timestamp: Date.now(),
        value: Math.random().toString(32).substr(2)
      })
    }, 1e3)
  }
  
})

DEMO 在线地址:https://codepen.io/iwillwen/pen/ebEwZE?editors=1010

小结

我们终于在这一小节中介绍到了跟本小册标题相关的动态数据了,这对许多只了解和使用过静态数据集的同学来说会是一个非常新鲜的事物。我们还学习了两种不同的数据流数据和它们对应的处理消费方式,知道了如何使用合适的方式进行相应的处理。

在接下来的章节中我们将更深入动态数据应用的开发中来。

习题

  1. 请模仿平均数和众数的应用,分别举出一对可以应用在数据块和整体流式数据集的例子;
  2. 请利用 18.3 中的 DEMO,对其中的动态数据集和过滤条件进行添加和调整,寻找更多的可能性;
  3. 请模仿 18.3 的 DEMO,将我们前面所学习到的词频统计使用到 Vue.js 应用中。