package dispatch
import io.Source
import collection.Map
import collection.immutable.{Map => IMap}
import util.DynamicVariable
import java.io.{InputStream,OutputStream,BufferedInputStream,BufferedOutputStream,File}
import java.net.URI
import java.util.zip.GZIPInputStream
import org.apache.http._
import org.apache.http.client._
import org.apache.http.impl.client.{DefaultHttpClient, BasicCredentialsProvider}
import org.apache.http.client.methods._
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.utils.URLEncodedUtils
import org.apache.http.entity.{StringEntity,FileEntity}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.params.{HttpProtocolParams, BasicHttpParams}
import org.apache.http.util.EntityUtils
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials, Credentials}
import org.apache.commons.codec.binary.Base64.encodeBase64
case class StatusCode(code: Int, contents:String)
extends Exception("Exceptional response code: " + code + "\n" + contents)
trait Logger { def info(msg: String, items: Any*) }
class Http extends HttpExecutor {
val client = new ConfiguredHttpClient
lazy val log: Logger = try {
new Logger {
def getObject(name: String) = Class.forName(name + "$").getField("MODULE$").get(null)
val delegate = getObject("net.lag.logging.Logger")
.asInstanceOf[{ def get(n: String): { def ifInfo(o: => Object) } }]
.get(classOf[Http].getCanonicalName)
def info(msg: String, items: Any*) { delegate.ifInfo(msg.format(items: _*)) }
}
} catch {
case _: ClassNotFoundException | _: NoClassDefFoundError => new Logger {
def info(msg: String, items: Any*) {
println("INF: [console logger] dispatch: " + msg.format(items: _*))
}
}
}
def execute(host: HttpHost, req: HttpUriRequest): HttpResponse = {
log.info("%s %s%s", req.getMethod, host, req.getURI)
client.execute(host, req)
}
val execute: (Option[HttpHost], Option[Credentials], HttpUriRequest) => HttpResponse = {
case (Some(host), Some(creds), req) =>
client.credentials.withValue(Some((new AuthScope(host.getHostName, host.getPort), creds)))(execute(host, req))
case (None, Some(creds), _) => error("Credentials specified without explicit host")
case (Some(host), _, req) => execute(host, req)
case (_, _, req) =>
log.info("%s %s", req.getMethod, req.getURI)
client.execute(req)
}
type HttpPackage[T] = T
def pack[T](result: => T) = result
}
trait HttpExecutor {
type HttpPackage[T]
def pack[T](result: => T): HttpPackage[T]
val execute: (Option[HttpHost], Option[Credentials], HttpUriRequest) => HttpResponse
final def x[T](hand: Handler[T]): HttpPackage[T] = x(hand.request)(hand.block)
final def x [T](req: Request)(block: Handler.F[T]) = pack {
val res = execute(req.host, req.creds, req.req)
val ent = res.getEntity match {
case null => None
case ent => Some(ent)
}
try { block(res.getStatusLine.getStatusCode, res, ent) }
finally { ent foreach (_.consumeContent) }
}
final def when[T](chk: Int => Boolean)(hand: Handler[T]) = x(hand.request) {
case (code, res, ent) if chk(code) => hand.block(code, res, ent)
case (code, _, Some(ent)) => throw StatusCode(code, EntityUtils.toString(ent, Request.factoryCharset))
case (code, _, _) => throw StatusCode(code, "[no entity]")
}
final def also[A,B](hand: Handler[B])(block: Handler.F[A]) =
x(hand.request) { (code, res, ent) => ( hand.block(code, res, ent), block(code, res, ent) ) }
final def apply[T](hand: Handler[T]) = (this when {code => (200 to 204) contains code})(hand)
}
object /\ extends Request(None)
object :/ {
def apply(hostname: String, port: Int): Request =
new Request(Some(new HttpHost(hostname, port)))
def apply(hostname: String): Request = new Request(Some(new HttpHost(hostname)))
}
object / {
def apply(path: String) = /\ / path
}
object Request {
type Xf = HttpRequestBase => HttpRequestBase
def uri_xf(sxf: String => String)(req: HttpRequestBase) = {
req.setURI(URI.create(sxf(req.getURI.toString)))
req
}
def mimic[T <: HttpRequestBase](dest: T)(req: HttpRequestBase) = {
dest.setURI(req.getURI)
dest.setHeaders(req.getAllHeaders)
dest
}
val factoryCharset = org.apache.http.protocol.HTTP.UTF_8
}
case class Handler[T](request: Request, block: Handler.F[T]) {
def ~> [R](after: T => R) = Handler(request, (code, res, ent) => after(block(code,res,ent)))
def apply[R](next: (Int, HttpResponse, Option[HttpEntity], () => T) => R) =
new Handler(request, {(code, res, ent) =>
next(code, res, ent, () => block(code, res, ent))
})
}
object Handler {
type F[T] = (Int, HttpResponse, Option[HttpEntity]) => T
def apply[T](req: Request, block: HttpEntity => T): Handler[T] =
Handler(req, { (code, res, ent) => ent match {
case Some(ent) => block(ent)
case None => error("response has no entity: " + res)
} } )
}
trait Post[P <: Post[P]] extends HttpPost { self: P =>
def oauth_values: IMap[String, Any]
def add(more: Map[String, Any]): P
}
class SimplePost(val oauth_values: IMap[String, Any], charset: String) extends Post[SimplePost] {
@deprecated def this(oauth_values: IMap[String, Any]) = this(oauth_values, Request.factoryCharset)
this setEntity new UrlEncodedFormEntity(Http.map2ee(oauth_values), charset)
def add(more: Map[String, Any]) = new SimplePost(oauth_values ++ more.elements, charset)
}
class Request(
val host: Option[HttpHost],
val creds: Option[Credentials],
val xfs: List[Request.Xf],
val defaultCharset: String
) extends Handlers {
def this(str: String) = this(None, None, Request.uri_xf(cur => cur + str)_ :: Nil, Request.factoryCharset)
def this(req: Request) = this(req.host, req.creds, req.xfs, req.defaultCharset)
def this(host: Option[HttpHost]) = this(host, None, Nil, Request.factoryCharset)
def next(xf: Request.Xf) = new Request(host, creds, xf :: xfs, defaultCharset)
def next_uri(sxf: String => String) = next(Request.uri_xf(sxf))
def as (name: String, pass: String) =
new Request(host, Some(new UsernamePasswordCredentials(name, pass)), xfs, defaultCharset)
def as_! (name: String, pass: String) = this <:< IMap("Authorization" -> (
"Basic " + new String(encodeBase64("%s:%s".format(name, pass).getBytes))
))
def secure = new Request(host map {
h => new HttpHost(h.getHostName, h.getPort, "https")
} orElse { error("secure requires an explicit host") }, creds, xfs, defaultCharset)
def <& (req: Request) = new Request(host orElse req.host, creds orElse req.creds, req.xfs ::: xfs, defaultCharset)
def >\ (charset: String) = new Request(host, creds, xfs, charset)
def >& [T] (other: Handler[T]) = new Handler(this <& other.request, other.block)
def / (path: String) = next_uri { _ + "/" + path }
def <:< (values: Map[String, String]) = next { req =>
values foreach { case (k, v) => req.addHeader(k, v) }
req
}
def gzip = this <:< IMap("Accept-Encoding" -> "gzip")
def <<< (body: Any) = next {
val m = new HttpPut
m setEntity new StringEntity(body.toString, defaultCharset)
HttpProtocolParams.setUseExpectContinue(m.getParams, false)
Request.mimic(m)_
}
def <<< (file: File, content_type: String) = next {
val m = new HttpPut
m setEntity new FileEntity(file, content_type)
Request.mimic(m) _
}
def << (values: Map[String, Any]) = next {
case p: Post[_] => Request.mimic(p.add(values))(p)
case r => Request.mimic(new SimplePost(IMap.empty ++ values, defaultCharset))(r)
}
def << (string_body: String) = next {
val m = new HttpPost
m setEntity new StringEntity(string_body, defaultCharset)
Request.mimic(m)_
}
def <<? (values: Map[String, Any]) = next_uri { uri =>
if (values.isEmpty) uri
else uri + (
if (uri contains '?') '&' + Http.q_str(values) else (Http ? values)
)
}
def POST = next { Request.mimic(new SimplePost(IMap.empty, defaultCharset))_ }
def DELETE = next { Request.mimic(new HttpDelete)_ }
def HEAD = next { Request.mimic(new HttpHead)_ }
def req = {
val start: HttpRequestBase = new HttpGet("")
(xfs :\ start) { (a,b) => a(b) }
}
def to_uri = Http.to_uri(host, req)
val request = this
}
trait Handlers {
val request: Request
def >> [T] (block: (InputStream, String) => T) = Handler(request, { ent =>
val stm = (ent.getContent, ent.getContentEncoding) match {
case (stm, null) => stm
case (stm, enc) if enc.getValue == "gzip" => new GZIPInputStream(stm)
case (stm, _) => stm
}
val charset = EntityUtils.getContentCharSet(ent) match {
case null => request.defaultCharset
case charset => charset
}
block(stm, charset)
} )
def >> [T] (block: InputStream => T): Handler[T] = >> { (stm, charset) => block(stm) }
def >~ [T] (block: Source => T) = >> { (stm, charset) =>
import java.io._
def read(reader: BufferedReader, buf: StringBuilder) {
val ch = reader.read()
if (ch != -1)
read(reader, buf.append(ch.asInstanceOf[Char]))
}
val buf = new StringBuilder()
read(new BufferedReader(new InputStreamReader(stm, charset)), buf)
block(Source.fromString(buf.toString))
}
def as_source = >~ { so => so }
def >- [T] (block: String => T) = >~ { so => block(so.mkString) }
def as_str = >- { s => s }
def >>> [OS <: OutputStream](out: OS) = Handler(request, { ent => ent.writeTo(out); out })
def <> [T] (block: xml.Elem => T) = >> { stm => block(xml.XML.load(stm)) }
def >:> [T] (block: IMap[String, Set[String]] => T) =
Handler(request, (_, res, _) =>
block((IMap[String, Set[String]]().withDefaultValue(Set()) /: res.getAllHeaders) {
(m, h) => m + (h.getName -> (m(h.getName) + h.getValue))
} )
)
def >| = Handler(request, (code, res, ent) => ())
def >+ [A, B] (block: Handlers => (Handler[A], Handler[B])) = {
new Handler[(A,B)] ( request, { (code, res, opt_ent) =>
val (a, b) = block(new Handlers { val request = /\ })
(a.block(code, res, opt_ent), b.block(code,res,opt_ent))
} )
}
def >+> [T] (block: Handlers => Handler[Handler[T]]) = {
new Handler[T] ( request, { (code, res, opt_ent) =>
(block(new Handlers { val request = /\})).block(code, res, opt_ent).block(code, res, opt_ent)
} )
}
}
class ConfiguredHttpClient extends DefaultHttpClient {
override def createHttpParams = {
val params = new BasicHttpParams
HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1)
HttpProtocolParams.setContentCharset(params, Request.factoryCharset)
HttpProtocolParams.setUseExpectContinue(params, false)
params
}
val credentials = new DynamicVariable[Option[(AuthScope, Credentials)]](None)
setCredentialsProvider(new BasicCredentialsProvider {
override def getCredentials(scope: AuthScope) = credentials.value match {
case Some((auth_scope, creds)) if scope.`match`(auth_scope) >= 0 => creds
case _ => null
}
})
}
trait Builder[T] { def product:T }
object Http extends Http with Threads {
implicit def str2req(str: String) = new Request(str)
implicit def builder2product[T](builder: Builder[T]) = builder.product
def map2ee(values: Map[String, Any]) = java.util.Arrays asList (
values.toSeq map { case (k, v) => new BasicNameValuePair(k, v.toString) } toArray : _*
)
def % (s: String) = java.net.URLEncoder.encode(s, Request.factoryCharset)
def -% (s: String) = java.net.URLDecoder.decode(s, Request.factoryCharset)
def q_str (values: Map[String, Any]) = URLEncodedUtils.format(map2ee(values), Request.factoryCharset)
def ? (values: Map[String, Any]) = if (values.isEmpty) "" else "?" + q_str(values)
def to_uri(host: Option[HttpHost], req: HttpRequestBase) =
URI.create(host.map(_.toURI).getOrElse("")).resolve(req.getURI)
}