
在 Scala中,似乎有一个基于 this post和 the documentation的滑动功能
import org.apache.spark.mllib.rdd.RDDFunctions._sc.parallelize(1 to 100,10) .slIDing(3) .map(curSlice => (curSlice.sum / curSlice.size)) .collect()
我的问题是PySpark中有类似的功能吗?或者,如果没有这样的功能,我们如何实现类似的滑动窗口转换呢?
解决方法 据我所知,滑动功能不能从Python获得,SlIDinGrdD是私有类,不能在MLlib外部访问.如果你在现有的RDD上使用滑动,你可以像这样创建穷人滑动:
def slIDing(rdd,n): assert n > 0 def gen_window(xi,n): x,i = xi return [(i - offset,(i,x)) for offset in xrange(n)] return ( rdd. zipwithIndex(). # Add index flatMap(lambda xi: gen_window(xi,n)). # Generate pairs with offset groupByKey(). # Group to create windows # Sort values to ensure order insIDe window and drop indices mapValues(lambda vals: [x for (i,x) in sorted(vals)]). sortByKey(). # Sort to makes sure we keep original order values(). # Get values filter(lambda x: len(x) == n)) # Drop beginning and end
或者你可以尝试这样的东西(在toolz的小帮助下)
from toolz.itertoolz import slIDing_window,concatdef slIDing2(rdd,n): assert n > 1 def get_last_el(i,iter): """Return last n - 1 elements from the partition""" return [(i,[x for x in iter][(-n + 1):])] def slIDe(i,iter): """Prepend prevIoUs items and return slIDing window""" return slIDing_window(n,concat([last_items.value[i - 1],iter])) def clean_last_items(last_items): """Adjust for empty or to small partitions""" clean = {-1: [None] * (n - 1)} for i in range(rdd.getNumPartitions()): clean[i] = (clean[i - 1] + List(last_items[i]))[(-n + 1):] return {k: tuple(v) for k,v in clean.items()} last_items = sc.broadcast(clean_last_items( rdd.mapPartitionsWithIndex(get_last_el).collectAsMap())) return rdd.mapPartitionsWithIndex(slIDe) 总结 以上是内存溢出为你收集整理的如何使用Pyspark中的时间序列数据滑动窗口转换数据全部内容,希望文章能够帮你解决如何使用Pyspark中的时间序列数据滑动窗口转换数据所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)