如何使用Pyspark中的时间序列数据滑动窗口转换数据

如何使用Pyspark中的时间序列数据滑动窗口转换数据,第1张

概述我试图基于时间序列数据滑动窗口提取功能. 在 Scala中,似乎有一个基于 this post和 the documentation的滑动功能 import org.apache.spark.mllib.rdd.RDDFunctions._sc.parallelize(1 to 100, 10) .sliding(3) .map(curSlice => (curSlice.sum / 我试图基于时间序列数据的滑动窗口提取功能.
在 Scala中,似乎有一个基于 this post和 the documentation的滑动功能

import org.apache.spark.mllib.rdd.RDDFunctions._sc.parallelize(1 to 100,10)  .slIDing(3)  .map(curSlice => (curSlice.sum / curSlice.size))  .collect()

我的问题是PySpark中有类似的功能吗?或者,如果没有这样的功能,我们如何实现类似的滑动窗口转换呢?

解决方法 据我所知,滑动功能不能从Python获得,SlIDinGrdD是私有类,不能在MLlib外部访问.

如果你在现有的RDD上使用滑动,你可以像这样创建穷人滑动:

def slIDing(rdd,n):    assert n > 0    def gen_window(xi,n):        x,i = xi        return [(i - offset,(i,x)) for offset in xrange(n)]    return (        rdd.        zipwithIndex(). # Add index        flatMap(lambda xi: gen_window(xi,n)). # Generate pairs with offset        groupByKey(). # Group to create windows        # Sort values to ensure order insIDe window and drop indices        mapValues(lambda vals: [x for (i,x) in sorted(vals)]).        sortByKey(). # Sort to makes sure we keep original order        values(). # Get values        filter(lambda x: len(x) == n)) # Drop beginning and end

或者你可以尝试这样的东西(在toolz的小帮助下)

from toolz.itertoolz import slIDing_window,concatdef slIDing2(rdd,n):    assert n > 1    def get_last_el(i,iter):        """Return last n - 1 elements from the partition"""        return  [(i,[x for x in iter][(-n + 1):])]    def slIDe(i,iter):        """Prepend prevIoUs items and return slIDing window"""        return slIDing_window(n,concat([last_items.value[i - 1],iter]))    def clean_last_items(last_items):        """Adjust for empty or to small partitions"""        clean = {-1: [None] * (n - 1)}        for i in range(rdd.getNumPartitions()):            clean[i] = (clean[i - 1] + List(last_items[i]))[(-n + 1):]        return {k: tuple(v) for k,v in clean.items()}    last_items = sc.broadcast(clean_last_items(        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))    return rdd.mapPartitionsWithIndex(slIDe)
总结

以上是内存溢出为你收集整理的如何使用Pyspark中的时间序列数据滑动窗口转换数据全部内容,希望文章能够帮你解决如何使用Pyspark中的时间序列数据滑动窗口转换数据所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/1191039.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-06-03
下一篇2022-06-03

发表评论

登录后才能评论

评论列表(0条)

    保存